145 lines
4.1 KiB
Python
145 lines
4.1 KiB
Python
import os
|
|
import select
|
|
import serial
|
|
import time
|
|
|
|
from threading import Lock, RLock, Thread
|
|
from typing import IO, Dict, Callable, Union
|
|
|
|
from ..util import Loggable
|
|
|
|
class Communication(Loggable):
|
|
def __init__(self, port: Union[os.PathLike, IO]):
|
|
self._mutex: RLock = RLock()
|
|
self._write_mutex: Lock = Lock()
|
|
|
|
self._subscribers: Dict[str, Callable] = {}
|
|
self._read_pipe, self._write_pipe = os.pipe()
|
|
os.set_blocking(self._read_pipe, False)
|
|
self._read_pipe = os.fdopen(self._read_pipe, 'rb', buffering=0, )
|
|
self._write_pipe = os.fdopen(self._write_pipe, 'wb', buffering=0)
|
|
|
|
self._port_path = port
|
|
self._serial_port = None
|
|
self._open_serial_port()
|
|
|
|
self._stopped = True
|
|
self._task = None
|
|
|
|
def _forward(self) -> None:
|
|
data = self._read_pipe.read()
|
|
while True:
|
|
try:
|
|
return self._write(data)
|
|
except serial.serialutil.SerialException:
|
|
self._open_serial_port()
|
|
|
|
def _open_serial_port(self) -> None:
|
|
self._logger().debug("Opening port")
|
|
port = self._port_path
|
|
if type(port) == str:
|
|
i = 0
|
|
while i < 30 and not os.path.exists(port):
|
|
time.sleep(1)
|
|
i += 1
|
|
self._serial_port = serial.Serial(port, timeout=2)
|
|
else:
|
|
self._serial_port = port
|
|
|
|
def _process(self):
|
|
try:
|
|
line = self._read()
|
|
except serial.serialutil.SerialException:
|
|
line = None
|
|
if line == None:
|
|
self._logger().info("Empty line, reopening serial port")
|
|
self._open_serial_port()
|
|
return
|
|
|
|
if ':' not in line:
|
|
self._logger().debug(f"Line: {line}")
|
|
self._send_event("", line.strip())
|
|
return
|
|
|
|
try:
|
|
prefix, data = line.split(":", maxsplit=2)
|
|
if not prefix.startswith("pos"):
|
|
self._logger().debug(f"Line: {line}")
|
|
self._send_event(prefix, data.strip())
|
|
except:
|
|
pass
|
|
|
|
def _read(self) -> str:
|
|
data = self._serial_port.readline()
|
|
if data == "":
|
|
return None
|
|
return data.decode("ascii").strip()
|
|
|
|
def _run(self):
|
|
while True:
|
|
with self._mutex:
|
|
if self._stopped:
|
|
return
|
|
|
|
readable, _, _ = select.select(
|
|
[self._serial_port, self._read_pipe],
|
|
[], [], 0.1)
|
|
for r in readable:
|
|
if r == self._serial_port:
|
|
self._process()
|
|
elif r == self._read_pipe:
|
|
self._forward()
|
|
|
|
def _send_event(self, prefix, data):
|
|
with self._mutex:
|
|
if prefix in self._subscribers:
|
|
for sub in self._subscribers[prefix]:
|
|
sub(prefix, data)
|
|
|
|
def _write(self, data: bytes) -> None:
|
|
with self._write_mutex:
|
|
self._logger().debug(f"Sending {repr(data)}")
|
|
if self._serial_port:
|
|
self._serial_port.write(data)
|
|
|
|
|
|
def start(self):
|
|
with self._mutex:
|
|
self._logger().debug("Starting")
|
|
if self._task is None:
|
|
self._stopped = False
|
|
self._task = Thread(target=self._run, daemon=True)
|
|
self._task.start()
|
|
|
|
def stop(self) -> None:
|
|
self._mutex.acquire()
|
|
if self._task is not None:
|
|
self._stopped = True
|
|
self._mutex.release()
|
|
self._task.join()
|
|
self._mutex.acquire()
|
|
self._task = None
|
|
self._mutex.release()
|
|
|
|
def subscribe(self, prefix: str, callback: Callable):
|
|
with self._mutex:
|
|
if prefix not in self._subscribers:
|
|
self._subscribers[prefix] = []
|
|
self._subscribers[prefix].append(callback)
|
|
|
|
def write(self, data):
|
|
with self._write_mutex:
|
|
self._write_pipe.write(data)
|
|
|
|
def cmd_open(self):
|
|
self.write(b"O")
|
|
|
|
def cmd_close(self):
|
|
self.write(b"C")
|
|
|
|
def cmd_report(self):
|
|
self.write(b">r1\nR")
|
|
|
|
def cmd_restart(self):
|
|
self.write(b"S")
|