#!/usr/bin/python
"""NFC door-control daemon.

Multiplexes a serial-attached door controller, a FIFO that delivers NFC token
IDs and a UNIX control socket, and publishes the door status via MQTT.
"""

import argparse
import datetime
import logging
import os
import select
import socket

import paho.mqtt.client as mqcl
import serial

# Main-loop rate: select() waits at most 1 / UPDATE_RATE seconds per iteration.
UPDATE_RATE = 2
MAX_UPDATE_RATE = 20

# Position thresholds used by DoorControl._set_position() to derive the state.
# Positions between CLOSED_THRESHOLD and OPEN_THRESHOLD leave the state as is.
ERROR_THRESHOLD = 250
OPEN_THRESHOLD = 190
CLOSED_THRESHOLD = 180
# A closed door resting above this position is sent one more close command.
CLOSED_WANT = 50

# Seconds the door must have reported zero speed before the state is handled.
MIN_IDLE_TIME = 1
# Default for --repeat_time (seconds between repeated commands).
COMMAND_IDLE_TIME = 1.5


def timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


class MqttValue:
    """A value mirrored to an MQTT topic with change detection and rate limit.

    Calling the instance without a value returns the stored value; calling it
    with a value forwards to :meth:`update`.
    """

    def __init__(self, client, topic, persistent=False, start_value=None, *,
                 translate=None, max_update=0.1):
        """Create the value and publish ``start_value`` if one is given.

        Args:
            client: object providing ``publish(topic, payload, qos=, retain=)``.
            topic: MQTT topic to publish to.
            persistent: passed as ``retain`` when publishing.
            start_value: initial value; published immediately unless None.
            translate: None (use ``str``), a callable, or a mapping used to
                convert the value into the published payload.
            max_update: minimum number of seconds between two publishes.
        """
        self.client = client
        self.topic = topic
        self.persistent = persistent
        self.value = start_value
        # Back-dated so that the very first update is not rate limited.
        self.last_update = (datetime.datetime.now()
                            - datetime.timedelta(seconds=max_update))
        self.last_update_value = None
        self.max_update = max_update
        if translate is None:
            self.translate = str
        elif callable(translate):
            self.translate = translate
        else:
            self.translate = lambda x: translate[x]
        if start_value is not None:
            self.update(start_value)

    def update(self, value, force=False, no_update=False):
        """Store ``value`` and publish it if allowed.

        Nothing happens when ``value`` equals both the stored and the last
        published value. Otherwise the value is stored and published when
        ``force`` is set, or when ``no_update`` is not set, the value differs
        from the last published one and ``max_update`` seconds have passed.
        """
        # NOTE(review): the source formatting was lost; the publish check is
        # assumed to be nested inside the change check -- confirm.
        if value != self.value or value != self.last_update_value:
            self.value = value
            now = datetime.datetime.now()
            rate_ok = (now - self.last_update).total_seconds() >= self.max_update
            if force or (not no_update
                         and value != self.last_update_value
                         and rate_ok):
                self.last_update = now
                self.last_update_value = value
                self.client.publish(self.topic, self.translate(value),
                                    qos=2, retain=self.persistent)

    def force_update(self, value):
        """Update with ``force=True`` (bypasses the rate limit)."""
        self.update(value, force=True)

    def __call__(self, value=None, **kwargs):
        """Return the stored value, or update it when ``value`` is given."""
        # Must be an identity check: 0 is a legitimate value (e.g. IDLE).
        if value is not None:
            self.update(value, **kwargs)
        else:
            return self.value


class DoorControl:
    """State machine driving the door opener and its control interfaces."""

    # Actions
    IDLE, CLOSE, OPEN_THEN_CLOSE, OPEN, CLOSE_THEN_OPEN, ERROR = range(6)

    state_names = {
        OPEN: "open",
        CLOSE: "close",
        ERROR: "error",
    }

    action_names = {
        IDLE: "idling",
        OPEN: "waiting for open door",
        CLOSE: "waiting for closed door",
        ERROR: "error",
    }

    def __init__(self, config):
        """Open all I/O channels named in ``config`` and start the MQTT loop.

        ``config`` must provide: log_file, serial_port, valid_tokens,
        nfc_fifo, control_socket, mqtt_host, state_timeout, repeat_time.
        """
        self.config = config
        self.mqttc = mqcl.Client()
        self.logger = self._setup_logging(config)
        self._open_serial_port(config)
        self.valid_tokens = self._read_valid_tokens(config)
        self.nfc_fifo = self._open_nfc_fifo(config)
        self.control_socket, self.comm_channels = self._open_control_socket(config)

        self.mqttc.on_connect = lambda client, userdata, flags, rc: \
            self.logger.debug(f"Connected to mqtt host ({rc})")
        self.mqttc.connect_async(config.mqtt_host, keepalive=60)
        self.mqttc.loop_start()

        # Current door state
        self.state = MqttValue(self.mqttc, "door/state/value", True,
                               translate=self.state_names)
        self.target_state = MqttValue(self.mqttc, "door/state/target", True,
                                      DoorControl.CLOSE,
                                      translate=self.state_names, max_update=0)
        self.state_pos = MqttValue(self.mqttc, "door/position/value", True, 0)
        self.last_invalid_token = MqttValue(self.mqttc,
                                            "door/token/last_invalid", True)
        self.speed = MqttValue(self.mqttc, "door/position/speed")
        # Current target action
        self.action = MqttValue(self.mqttc, "door/state/action", True,
                                DoorControl.CLOSE,
                                translate=self.action_names, max_update=0)
        # Start time of the current action
        self.start_time = None
        # How often the action was repeated
        self.repeats = 0
        self.last_handled_state = self.OPEN
        self.last_door_pos_time = (datetime.datetime.now()
                                   - datetime.timedelta(minutes=10))
        self.last_position = 0
        self.last_handle_state_timestamp = datetime.datetime.now()
        self.idle_start_time = None
        # timedelta's first positional argument is *days*; the constant is in
        # seconds, so it has to be passed by keyword.
        self.last_command_time = (datetime.datetime.now()
                                  - datetime.timedelta(seconds=COMMAND_IDLE_TIME))

    def idle_time(self):
        """Return the seconds since the door reported zero speed, else 0."""
        if self.idle_start_time is None:
            return 0
        return (datetime.datetime.now() - self.idle_start_time).total_seconds()

    def _send_door_cmd(self, cmd: bytes):
        """Send a command to the door.

        Returns the result of the serial write, or None if no serial port is
        currently open (the command is dropped and a warning is logged).
        """
        if self.serial_port is None:
            self.logger.warning(f"No serial port, dropping command {cmd}")
            return None
        # The poll command is sent too often to be worth logging.
        if cmd != b'R':
            self.logger.debug(f"Sending {cmd}")
        return self.serial_port.write(cmd)

    def _read_door_line(self):
        """Read a single line from the serial port ("" if no port is open)."""
        if self.serial_port is None:
            return ""
        # Line noise must not take the daemon down with a UnicodeDecodeError.
        return self.serial_port.readline().decode('ascii', errors='replace')

    def _setup_logging(self, config):
        """Set up logging to the console (root logger) and config.log_file."""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(force=True, level=logging.DEBUG, format=log_format)

        # create logger
        logger = logging.getLogger('nfc_log')
        # create file handler and set level to debug
        fh = logging.FileHandler(config.log_file)
        fh.setLevel(logging.DEBUG)
        # create formatter
        formatter = logging.Formatter(log_format)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        return logger

    def _open_control_socket(self, config):
        """(Re-)creates and opens the control socket.

        Config must have a control_socket member. Returns the listening
        socket and an (empty) list for the accepted connections.
        """
        self.logger.debug("Opening control socket")
        if os.path.exists(config.control_socket):
            os.unlink(config.control_socket)
        control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        control_socket.bind(config.control_socket)
        control_socket.listen(5)
        comm_channels = []
        return control_socket, comm_channels

    def _open_serial_port(self, config):
        """Opens the serial port controlling the door opener.

        config must have a serial_port member. On any failure the port is
        left as None (best effort; handle_door_state() retries later).
        Returns the port or None.
        """
        self.serial_port = None
        port = None
        try:
            port = serial.Serial(config.serial_port, timeout=2)
            self.serial_port = port
            self._send_door_cmd(b'>r1\n')
        except Exception:
            # Deliberately silent: this is retried on every state check.
            self.serial_port = None
            if port is not None:
                try:
                    port.close()
                except Exception:
                    pass
        return self.serial_port

    def _open_nfc_fifo(self, config):
        """Opens config.nfc_fifo as the FIFO through which tokens arrive."""
        nfc_fifo = open(config.nfc_fifo, "r")
        return nfc_fifo

    def _read_valid_tokens(self, config):
        """Load all tokens from config.valid_tokens.

        Each data line has five ``|``-separated fields:
        ``token|name|organization|email|valid_thru`` where valid_thru is an
        ISO date or empty. Blank lines and lines starting with ``#`` are
        skipped. Returns a dict keyed by token; on any error reading the file
        an empty dict is returned.
        """
        valid = {}
        try:
            self.logger.info("Loading tokens")
            with open(config.valid_tokens, "r") as token_file:
                lines = [s.strip() for s in token_file]
            for i, line in enumerate(lines):
                if not line or line.startswith('#'):
                    continue
                # Fields are stripped so that "abc | Name" still matches the
                # (stripped) token read from the FIFO.
                fields = [field.strip() for field in line.split('|')]
                if len(fields) != 5:
                    logging.warning(f"Skipping line {i} ({line}) since it does not contain exactly 5 data field")
                    continue
                token, name, organization, email, valid_thru = fields
                try:
                    if valid_thru:
                        valid_thru = datetime.date.fromisoformat(valid_thru)
                    else:
                        valid_thru = None
                except Exception:
                    # NOTE(review): an unparsable date makes the token valid
                    # without expiry (behavior kept from the original) --
                    # consider rejecting the token instead.
                    logging.error(f"Could not parse valid thru date for token {token} in line {i}")
                    valid_thru = None
                logging.debug(f"Got token {token} associated with {name} <{email}> of {organization}, valid thru {valid_thru}")
                if token in valid:
                    logging.warning(f"Overwriting token {token}")
                valid[token] = {
                    'name': name,
                    'organization': organization,
                    'email': email,
                    'valid_thru': valid_thru,
                }
        except Exception as e:
            valid = {}
            logging.error(f"Error reading token file. Exception: {e}")
        return valid

    def _set_position(self, data: str):
        """Set a new door position from a ``"<pos> <speed>"`` string.

        Publishes position and speed, tracks since when the door is idle
        (speed 0) and derives the door state from the position thresholds.
        Raises ValueError if ``data`` is not two integers.
        """
        pos, speed = [int(x) for x in data.strip().split()]
        self.state_pos(pos)
        self.speed(speed)

        if speed != 0:
            self.idle_start_time = None
        elif self.idle_start_time is None:
            self.idle_start_time = datetime.datetime.now()

        # Derive the state from the position alone. Mixing "state != X" into
        # the elif chain let an already-flagged ERROR position fall through
        # to the OPEN branch on the next reading.
        if pos > ERROR_THRESHOLD:
            new_state = DoorControl.ERROR
        elif pos > OPEN_THRESHOLD:
            new_state = DoorControl.OPEN
        elif pos < CLOSED_THRESHOLD:
            new_state = DoorControl.CLOSE
        else:
            # Between the thresholds: keep the current state.
            new_state = None

        if new_state is not None and new_state != self.state():
            if new_state == DoorControl.ERROR:
                self.logger.error(f"Invalid position: {pos}")
            self.state(new_state)

    def _check_reporting(self, current: int):
        """Turn position reporting on if the door reports it as off (0)."""
        if current == 0:
            self.logger.info("Turning position reporting on")
            self._send_door_cmd(b'>r1\n')
        else:
            self.logger.info("Position reporting is on")

    def handle_door_line(self):
        """Reads a single line from the serial port and handles it.

        Lines have the form ``prefix:data``; anything else is ignored.
        """
        line = self._read_door_line().strip()
        handling = {
            "pos": (str, self._set_position),
            "pos reporting": (int, self._check_reporting),
        }
        try:
            prefix, payload = line.split(':')
        except ValueError:
            # Not exactly one "prefix:data" pair -- not meant for us.
            return
        if prefix not in handling:
            return
        data_type, fun = handling[prefix]
        try:
            fun(data_type(payload))
        except ValueError:
            self.logger.debug(f"Ignoring malformed door line: {line!r}")
        except Exception:
            # Keep the daemon alive, but never hide handler bugs again.
            self.logger.exception(f"Error handling door line: {line!r}")

    def poll_door_state(self):
        """Checks the door state if the last polling was a while ago.

        Sends the poll command at most every 0.5 seconds and returns the
        current state.
        """
        now = datetime.datetime.now()
        if (now - self.last_door_pos_time).total_seconds() >= 0.5:
            self.last_door_pos_time = now
            self._send_door_cmd(b"R")
            self.handle_door_line()
        return self.state()

    def handle_door_state(self):
        """Checks the door state and executes any actions necessary to reach
        the target state."""
        # Commands associated with each target state
        target_state_cmd = {
            DoorControl.OPEN: b'O',
            DoorControl.CLOSE: b'C'
        }

        def send_action_cmd():
            """Send the command for the current action, if it has one."""
            cmd = target_state_cmd.get(self.action())
            if cmd is None:
                self.logger.warning(f"No door command for action {self.action()}")
                return
            self._send_door_cmd(cmd)

        now = datetime.datetime.now()
        old_state = self.last_handled_state
        delta_t = (now - self.last_handle_state_timestamp).total_seconds()

        # If no serial port, try to open it or return
        if self.serial_port is None:
            self._open_serial_port(self.config)
            if self.serial_port is None:
                return

        self.poll_door_state()
        # Wait until the door has been at rest for a moment, but handle the
        # state at least every 5 seconds.
        if (self.idle_time() < MIN_IDLE_TIME) and delta_t < 5:
            return

        # Get new state
        self.last_handled_state = self.state()
        self.last_handle_state_timestamp = now

        if self.state() == DoorControl.ERROR:
            self.logger.error("Restarting the MCU and exiting.")
            self._send_door_cmd(b'S')
            return

        # Idle + change = key?
        # NOTE(review): the source formatting was lost; the return below is
        # assumed to end the whole IDLE branch -- confirm.
        if self.action() == DoorControl.IDLE:
            if self.state() != old_state:
                self.logger.info(f"Door changed unexpectedly: {DoorControl.state_names[self.state()]}")
                self.target_state(self.state())
                self.start_time = now
            if self.state() == DoorControl.CLOSE \
                    and self.start_time \
                    and (now - self.start_time).total_seconds() >= self.config.state_timeout:
                self.start_time = None
                if CLOSED_WANT < self.state_pos() <= CLOSED_THRESHOLD:
                    self.logger.info("Closing door a bit more")
                    self._send_door_cmd(target_state_cmd[DoorControl.CLOSE])
            return

        if self.start_time is None:
            self.start_time = now
            send_action_cmd()

        # Target state reached
        if self.state() == self.target_state():
            self.action(DoorControl.IDLE)
            self.logger.debug(f"Reached target position: {DoorControl.state_names[self.state()]}")
            self.repeats = 0
            self.start_time = now
            return
        elif self.state() != old_state:
            # Changed state, try to go to the target state again
            self.start_time = now
            self.action(old_state)
            send_action_cmd()

        # Execution time
        t = (now - self.start_time).total_seconds()
        if t >= self.config.state_timeout:
            # Timeout -> switch to timeout action
            if self.action() == DoorControl.OPEN:
                self.action(DoorControl.CLOSE)
            else:
                self.action(DoorControl.OPEN)
            self.start_time = None
            self.repeats = 0
            self.logger.debug(f"Timeout. Switching to {DoorControl.action_names[self.action()]}")
        elif t >= (1 + self.repeats) * self.config.repeat_time:
            # Repeat every couple of seconds
            self.repeats += 1
            self.logger.debug(f"Repeating command: {target_state_cmd.get(self.action())}")
            send_action_cmd()

    def open_door(self):
        """Set the target state to OPEN and start the action if idle."""
        self.logger.info("Opening the door")
        self.target_state(DoorControl.OPEN)
        if self.action() == DoorControl.IDLE:
            self.action(DoorControl.OPEN)
            self.start_time = None
        self.handle_door_state()

    def close_door(self):
        """Set the target state to CLOSE and start the action if idle."""
        self.logger.info("Closing the door")
        self.target_state(DoorControl.CLOSE)
        if self.action() == DoorControl.IDLE:
            self.action(DoorControl.CLOSE)
            self.start_time = None
        self.handle_door_state()

    def toggle_door_state(self):
        """Invert the target state; ignored while an action is running."""
        if self.action() != DoorControl.IDLE:
            return
        if self.target_state() == DoorControl.CLOSE:
            self.open_door()
        else:
            self.close_door()

    def handle_nfc_token(self, token=None):
        """Check a token and toggle the door if it is valid and not expired.

        Without ``token`` one line is read from the NFC FIFO; on end-of-file
        the FIFO is reopened. Invalid tokens are published to the
        ``last_invalid`` topic.
        """
        if not token:
            raw = self.nfc_fifo.readline()
            token = raw.strip()
            self.logger.debug(f"Token from nfc_fifo: {token}")
            if raw == "":
                # End of file (writer went away): reopen, and release the
                # old handle instead of leaking one descriptor per reopen.
                self.logger.debug("Opening nfc_fifo")
                self.nfc_fifo.close()
                self.nfc_fifo = self._open_nfc_fifo(self.config)
                return
            if not token:
                # Blank line, nothing to check.
                return

        token = token.strip()
        if token in self.valid_tokens:
            data = self.valid_tokens[token]
            if data['valid_thru'] is not None:
                # if a valid thru date has been set we check if the token is
                # still valid
                authorized = datetime.date.today() <= data['valid_thru']
            else:
                # otherwise we don't need to check
                authorized = True
            if authorized:
                self.logger.info(f"Valid token {token} of {data['name']}")
                self.toggle_door_state()
            else:
                self.logger.warning(f"Token {token} of {data['name']} expired on {data['valid_thru']}")
        else:
            self.logger.warning(f"Invalid token: {token}")
            self.last_invalid_token(f"{timestamp()};{token}")

    class LineBuffer(object):
        """Splits the byte stream of a socket into newline-terminated lines."""

        def __init__(self, f, handler):
            """``handler(f, line)`` is called for every complete line."""
            self.data = b''
            self.f = f
            self.handler = handler

        def update(self):
            """Receive pending data and dispatch all complete lines.

            Returns False if the peer closed (or reset) the connection.
            """
            try:
                data = self.f.recv(1024)
            except OSError:
                # A reset connection is treated like a closed one.
                return False
            if not data:
                return False
            self.data += data
            # Everything before the last newline is complete; the remainder
            # is kept for the next call.
            *lines, self.data = self.data.split(b'\n')
            for line in lines:
                self.handler(self.f, line)
            return True

    def handle_cmd(self, comm, data):
        """Execute one command line received on the control socket.

        Args:
            comm: connection to answer on (needs ``send(bytes)``).
            data: the raw command line as bytes, without the newline.

        Known commands: fake <token>, reset, open <login>, close <login>,
        rld, stat. Unknown commands are ignored.
        """
        self.logger.debug(f"Got command: {data}")
        words = data.decode('utf8', errors='replace').split()
        if not words:
            # Empty line: nothing to do.
            return
        cmd, args = words[0], words[1:]

        def send(text):
            comm.send(text.encode('utf8'))

        if cmd == 'fake':
            if len(args) > 0:
                self.logger.debug(f"Faking token {args[0]}")
                send("Handling token\n")
                self.handle_nfc_token(args[0])
            else:
                send("Missing token")
        elif cmd == 'reset':
            self.logger.info("Resetting")
            send("Resetting MCU")
            self._send_door_cmd(b'S')
        elif cmd == 'open':
            if len(args) > 0:
                self.logger.info(f"Control socket opening door for {args[0]}")
                send("Opening door")
                self.open_door()
            else:
                send("Missing login")
        elif cmd == 'close':
            if len(args) > 0:
                self.logger.info(f"Control socket closing door for {args[0]}")
                send("Closing door")
                self.close_door()
            else:
                send("Missing login")
        elif cmd == 'rld':
            self.logger.debug("Reloading tokens")
            send("Reloading tokens")
            self.valid_tokens = self._read_valid_tokens(self.config)
        elif cmd == 'stat':
            # start_time is None right after start-up and after a timeout.
            if self.start_time is None:
                age = "no start time recorded"
            else:
                elapsed = (datetime.datetime.now() - self.start_time).total_seconds()
                age = "%g seconds ago" % elapsed
            send("Door status is %s, position is %d. Current action: %s (%s)\n" % (
                DoorControl.state_names.get(self.state(), "None"),
                self.state_pos(),
                DoorControl.action_names.get(self.action(), "unknown"),
                age))


def main():
    """Parse the command line and run the door-control loop forever."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--serial_port", default="/dev/serial/by-id/usb-Imaginaerraum.de_DoorControl_43363220195053573A002C0-if01")
    parser.add_argument("--nfc_fifo", default="/tmp/nfc_fifo")
    parser.add_argument("--control_socket", default="/tmp/nfc.sock")
    parser.add_argument("--valid_tokens", default="/etc/door_tokens")
    parser.add_argument("--log_file", default="/tmp/nfc.log")
    parser.add_argument("--state_timeout", type=float, default=10)
    parser.add_argument("--state_timeout_speed", type=float, default=3)
    parser.add_argument("--repeat_time", type=float, default=COMMAND_IDLE_TIME)
    parser.add_argument("--mqtt_host", default="10.10.21.2")
    config = parser.parse_args()

    dc = DoorControl(config)
    buffers = {}

    while True:
        # The serial port may be missing; handle_door_state() reopens it, so
        # the watch list is rebuilt on every iteration.
        serial_fd = dc.serial_port.fileno() if dc.serial_port is not None else None
        watched = [dc.nfc_fifo, dc.control_socket] + dc.comm_channels
        if serial_fd is not None:
            watched.insert(0, serial_fd)

        readable = select.select(watched, [], [], 1 / UPDATE_RATE)[0]
        for c in readable:
            if isinstance(c, int) and c == serial_fd:
                dc.handle_door_line()
            elif c == dc.nfc_fifo:
                dc.handle_nfc_token()
            elif c == dc.control_socket:
                dc.logger.debug("Got connection")
                sock = dc.control_socket.accept()[0]
                buffers[sock] = dc.LineBuffer(sock, dc.handle_cmd)
                dc.comm_channels += [sock]
            else:
                if not buffers[c].update():
                    dc.logger.debug("Lost connection")
                    del buffers[c]
                    dc.comm_channels.remove(c)
                    c.close()
        dc.handle_door_state()


if __name__ == '__main__':
    main()