DoorControl/door_pi_control/util.py
2022-11-11 17:12:18 +01:00

112 lines
3.4 KiB
Python

from typing import Callable, Iterable, Optional, Tuple, TextIO, BinaryIO, IO, Union, List
from os import PathLike
from threading import RLock, Condition
import datetime
import logging
import logging.handlers
import sys
_log_handlers: List[logging.Handler] = []
_log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
def init_logging(*,
level: int = logging.DEBUG,
output_file: Union[None, str, PathLike] = None,
output_host: Optional[Tuple[str, int]] = None,
output_stream: Optional[TextIO] = sys.stdout) -> None:
"""Set up the logging format and outputs.
level: Logging level (DEBUG)
output_file: File to write to (None)
output_host: Tuple of host and port that will receive log datagrams
"""
global _log_handlers
_log_handlers = []
if output_file != None:
# Create a file handler
file_handler = logging.FileHandler(output_file)
file_handler.setLevel(level)
_log_handlers.append(file_handler)
if output_host != None:
dgram_handler = logging.handlers.DatagramHandler(*output_host)
dgram_handler.setLevel(logging.DEBUG)
_log_handlers.append(dgram_handler)
if output_stream != None:
console_handler = logging.StreamHandler(output_stream)
console_handler.setLevel(logging.DEBUG)
_log_handlers.append(console_handler)
# Default log output
logging.basicConfig(force=True, level=logging.DEBUG, format=_log_format, handlers=_log_handlers)
def logger(*path: Iterable[str]) -> logging.Logger:
logger = logging.getLogger("door_pi_control")
for p in path:
logger = logger.getChild(p)
return logger
def now() -> datetime.datetime:
return datetime.datetime.now()
def timestamp() -> str:
return now().strftime("%Y-%m-%d %H:%M:%S")
class Loggable:
def __init__(self, *path):
self._logger_path = path
def _logger(self) -> logging.Logger:
return logger(*self._logger_path, type(self).__name__)
class Event:
def __init__(self, lock = None):
self._lock = lock if lock != None else RLock()
self._condition = Condition(self._lock)
self._subscribers = {}
def acquire(self, *args):
return self._condition.acquire(*args)
def notify(self, *args, **kwargs):
with self._lock:
for cv, cb in self._subscribers:
with cv:
if cb:
cb(*args, **kwargs)
cv.notify()
self._condition.notify()
def notify_all(self):
with self._lock:
for cv, cb in self._subscribers:
with cv:
if cb:
cb()
cv.notify_all()
self._condition.notify_all()
def subscribe(self, cv: Condition, callback: Callable = None):
with self._lock:
self._subscribers[cv] = callback
def unsubscribe(self, cv: Condition):
with self._lock:
if cv in self._subscribers:
del self._subscribers[cv]
def wait(self, timeout=None):
return self._condition.wait(timeout)
def wait_for(self, predicate, timeout=None):
return self._condition.wait_for(predicate, timeout)
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
self._lock.__exit__()