import os
import select
import socket
import time
from threading import Lock, RLock, Condition, Thread
from typing import Callable, Optional, IO, Container

from . import mqtt, util
from .util import timestamp
from .door.constants import state_names


class DoorControlSocket(util.Loggable):
    """Line-based command server on a Unix stream socket.

    A background thread accepts client connections on the socket bound at
    ``config.control_socket``, splits the received bytes into
    newline-terminated lines and passes each line to :meth:`handle_cmd`.
    """

    class LineBuffer(object):
        """Accumulate bytes from one client and dispatch complete lines."""

        def __init__(self, f, handler):
            """Remember the client socket and the per-line callback.

            Args:
                f: Connected client socket; only ``recv`` is used here.
                handler: Called as ``handler(f, line)`` for every complete
                    line, where ``line`` is ``bytes`` without the newline.
            """
            self.data = b''
            self.f = f
            self.handler = handler

        def update(self):
            """Read pending data and dispatch every complete line.

            Returns:
                ``False`` if ``recv`` returned no data (the peer closed the
                connection), ``True`` otherwise.
            """
            chunk = self.f.recv(1024)
            if not chunk:
                return False
            # Everything before the last newline is complete; the remainder
            # (possibly empty) is kept until more data arrives.
            *lines, self.data = (self.data + chunk).split(b'\n')
            for line in lines:
                self.handler(self.f, line)
            return True

    def __init__(self, config, control, nfc):
        """Bind the control socket; the worker is started by :meth:`start`.

        Args:
            config: Object with a ``control_socket`` attribute (socket path).
            control: Door controller used by :meth:`handle_cmd` (``open``,
                ``close``, ``state``, ``position``, ``target``, ``_comms``).
            nfc: Token handler used by :meth:`handle_cmd` (``handle_token``,
                ``_read_valid_tokens``).
        """
        self._config = config
        self._control = control
        self._nfc = nfc
        self._fifo = self._open_control_socket()
        self._stop = True
        self._task: Optional[Thread] = None
        self._mutex = RLock()
        self._cond = Condition(self._mutex)

    def _open_control_socket(self) -> socket.socket:
        """(Re-)creates and opens the control socket.

        Config must have a control_socket member. Any existing file at that
        path is removed first.

        Raises:
            OSError: If the stale path cannot be removed or the socket cannot
                be bound or put into listening mode.
        """
        path = self._config.control_socket
        self._logger().debug(f"Opening control socket {path}")
        # Remove a stale socket file without a racy exists() check.
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.bind(path)
            s.listen(5)
        except OSError:
            # Do not leak the descriptor when bind/listen fails.
            s.close()
            raise
        return s

    def _run(self):
        """Worker loop: accept clients and feed their data to line buffers.

        Runs until ``_stop`` becomes true. The loop deliberately does NOT
        hold ``_mutex``: :meth:`stop` joins this thread while holding that
        lock, so taking it here would deadlock. ``select`` uses a one second
        timeout, which bounds how long a stop request goes unnoticed while
        the sockets are idle.
        """
        buffers = {}  # client socket -> LineBuffer
        try:
            while not self._stop:
                if not self._fifo:
                    self._fifo = self._open_control_socket()
                    if not self._fifo:
                        time.sleep(5)
                        continue
                readable, _, _ = select.select(
                    [self._fifo, *buffers], [], [], 1)
                for sock in readable:
                    if sock is self._fifo:
                        self._logger().debug("Got connection")
                        client = sock.accept()[0]
                        buffers[client] = DoorControlSocket.LineBuffer(
                            client, self.handle_cmd)
                        continue
                    try:
                        alive = buffers[sock].update()
                    except OSError as err:
                        # recv/send failed (e.g. peer reset, broken pipe):
                        # drop this client instead of killing the worker.
                        self._logger().debug(
                            f"Dropping control client: {err}")
                        alive = False
                    if not alive:
                        del buffers[sock]
                        sock.close()
        finally:
            # Release all client connections when the loop ends.
            for sock in buffers:
                sock.close()

    def handle_cmd(self, comm, data):
        """Execute one command line received from a control client.

        Recognised commands: ``fake <token>``, ``reset``, ``open <login>``,
        ``close <login>``, ``rld`` and ``stat``. Empty lines, lines that are
        not valid UTF-8 and unknown commands are ignored.

        Args:
            comm: Client socket; replies are written with ``comm.send``.
            data: The raw command line as ``bytes`` (without newline).
        """
        try:
            parts = data.decode('utf8').split()
        except UnicodeDecodeError:
            self._logger().debug(f"Ignoring undecodable command: {data!r}")
            return
        if not parts:
            return
        cmd, args = parts[0], parts[1:]
        self._logger().debug(f"Got command: {data}")

        def send(text):
            comm.send(text.encode('utf8'))

        if cmd == 'fake':
            if args:
                self._logger().debug(f"Faking token {args[0]}")
                send("Handling token\n")
                self._nfc.handle_token(args[0])
            else:
                send("Missing token\n")
        elif cmd == 'reset':
            self._logger().info("Resetting")
            send("Resetting MCU")
            self._control._comms.cmd_restart()
        elif cmd == 'open':
            if args:
                self._logger().info(
                    f"Control socket opening door for {args[0]}")
                send("Opening door")
                self._control.open()
            else:
                send("Missing login")
        elif cmd == 'close':
            if args:
                self._logger().info(
                    f"Control socket closing door for {args[0]}")
                send("Closing door")
                self._control.close()
            else:
                send("Missing login")
        elif cmd == 'rld':
            self._logger().debug("Reloading tokens")
            send("Reloading tokens")
            self._nfc._read_valid_tokens()
        elif cmd == 'stat':
            send("Door status is %s, position is %d. Current action: %s\n" % (
                self._control.state.str(),
                self._control.position(),
                self._control.target.str()))

    def start(self):
        """Start the worker thread unless it is already running."""
        with self._mutex:
            if self._stop:
                self._stop = False
                self._task = Thread(target=self._run, daemon=True)
                self._task.start()

    def stop(self):
        """Request the worker thread to stop and wait until it has exited.

        The join happens while ``_mutex`` is held, so a concurrent
        :meth:`start` cannot launch a second worker before the old one is
        gone. This is safe because the worker never acquires ``_mutex``.
        """
        with self._cond:
            if not self._stop:
                self._stop = True
                # No waiter on this condition is visible in this class; the
                # notify is kept for any external waiter.
                self._cond.notify()
                self._task.join()


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--serial_port", default="/dev/serial/by-id/usb-Imaginaerraum.de_DoorControl_43363220195053573A002C0-if01")
    parser.add_argument("--nfc_fifo", default="/tmp/nfc_fifo")
    parser.add_argument("--control_socket", default="/tmp/nfc.sock")
    parser.add_argument("--valid_tokens", default="/etc/door_tokens")
    parser.add_argument("--log_file", default="/tmp/nfc.log")
    parser.add_argument("--state_timeout", type=float, default=10)
    parser.add_argument("--state_timeout_speed", type=float, default=3)
    # NOTE(review): COMMAND_IDLE_TIME is neither defined nor imported in the
    # visible part of this module - confirm where it should come from.
    parser.add_argument("--repeat_time", type=float, default=COMMAND_IDLE_TIME)
    parser.add_argument("--mqtt_host", default="10.10.21.2")
    config = parser.parse_args()
    util.init_logging()

    def start():
        """Create a DoorControl from the parsed config and run it in a thread."""
        # NOTE(review): DoorControl is not defined or imported in the visible
        # part of this module - confirm.
        dc = DoorControl(config)
        dc._comms.start()
        t = Thread(target=dc.run)
        t.start()
        return dc