DoorControl/door_pi_control/util.py

109 lines
3.2 KiB
Python
Raw Normal View History

2022-11-06 13:30:11 +00:00
from typing import Callable, Iterable, Optional, Tuple, TextIO, BinaryIO, IO, Union, List
from os import PathLike
from threading import RLock, Condition
import datetime
import logging
import sys
# Handlers installed by the most recent init_logging() call.
_log_handlers: List[logging.Handler] = []
# Record layout passed to logging.basicConfig() by init_logging().
_log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'


def init_logging(*,
                 level: int = logging.DEBUG,
                 output_file: Union[None, str, PathLike] = None,
                 output_host: Optional[Tuple[str, int]] = None,
                 output_stream: Optional[TextIO] = sys.stdout) -> None:
    """Set up the logging format and outputs.

    Every call replaces the root logger's handlers with the ones requested
    here (``logging.basicConfig`` is invoked with ``force=True``).

    level: Logging level applied to the root logger and each handler (DEBUG)
    output_file: File to write to (None)
    output_host: Tuple of host and port that will receive log datagrams (None)
    output_stream: Text stream to write to; None disables it (sys.stdout)
    """
    # DatagramHandler lives in the logging.handlers submodule, which a plain
    # "import logging" does not load, so import it explicitly here.
    from logging.handlers import DatagramHandler

    global _log_handlers
    _log_handlers = []

    if output_file is not None:
        # Create a file handler
        _log_handlers.append(logging.FileHandler(output_file))

    if output_host is not None:
        # output_host is unpacked into the handler's (host, port) arguments.
        _log_handlers.append(DatagramHandler(*output_host))

    if output_stream is not None:
        _log_handlers.append(logging.StreamHandler(output_stream))

    for handler in _log_handlers:
        handler.setLevel(level)

    # Default log output
    logging.basicConfig(force=True, level=level, format=_log_format,
                        handlers=_log_handlers)
def logger(*path: str) -> logging.Logger:
    """Return a logger nested under the ``door_pi_control`` logger.

    path: Child names applied in order via ``getChild``; with no names the
          ``door_pi_control`` logger itself is returned.
    """
    # Use a distinct local name so the function's own name is not shadowed.
    node = logging.getLogger("door_pi_control")
    for part in path:
        node = node.getChild(part)
    return node
def now() -> datetime.datetime:
    """Return the current date and time as a naive ``datetime``."""
    current = datetime.datetime.now()
    return current
def timestamp() -> str:
    """Return the value of ``now()`` formatted as ``YYYY-MM-DD HH:MM:SS``."""
    layout = "%Y-%m-%d %H:%M:%S"
    moment = now()
    return moment.strftime(layout)
class Loggable:
    """Base class offering a logger named after the instance's class."""

    def _logger(self) -> logging.Logger:
        """Return the logger obtained from ``logger()`` for this class name."""
        class_name = type(self).__name__
        return logger(class_name)
class Event:
    """A condition variable that can also fan notifications out to subscribers.

    Threads may wait on the event directly (``wait`` / ``wait_for``), or
    register their own ``Condition`` with ``subscribe`` to be notified, with
    an optional callback, whenever the event fires.
    """

    def __init__(self, lock=None):
        """Create the event.

        lock: Lock backing the internal condition; a new RLock is created
              when omitted.
        """
        self._lock = lock if lock is not None else RLock()
        self._condition = Condition(self._lock)
        # Maps each subscribed Condition to its optional callback.
        self._subscribers = {}

    def acquire(self, *args):
        """Acquire the underlying lock; arguments go to ``Condition.acquire``."""
        return self._condition.acquire(*args)

    def notify(self, *args, **kwargs):
        """Fire the event, waking one waiter per condition.

        Each subscriber's callback (if any) is called with ``*args`` and
        ``**kwargs`` while its condition is held, then that condition and the
        event's own condition are notified.
        """
        with self._lock:
            # Iterate a snapshot so a callback may subscribe/unsubscribe.
            for cv, cb in list(self._subscribers.items()):
                with cv:
                    if cb:
                        cb(*args, **kwargs)
                    cv.notify()
            self._condition.notify()

    def notify_all(self, *args, **kwargs):
        """Fire the event, waking every waiter on each condition.

        Callbacks receive ``*args`` and ``**kwargs``; calling with no
        arguments invokes them without arguments.
        """
        with self._lock:
            # Iterate a snapshot so a callback may subscribe/unsubscribe.
            for cv, cb in list(self._subscribers.items()):
                with cv:
                    if cb:
                        cb(*args, **kwargs)
                    cv.notify_all()
            self._condition.notify_all()

    def subscribe(self, cv: Condition, callback: Optional[Callable] = None):
        """Register ``cv`` to be notified; replaces any earlier callback for it.

        cv: Condition to notify when the event fires
        callback: Optional callable invoked before ``cv`` is notified (None)
        """
        with self._lock:
            self._subscribers[cv] = callback

    def unsubscribe(self, cv: Condition):
        """Remove ``cv`` from the subscribers; unknown conditions are ignored."""
        with self._lock:
            self._subscribers.pop(cv, None)

    def wait(self, timeout=None):
        """Wait on the internal condition; see ``Condition.wait``."""
        return self._condition.wait(timeout)

    def wait_for(self, predicate, timeout=None):
        """Wait until ``predicate`` is true; see ``Condition.wait_for``."""
        return self._condition.wait_for(predicate, timeout)

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        # Forward the exception triple so any lock implementation accepts it.
        self._lock.__exit__(*args)