From 0982a213dd055f3a5dba18c08bf7b9a730386a95 Mon Sep 17 00:00:00 2001 From: Valentin Ochs Date: Tue, 10 Aug 2021 01:28:33 +0200 Subject: [PATCH] mqtt wrapper --- door.py | 139 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 86 insertions(+), 53 deletions(-) diff --git a/door.py b/door.py index cf10716..b33478b 100644 --- a/door.py +++ b/door.py @@ -28,6 +28,43 @@ config = parser.parse_args() def timestamp(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") +class MqttValue: + def __init__(self, client, topic, persistent = False, start_value = None, *, translate = None, max_update = 0.1): + self.client = client + self.topic = topic + self.persistent = persistent + self.value = start_value + self.last_update = datetime.datetime.now() - datetime.timedelta(seconds = max_update) + self.last_update_value = None + self.max_update = max_update + + if translate == None: + self.translate = str + elif callable(translate): + self.translate = translate + else: + self.translate = lambda x: translate[x] + + if start_value != None: + self.update(start_value) + + def update(self, value, force = False, no_update = False): + if value != self.value or value != self.last_update_value: + self.value = value + if force or (not no_update and value != self.last_update_value and (datetime.datetime.now() - self.last_update).total_seconds() >= self.max_update): + self.last_update = datetime.datetime.now() + self.last_update_value = value + self.client.publish(self.topic, self.translate(value), qos = 2, retain = self.persistent) + + def force_update(self, value): + self.update(value, force = True) + + def __call__(self, value = None, **kwargs): + if value != None: + self.update(value, **kwargs) + else: + return self.value + class DoorControl: # Actions IDLE, CLOSE, OPEN_THEN_CLOSE, OPEN, CLOSE_THEN_OPEN, ERROR = range(6) @@ -60,10 +97,13 @@ class DoorControl: self.mqttc.loop_start() # Current door state - self.state = None - self.state_pos = 0 + self.state = MqttValue(self.mqttc, "door/state/value", True, translate = self.state_names) + self.state_target = MqttValue(self.mqttc, "door/state/target", True, translate = self.state_names, max_update = 0) + self.state_pos = MqttValue(self.mqttc, "door/position/value", True, 0) + self.last_invalid_token = MqttValue(self.mqttc, "door/token/last_invalid", True) + self.speed = MqttValue(self.mqttc, "door/position/speed") # Current target action - self.action = DoorControl.CLOSE + self.action = MqttValue(self.mqttc, "door/state/action", True, DoorControl.CLOSE, translate = self.action_names, max_update = 0) # Start time of the current action self.start_time = None # How often the action was repeated @@ -104,10 +144,6 @@ class DoorControl: return logger - def mqtt(self, topic: str, msg: str, persistent: bool = True): - """Publishes data to a topic""" - self.mqttc.publish("door/" + topic, msg, qos=2, retain=persistent) - def _open_control_socket(self, config): """(Re-)creates and opens the control socket. Config must have a control_socket member.""" self.logger.debug("Opening control socket") @@ -127,7 +163,7 @@ class DoorControl: self.serial_port = serial.Serial(config.serial_port, timeout=2) self._send_door_cmd(b'r') except: - serial_port = None + self.serial_port = None return self.serial_port def _open_nfc_fifo(self, config): @@ -169,18 +205,14 @@ class DoorControl: def _set_position(self, data: int): """Set a new door position""" - if self.state_pos != data: - self.mqtt("position/value", data, True) - self.state_pos = data - if data > ERROR_THRESHOLD and self.state != DoorControl.ERROR: - self.logger.error("Invalid position:", state) - self.state = DoorControl.ERROR - elif data > OPEN_THRESHOLD and self.state != DoorControl.OPEN: - self.mqtt("state/value", "open", True) - self.state = DoorControl.OPEN - elif data < CLOSED_THRESHOLD and self.state != DoorControl.CLOSE: - self.mqtt("state/value", "closed", True) - self.state = DoorControl.CLOSE + self.state_pos(data) + if data > ERROR_THRESHOLD and self.state() != DoorControl.ERROR: + self.logger.error("Invalid position:", state) + self.state(DoorControl.ERROR) + elif data > OPEN_THRESHOLD and self.state() != DoorControl.OPEN: + self.state(DoorControl.OPEN) + elif data < CLOSED_THRESHOLD and self.state() != DoorControl.CLOSE: + self.state(DoorControl.CLOSE) def _check_reporting(self, current: int): if current == 0: @@ -213,7 +245,7 @@ class DoorControl: self.last_door_pos_time = datetime.datetime.now() self._send_door_cmd(b"R") self.handle_door_line() - return self.state + return self.state() def handle_door_state(self): """Checks the door state and executes any actions necessary to reach the target state""" @@ -231,13 +263,6 @@ class DoorControl: if 1. / delta_t > MAX_UPDATE_RATE: return - speed = abs(self.state_pos - self.last_position) / delta_t - self.last_position = self.state_pos - self.last_handle_state_timestamp = now - - if speed >= 0 or delta_t >= 1. / UPDATE_RATE: - self.mqtt("position/speed", f"{speed}") - # If no serial port, try to open it or return if not self.serial_port: try: @@ -247,21 +272,27 @@ class DoorControl: # Get new state self.poll_door_state() - self.last_handled_state = self.state + self.last_handled_state = self.state() + + speed = abs(self.state_pos() - self.last_position) / delta_t + speed = 10 * round(speed / 10) + self.speed(speed) + self.last_position = self.state_pos() + self.last_handle_state_timestamp = now - if self.state == DoorControl.ERROR: + if self.state() == DoorControl.ERROR: self.logger.error("Restarting the MCU and exiting.") self.send_door_cmd(b'S') return # Idle + change = key? - if self.action == DoorControl.IDLE: - if self.state != old_state: - self.logger.info(f"Door changed unexpectedly: {DoorControl.state_names[self.state]}") + if self.action() == DoorControl.IDLE: + if self.state() != old_state: + self.logger.info(f"Door changed unexpectedly: {DoorControl.state_names[self.state()]}") self.start_time = now if self.start_time and (now - self.start_time).total_seconds() >= self.config.state_timeout: self.start_time = None - if self.state_pos <= CLOSED_THRESHOLD and self.state_pos > CLOSED_WANT: + if self.state_pos() <= CLOSED_THRESHOLD and self.state_pos() > CLOSED_WANT: self.logger.info("Closing door a bit more") self._send_door_cmd(target_state_cmd[DoorControl.CLOSE]) return @@ -276,50 +307,52 @@ class DoorControl: if self.start_time is None: self.start_time = now - self._send_door_cmd(target_state_cmd[actions[self.action][0]]) + self._send_door_cmd(target_state_cmd[actions[self.action()][0]]) # Target state reached - if self.state == actions[self.action][0]: + if self.state() == actions[self.action()][0]: # Select next action, reset start time and repetitions - self.action = actions[self.action][1] + self.action(actions[self.action()][1]) self.start_time = now self.repeats = 0 # On idle, we're done - if self.action == DoorControl.IDLE: - self.logger.debug(f"Reached target position: {DoorControl.state_names[self.state]}") + if self.action() == DoorControl.IDLE: + self.logger.debug(f"Reached target position: {DoorControl.state_names[self.state()]}") return # Execution time t = (now - self.start_time).total_seconds() - if t >= self.config.state_timeout or (t >= self.config.state_timeout_speed and speed < MIN_SPEED and delta_t >= 1. / UPDATE_RATE): + if t >= self.config.state_timeout or (t >= self.config.state_timeout_speed and self.speed() < MIN_SPEED and delta_t >= 1. / UPDATE_RATE): # Timeout -> switch to timeout action - self.state = actions[self.action][0] - self.action = actions[self.action][2] + self.state(actions[self.action()][0], force = True, no_update = True) + self.action(actions[self.action()][2]) self.start_time = None self.repeats = 0 - self.logger.debug(f"Timeout. Switching to {DoorControl.action_names[self.action]}") + self.logger.debug(f"Timeout. Switching to {DoorControl.action_names[self.action()]}") elif t >= (1 + self.repeats) * self.config.repeat_time: # Repeat every couple of seconds self.repeats += 1 - self.logger.debug(f"Repeating command: {target_state_cmd[actions[self.action][0]]}") - self._send_door_cmd(target_state_cmd[actions[self.action][0]]) + self.logger.debug(f"Repeating command: {target_state_cmd[actions[self.action()][0]]}") + self._send_door_cmd(target_state_cmd[actions[self.action()][0]]) def open_door(self): self.logger.info("Opening the door") - self.mqtt("state/target", "open", True) - self.action = DoorControl.OPEN + self.state_target(DoorControl.OPEN) + self.action(DoorControl.OPEN) self.start_time = None self.handle_door_state() def close_door(self): self.logger.info("Closing the door") - self.mqtt("state/target", "closed", True) - self.action = DoorControl.CLOSE + self.state_target(DoorControl.CLOSE) + self.action(DoorControl.CLOSE) self.start_time = None self.handle_door_state() def toggle_door_state(self): - if self.state == DoorControl.CLOSE: + if not self.action() == DoorControl.IDLE: + return + if self.state_target() == DoorControl.CLOSE: self.open_door() else: self.close_door() @@ -351,7 +384,7 @@ class DoorControl: self.logger.warning(f"Token {token} of {data['name']} expired on {data['valid_thru']}") else: self.logger.warning(f"Invalid token: {token}") - self.mqtt("token/last_invalid", "%s;%s" % (timestamp(), token)) + self.last_invalid_token(f"{timestamp()};{token}") class LineBuffer(object): def __init__(self, f, handler): @@ -396,8 +429,8 @@ class DoorControl: elif cmd == 'stat': send("Door status is %s, position is %d. Current action: %s (%g seconds ago)\n" % ( DoorControl.state_names.get(self.state, "None"), - self.state_pos, - DoorControl.action_names[self.action], + self.state_pos.value, + DoorControl.action_names[self.action()], (datetime.datetime.now() - self.start_time).total_seconds()))