DoorControl/door_pi_control/door/communication.py
2022-11-11 17:12:18 +01:00

146 lines
4.2 KiB
Python

import os
import select
import time
from threading import Lock, RLock, Thread
from typing import IO, Callable, Dict, List, Optional, Union

import serial

from ..util import Loggable
class Communication(Loggable):
    """Threaded bridge between a serial port and subscriber callbacks.

    A background thread (controlled by ``start``/``stop``) waits with
    ``select`` on two file objects:

    * the serial port, from which it reads lines and dispatches them to the
      callbacks registered through ``subscribe``;
    * an internal pipe, into which ``write`` (and the ``cmd_*`` helpers) put
      outgoing bytes that the thread then forwards to the serial port.

    Incoming lines of the form ``prefix:data`` are dispatched under
    ``prefix``; lines without a colon are dispatched under the empty prefix.
    """

    def __init__(self, port: Union[str, os.PathLike, IO]):
        """Create the pipe and locks and open the port; no thread is started.

        Args:
            port: Either a filesystem path to the serial device (opened with
                ``serial.Serial``) or an already-open file-like object, which
                is used as-is.

        Any exception raised while opening the port propagates to the caller.
        """
        super().__init__("door")
        # Guards _stopped, _task and _subscribers.
        self._mutex: RLock = RLock()
        # Serialises writes to the pipe and to the serial port.
        self._write_mutex: Lock = Lock()
        # Maps an event prefix to the callbacks registered for it.
        self._subscribers: Dict[str, List[Callable]] = {}

        read_fd, write_fd = os.pipe()
        # The worker drains the pipe with a single read(); it must not block.
        os.set_blocking(read_fd, False)
        self._read_pipe = os.fdopen(read_fd, 'rb', buffering=0)
        self._write_pipe = os.fdopen(write_fd, 'wb', buffering=0)

        self._port_path = port
        self._serial_port = None
        self._open_serial_port()
        self._stopped = True
        self._task: Optional[Thread] = None

    def _forward(self) -> None:
        """Drain the internal pipe and write its contents to the serial port.

        When the serial write raises ``SerialException`` the port is reopened
        and the same data is written again.
        """
        data = self._read_pipe.read()
        if not data:
            # A non-blocking raw read yields None when nothing is pending and
            # b"" at EOF; neither may be handed to the serial port.
            return
        while True:
            try:
                self._write(data)
                return
            except serial.serialutil.SerialException:
                self._open_serial_port()

    def _open_serial_port(self) -> None:
        """(Re)open the serial port described by the constructor argument.

        For a path, waits up to 30 one-second intervals for the path to
        appear, then opens it with a 2 second read timeout. A previously
        opened port is closed first. For a file-like object, the object
        itself is used and nothing is opened or closed.
        """
        self._logger().debug("Opening port")
        port = self._port_path
        if not isinstance(port, (str, os.PathLike)):
            # Caller supplied an already-open object; we do not own it.
            self._serial_port = port
            return

        # Close the handle we opened earlier so reopening does not leak it.
        stale, self._serial_port = self._serial_port, None
        if stale is not None:
            try:
                stale.close()
            except Exception as exc:
                # Best effort: a broken port may refuse to close cleanly.
                self._logger().debug(f"Closing stale port failed: {exc!r}")

        path = os.fspath(port)
        for _ in range(30):
            if os.path.exists(path):
                break
            time.sleep(1)
        self._serial_port = serial.Serial(path, timeout=2)

    def _process(self) -> None:
        """Read one line from the serial port and dispatch it to subscribers.

        A failed or empty read triggers a reopen of the port. Otherwise the
        text before the first colon is the event prefix and the remainder is
        the payload; a line without a colon is dispatched under ``""``.
        """
        try:
            line = self._read()
        except serial.serialutil.SerialException:
            line = None
        if line is None:
            self._logger().info("Empty line, reopening serial port")
            self._open_serial_port()
            return

        # Split on the FIRST colon only, so payloads may contain colons.
        prefix, separator, payload = line.partition(":")
        if not separator:
            prefix, payload = "", line
        # Lines whose prefix starts with "pos" are not logged.
        if not prefix.startswith("pos"):
            self._logger().debug(f"Line: {line}")
        self._send_event(prefix, payload.strip())

    def _read(self) -> Optional[str]:
        """Read one line from the serial port.

        Returns:
            The stripped line as text, or ``None`` when the read returned no
            data at all. Bytes that are not valid ASCII are replaced rather
            than raising, so line noise cannot terminate the reader thread.
        """
        data = self._serial_port.readline()
        if not data:
            return None
        return data.decode("ascii", errors="replace").strip()

    def _run(self) -> None:
        """Worker loop: poll both file objects until ``stop`` is requested."""
        while True:
            with self._mutex:
                if self._stopped:
                    return
            # Short timeout so a stop request is noticed promptly.
            readable, _, _ = select.select(
                [self._serial_port, self._read_pipe],
                [], [], 0.1)
            for ready in readable:
                if ready is self._serial_port:
                    self._process()
                elif ready is self._read_pipe:
                    self._forward()

    def _send_event(self, prefix: str, data: str) -> None:
        """Invoke every callback subscribed to ``prefix`` with the payload.

        A failing callback is logged and skipped so that it can neither stop
        the remaining callbacks nor terminate the worker thread.
        """
        with self._mutex:
            # Iterate over a snapshot: a callback may call subscribe().
            for callback in tuple(self._subscribers.get(prefix, ())):
                try:
                    callback(prefix, data)
                except Exception as exc:
                    self._logger().info(
                        f"Subscriber for {prefix!r} failed: {exc!r}")

    def _write(self, data: bytes) -> None:
        """Write ``data`` to the serial port, if one is currently set."""
        with self._write_mutex:
            self._logger().debug(f"Sending {repr(data)}")
            if self._serial_port:
                self._serial_port.write(data)

    def start(self) -> None:
        """Start the worker thread unless one has already been created."""
        with self._mutex:
            self._logger().debug("Starting")
            if self._task is None:
                self._stopped = False
                self._task = Thread(target=self._run, daemon=True)
                self._task.start()

    def stop(self) -> None:
        """Ask the worker thread to finish and wait until it has exited."""
        with self._mutex:
            task = self._task
            if task is None:
                return
            self._stopped = True
        # Join without holding the mutex: the worker needs it to see the flag.
        task.join()
        with self._mutex:
            self._task = None

    def subscribe(self, prefix: str, callback: Callable) -> None:
        """Register ``callback`` for lines whose prefix equals ``prefix``.

        The callback is invoked as ``callback(prefix, data)``. Use the empty
        string to receive lines that contain no colon.
        """
        with self._mutex:
            self._subscribers.setdefault(prefix, []).append(callback)

    def write(self, data: bytes) -> None:
        """Queue raw bytes for transmission by the worker thread."""
        with self._write_mutex:
            self._write_pipe.write(data)

    def cmd_open(self) -> None:
        """Queue the ``O`` command."""
        self.write(b"O")

    def cmd_close(self) -> None:
        """Queue the ``C`` command."""
        self.write(b"C")

    def cmd_report(self) -> None:
        """Queue the ``>r1`` line followed by the ``R`` command."""
        self.write(b">r1\nR")

    def cmd_restart(self) -> None:
        """Queue the ``S`` command."""
        self.write(b"S")