import os import select import serial import time from threading import Lock, RLock, Thread from typing import IO, Dict, Callable, Union from ..util import Loggable class Communication(Loggable): def __init__(self, port: Union[os.PathLike, IO]): self._mutex: RLock = RLock() self._write_mutex: Lock = Lock() self._subscribers: Dict[str, Callable] = {} self._read_pipe, self._write_pipe = os.pipe() os.set_blocking(self._read_pipe, False) self._read_pipe = os.fdopen(self._read_pipe, 'rb', buffering=0, ) self._write_pipe = os.fdopen(self._write_pipe, 'wb', buffering=0) self._port_path = port self._serial_port = None self._open_serial_port() self._stopped = True self._task = None def _forward(self) -> None: data = self._read_pipe.read() while True: try: return self._write(data) except serial.serialutil.SerialException: self._open_serial_port() def _open_serial_port(self) -> None: self._logger().debug("Opening port") port = self._port_path if type(port) == str: i = 0 while i < 30 and not os.path.exists(port): time.sleep(1) i += 1 self._serial_port = serial.Serial(port, timeout=2) else: self._serial_port = port def _process(self): try: line = self._read() except serial.serialutil.SerialException: line = None if line == None: self._logger().info("Empty line, reopening serial port") self._open_serial_port() return if ':' not in line: self._logger().debug(f"Line: {line}") self._send_event("", line.strip()) return try: prefix, data = line.split(":", maxsplit=2) if not prefix.startswith("pos"): self._logger().debug(f"Line: {line}") self._send_event(prefix, data.strip()) except: pass def _read(self) -> str: data = self._serial_port.readline() if data == "": return None return data.decode("ascii").strip() def _run(self): while True: with self._mutex: if self._stopped: return readable, _, _ = select.select( [self._serial_port, self._read_pipe], [], [], 0.1) for r in readable: if r == self._serial_port: self._process() elif r == self._read_pipe: self._forward() def _send_event(self, prefix, data): with self._mutex: if prefix in self._subscribers: for sub in self._subscribers[prefix]: sub(prefix, data) def _write(self, data: bytes) -> None: with self._write_mutex: self._logger().debug(f"Sending {repr(data)}") if self._serial_port: self._serial_port.write(data) def start(self): with self._mutex: self._logger().debug("Starting") if self._task is None: self._stopped = False self._task = Thread(target=self._run, daemon=True) self._task.start() def stop(self) -> None: self._mutex.acquire() if self._task is not None: self._stopped = True self._mutex.release() self._task.join() self._mutex.acquire() self._task = None self._mutex.release() def subscribe(self, prefix: str, callback: Callable): with self._mutex: if prefix not in self._subscribers: self._subscribers[prefix] = [] self._subscribers[prefix].append(callback) def write(self, data): with self._write_mutex: self._write_pipe.write(data) def cmd_open(self): self.write(b"O") def cmd_close(self): self.write(b"C") def cmd_report(self): self.write(b">r1\nR") def cmd_restart(self): self.write(b"S")