Improve state handling
This commit is contained in:
parent
d7fc4e366b
commit
eccf936f6c
|
@ -11,11 +11,12 @@ import serial
|
|||
|
||||
UPDATE_RATE = 2
|
||||
MAX_UPDATE_RATE = 20
|
||||
MIN_SPEED = 5
|
||||
ERROR_THRESHOLD = 250
|
||||
OPEN_THRESHOLD = 190
|
||||
CLOSED_THRESHOLD = 160
|
||||
CLOSED_WANT = 40
|
||||
CLOSED_THRESHOLD = 180
|
||||
CLOSED_WANT = 50
|
||||
MIN_IDLE_TIME = 1
|
||||
COMMAND_IDLE_TIME = 1.5
|
||||
|
||||
def timestamp():
|
||||
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
@ -69,8 +70,6 @@ class DoorControl:
|
|||
IDLE: "idling",
|
||||
OPEN: "waiting for open door",
|
||||
CLOSE: "waiting for closed door",
|
||||
OPEN_THEN_CLOSE: "waiting for open door, then closing again",
|
||||
CLOSE_THEN_OPEN: "waiting for closed door, then opening again",
|
||||
ERROR: "error",
|
||||
}
|
||||
|
||||
|
@ -90,7 +89,7 @@ class DoorControl:
|
|||
|
||||
# Current door state
|
||||
self.state = MqttValue(self.mqttc, "door/state/value", True, translate = self.state_names)
|
||||
self.state_target = MqttValue(self.mqttc, "door/state/target", True, translate = self.state_names, max_update = 0)
|
||||
self.target_state = MqttValue(self.mqttc, "door/state/target", True, DoorControl.CLOSE, translate = self.state_names, max_update = 0)
|
||||
self.state_pos = MqttValue(self.mqttc, "door/position/value", True, 0)
|
||||
self.last_invalid_token = MqttValue(self.mqttc, "door/token/last_invalid", True)
|
||||
self.speed = MqttValue(self.mqttc, "door/position/speed")
|
||||
|
@ -105,6 +104,14 @@ class DoorControl:
|
|||
self.last_door_pos_time = datetime.datetime.now() - datetime.timedelta(minutes = 10)
|
||||
self.last_position = 0
|
||||
self.last_handle_state_timestamp = datetime.datetime.now()
|
||||
self.idle_start_time = None
|
||||
self.last_command_time = datetime.datetime.now() - datetime.timedelta(COMMAND_IDLE_TIME)
|
||||
|
||||
def idle_time(self):
|
||||
if self.idle_start_time != None:
|
||||
return (datetime.datetime.now() - self.idle_start_time).total_seconds()
|
||||
else:
|
||||
return 0
|
||||
|
||||
def _send_door_cmd(self, cmd: bytes):
|
||||
"""Send a command to the door."""
|
||||
|
@ -195,15 +202,23 @@ class DoorControl:
|
|||
|
||||
return valid
|
||||
|
||||
def _set_position(self, data: int):
|
||||
def _set_position(self, data: str):
|
||||
"""Set a new door position"""
|
||||
self.state_pos(data)
|
||||
if data > ERROR_THRESHOLD and self.state() != DoorControl.ERROR:
|
||||
pos, speed = [ int(x) for x in data.strip().split() ]
|
||||
self.state_pos(pos)
|
||||
self.speed(speed)
|
||||
|
||||
if self.speed() != 0 and self.idle_start_time != None:
|
||||
self.idle_start_time = None
|
||||
elif self.speed() == 0 and self.idle_start_time == None:
|
||||
self.idle_start_time = datetime.datetime.now()
|
||||
|
||||
if pos > ERROR_THRESHOLD and self.state() != DoorControl.ERROR:
|
||||
self.logger.error("Invalid position:", state)
|
||||
self.state(DoorControl.ERROR)
|
||||
elif data > OPEN_THRESHOLD and self.state() != DoorControl.OPEN:
|
||||
elif pos > OPEN_THRESHOLD and self.state() != DoorControl.OPEN:
|
||||
self.state(DoorControl.OPEN)
|
||||
elif data < CLOSED_THRESHOLD and self.state() != DoorControl.CLOSE:
|
||||
elif pos < CLOSED_THRESHOLD and self.state() != DoorControl.CLOSE:
|
||||
self.state(DoorControl.CLOSE)
|
||||
|
||||
def _check_reporting(self, current: int):
|
||||
|
@ -218,7 +233,7 @@ class DoorControl:
|
|||
data = self._read_door_line().strip()
|
||||
|
||||
handling = {
|
||||
"pos": (int, self._set_position),
|
||||
"pos": (str, self._set_position),
|
||||
"pos reporting": (int, self._check_reporting),
|
||||
}
|
||||
|
||||
|
@ -231,9 +246,9 @@ class DoorControl:
|
|||
pass
|
||||
|
||||
def poll_door_state(self):
|
||||
"""Checks the door state if the last polling was at least 5 seconds ago, and returns the current state"""
|
||||
"""Checks the door state if the last polling was a while ago, and returns the current state"""
|
||||
t = (datetime.datetime.now() - self.last_door_pos_time).total_seconds()
|
||||
if t >= 5:
|
||||
if t >= 0.5:
|
||||
self.last_door_pos_time = datetime.datetime.now()
|
||||
self._send_door_cmd(b"R")
|
||||
self.handle_door_line()
|
||||
|
@ -252,9 +267,6 @@ class DoorControl:
|
|||
old_state = self.last_handled_state
|
||||
delta_t = (now - self.last_handle_state_timestamp).total_seconds()
|
||||
|
||||
if 1. / delta_t > MAX_UPDATE_RATE:
|
||||
return
|
||||
|
||||
# If no serial port, try to open it or return
|
||||
if not self.serial_port:
|
||||
try:
|
||||
|
@ -262,14 +274,13 @@ class DoorControl:
|
|||
except:
|
||||
return
|
||||
|
||||
# Get new state
|
||||
self.poll_door_state()
|
||||
self.last_handled_state = self.state()
|
||||
|
||||
speed = abs(self.state_pos() - self.last_position) / delta_t
|
||||
speed = 10 * round(speed / 10)
|
||||
self.speed(speed)
|
||||
self.last_position = self.state_pos()
|
||||
if (self.idle_time() < MIN_IDLE_TIME) and delta_t < 5:
|
||||
return
|
||||
|
||||
# Get new state
|
||||
self.last_handled_state = self.state()
|
||||
self.last_handle_state_timestamp = now
|
||||
|
||||
if self.state() == DoorControl.ERROR:
|
||||
|
@ -281,70 +292,71 @@ class DoorControl:
|
|||
if self.action() == DoorControl.IDLE:
|
||||
if self.state() != old_state:
|
||||
self.logger.info(f"Door changed unexpectedly: {DoorControl.state_names[self.state()]}")
|
||||
self.target_state(self.state())
|
||||
self.start_time = now
|
||||
if self.start_time and (now - self.start_time).total_seconds() >= self.config.state_timeout:
|
||||
if self.state() == DoorControl.CLOSE \
|
||||
and self.start_time \
|
||||
and (now - self.start_time).total_seconds() >= self.config.state_timeout:
|
||||
self.start_time = None
|
||||
if self.state_pos() <= CLOSED_THRESHOLD and self.state_pos() > CLOSED_WANT:
|
||||
self.logger.info("Closing door a bit more")
|
||||
self._send_door_cmd(target_state_cmd[DoorControl.CLOSE])
|
||||
return
|
||||
|
||||
# Target state, next action, timeout action
|
||||
actions = {
|
||||
DoorControl.OPEN: (DoorControl.OPEN, DoorControl.IDLE, DoorControl.CLOSE_THEN_OPEN ),
|
||||
DoorControl.CLOSE: (DoorControl.CLOSE, DoorControl.IDLE, DoorControl.OPEN_THEN_CLOSE ),
|
||||
DoorControl.OPEN_THEN_CLOSE: (DoorControl.OPEN, DoorControl.CLOSE, DoorControl.CLOSE ),
|
||||
DoorControl.CLOSE_THEN_OPEN: (DoorControl.CLOSE, DoorControl.OPEN, DoorControl.OPEN )
|
||||
}
|
||||
|
||||
if self.start_time is None:
|
||||
self.start_time = now
|
||||
self._send_door_cmd(target_state_cmd[actions[self.action()][0]])
|
||||
self._send_door_cmd(target_state_cmd[self.action()])
|
||||
|
||||
# Target state reached
|
||||
if self.state() == actions[self.action()][0]:
|
||||
# Select next action, reset start time and repetitions
|
||||
self.action(actions[self.action()][1])
|
||||
self.start_time = now
|
||||
self.repeats = 0
|
||||
# On idle, we're done
|
||||
if self.action() == DoorControl.IDLE:
|
||||
if self.state() == self.target_state():
|
||||
self.action(DoorControl.IDLE)
|
||||
self.logger.debug(f"Reached target position: {DoorControl.state_names[self.state()]}")
|
||||
self.repeats = 0
|
||||
self.start_time = now
|
||||
return
|
||||
elif self.state() != old_state:
|
||||
# Changed state, try to go to the target state again
|
||||
self.start_time = now
|
||||
self.action(old_state)
|
||||
self._send_door_cmd(target_state_cmd[self.action()])
|
||||
|
||||
# Execution time
|
||||
t = (now - self.start_time).total_seconds()
|
||||
if t >= self.config.state_timeout or (t >= self.config.state_timeout_speed and self.speed() < MIN_SPEED and delta_t >= 1. / UPDATE_RATE):
|
||||
if t >= self.config.state_timeout:
|
||||
# Timeout -> switch to timeout action
|
||||
self.state(actions[self.action()][0], force = True, no_update = True)
|
||||
self.action(actions[self.action()][2])
|
||||
if self.action() == DoorControl.OPEN:
|
||||
self.action(DoorControl.CLOSE)
|
||||
else:
|
||||
self.action(DoorControl.OPEN)
|
||||
self.start_time = None
|
||||
self.repeats = 0
|
||||
self.logger.debug(f"Timeout. Switching to {DoorControl.action_names[self.action()]}")
|
||||
elif t >= (1 + self.repeats) * self.config.repeat_time:
|
||||
# Repeat every couple of seconds
|
||||
self.repeats += 1
|
||||
self.logger.debug(f"Repeating command: {target_state_cmd[actions[self.action()][0]]}")
|
||||
self._send_door_cmd(target_state_cmd[actions[self.action()][0]])
|
||||
self.logger.debug(f"Repeating command: {target_state_cmd[self.action()]}")
|
||||
self._send_door_cmd(target_state_cmd[self.action()])
|
||||
|
||||
def open_door(self):
|
||||
self.logger.info("Opening the door")
|
||||
self.state_target(DoorControl.OPEN)
|
||||
self.target_state(DoorControl.OPEN)
|
||||
if self.action() == DoorControl.IDLE:
|
||||
self.action(DoorControl.OPEN)
|
||||
self.start_time = None
|
||||
self.handle_door_state()
|
||||
|
||||
def close_door(self):
|
||||
self.logger.info("Closing the door")
|
||||
self.state_target(DoorControl.CLOSE)
|
||||
self.target_state(DoorControl.CLOSE)
|
||||
if self.action() == DoorControl.IDLE:
|
||||
self.action(DoorControl.CLOSE)
|
||||
self.start_time = None
|
||||
self.handle_door_state()
|
||||
|
||||
def toggle_door_state(self):
|
||||
if not self.action() == DoorControl.IDLE:
|
||||
if self.action() != DoorControl.IDLE:
|
||||
return
|
||||
if self.state_target() == DoorControl.CLOSE:
|
||||
if self.target_state() == DoorControl.CLOSE:
|
||||
self.open_door()
|
||||
else:
|
||||
self.close_door()
|
||||
|
@ -434,7 +446,7 @@ def main():
|
|||
parser.add_argument("--log_file", default="/tmp/nfc.log")
|
||||
parser.add_argument("--state_timeout", type=float, default=10)
|
||||
parser.add_argument("--state_timeout_speed", type=float, default=3)
|
||||
parser.add_argument("--repeat_time", type=float, default=5)
|
||||
parser.add_argument("--repeat_time", type=float, default=COMMAND_IDLE_TIME)
|
||||
parser.add_argument("--mqtt_host", default="10.10.21.2")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
|
Loading…
Reference in New Issue
Block a user