from typing import Callable, Iterable, Optional, Tuple, TextIO, BinaryIO, IO, Union, List from os import PathLike from threading import RLock, Condition import datetime import logging import logging.handlers import sys _log_handlers: List[logging.Handler] = [] _log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' def init_logging(*, level: int = logging.DEBUG, output_file: Union[None, str, PathLike] = None, output_host: Optional[Tuple[str, int]] = None, output_stream: Optional[TextIO] = sys.stdout) -> None: """Set up the logging format and outputs. level: Logging level (DEBUG) output_file: File to write to (None) output_host: Tuple of host and port that will receive log datagrams """ global _log_handlers _log_handlers = [] if output_file != None: # Create a file handler file_handler = logging.FileHandler(output_file) file_handler.setLevel(level) _log_handlers.append(file_handler) if output_host != None: dgram_handler = logging.handlers.DatagramHandler(*output_host) dgram_handler.setLevel(logging.DEBUG) _log_handlers.append(dgram_handler) if output_stream != None: console_handler = logging.StreamHandler(output_stream) console_handler.setLevel(logging.DEBUG) _log_handlers.append(console_handler) # Default log output logging.basicConfig(force=True, level=logging.DEBUG, format=_log_format, handlers=_log_handlers) def logger(*path: Iterable[str]) -> logging.Logger: logger = logging.getLogger("door_pi_control") for p in path: logger = logger.getChild(p) return logger def now() -> datetime.datetime: return datetime.datetime.now() def timestamp() -> str: return now().strftime("%Y-%m-%d %H:%M:%S") class Loggable: def __init__(self, *path): self._logger_path = path def _logger(self) -> logging.Logger: return logger(*self._logger_path, type(self).__name__) class Event: def __init__(self, lock = None): self._lock = lock if lock != None else RLock() self._condition = Condition(self._lock) self._subscribers = {} def acquire(self, *args): return self._condition.acquire(*args) def notify(self, *args, **kwargs): with self._lock: for cv, cb in self._subscribers: with cv: if cb: cb(*args, **kwargs) cv.notify() self._condition.notify() def notify_all(self): with self._lock: for cv, cb in self._subscribers: with cv: if cb: cb() cv.notify_all() self._condition.notify_all() def subscribe(self, cv: Condition, callback: Callable = None): with self._lock: self._subscribers[cv] = callback def unsubscribe(self, cv: Condition): with self._lock: if cv in self._subscribers: del self._subscribers[cv] def wait(self, timeout=None): return self._condition.wait(timeout) def wait_for(self, predicate, timeout=None): return self._condition.wait_for(predicate, timeout) def __enter__(self): return self._lock.__enter__() def __exit__(self, *args): self._lock.__exit__()