import datetime
import logging
import time
import typing
from typing import Any, Callable, Optional

from paho.mqtt.client import Client as Client

_logger = logging.getLogger("door_pi_control").getChild("util")


class Value:
    """A locally cached value that is mirrored to an MQTT topic.

    Each accepted change is published to ``topic`` with QoS 2. Publishing is
    rate limited to at most one message per ``max_update`` seconds unless the
    update is forced. Optionally a callback is registered for messages that
    arrive on the same topic.
    """

    def __init__(self, client: Client, topic: str, start_value: Any = None, *,
                 persistent: bool = False,
                 translate: typing.Union[
                     None, typing.Dict[Any, str], typing.Callable] = None,
                 max_update: float = 0.1,
                 remote_update_callback=None):
        """Create the value and, if given, publish its start value.

        Args:
            client: MQTT client used for publishing. ``update`` tolerates
                ``None`` here (nothing is published then). When
                ``remote_update_callback`` is given, the client must provide
                ``my_message_callback_add`` (as attached by
                ``reconnecting_client``) and ``subscribe``.
            topic: Topic to publish to (and to subscribe to for remote
                updates).
            start_value: Initial value; published immediately unless ``None``.
            persistent: Passed as the ``retain`` flag of every publish.
            translate: Converts a value into the published payload. ``None``
                uses ``str``; a callable is used as is; a mapping is looked up
                with ``"None"`` as the payload for unknown values.
            max_update: Minimum number of seconds between two unforced
                publishes.
            remote_update_callback: If not ``None``, registered as message
                callback for ``topic``. The constructor then blocks until the
                subscribe request has been accepted by the client.
        """
        self.client = client
        self.topic = topic
        self.persistent = persistent
        self.value = start_value
        # Backdate the last publish so the very first update is not throttled.
        self.last_update = datetime.datetime.now() - datetime.timedelta(
            seconds=max_update)
        self.last_update_value = None
        self.max_update = max_update

        if translate is None:
            self.translate: Callable = str
        elif callable(translate):
            self.translate = translate
        else:
            self.translate = lambda x: translate.get(x, "None")

        if start_value is not None:
            self.update(start_value)

        if remote_update_callback is not None:
            _logger.debug("Subscribing to topic %s", self.topic)
            self.client.my_message_callback_add(self.topic,
                                                remote_update_callback)
            # subscribe() returns (result, mid); keep retrying for as long as
            # no message id is handed back (presumably: client not connected
            # yet -- see the paho documentation).
            while self.client.subscribe(self.topic, 2)[1] is None:
                time.sleep(0.5)
                _logger.warning("Retrying...")

    def update(self, value: Any, *, force: bool = False,
               no_update: bool = False) -> None:
        """Store ``value`` and publish it when appropriate.

        Nothing happens when ``value`` equals both the stored value and the
        last published value. Otherwise the value is stored, and it is
        published if it is not ``None`` and either ``force`` is set, or all of
        the following hold: ``no_update`` is not set, the value differs from
        the last published one, and at least ``max_update`` seconds have
        passed since the last publish.

        Args:
            value: The new value.
            force: Publish regardless of rate limiting and ``no_update``.
            no_update: Only store the value, do not publish (unless forced).
        """
        if value != self.value or value != self.last_update_value:
            self.value = value
            if value is None:
                return

            # Read the clock once so the throttle check and the recorded
            # publish time are based on the same instant.
            now = datetime.datetime.now()
            publish_due = force or (
                not no_update
                and value != self.last_update_value
                and (now - self.last_update).total_seconds()
                >= self.max_update)
            if publish_due:
                self.last_update = now
                self.last_update_value = value
                if self.client is not None:
                    self.client.publish(
                        self.topic, self.translate(value), qos=2,
                        retain=self.persistent)

    def force_update(self, value) -> None:
        """Call ``update`` with ``force=True``."""
        self.update(value, force=True)

    def __call__(self, value=None, **kwargs) -> typing.Optional[Any]:
        """Get or set the value.

        With a non-``None`` ``value`` this calls ``update(value, **kwargs)``
        and returns ``value``; otherwise it returns the stored value.
        """
        if value is not None:
            self.update(value, **kwargs)
            return value
        return self.value

    def str(self):
        """Return the stored value converted by ``translate``."""
        return self.translate(self.value)

    def __repr__(self) -> str:
        """Return a debugging representation with topic and stored value."""
        return (f"{type(self).__name__}"
                f"(topic={self.topic!r}, value={self.value!r})")


def reconnecting_client(host: str, *, keepalive: int = 60):
    """Create an MQTT client that restores its subscriptions on reconnect.

    The returned client gets two extra attributes:

    * ``mmcbdata``: dict mapping topic to the callback registered for it.
    * ``my_message_callback_add(topic, cb)``: records the callback in
      ``mmcbdata`` and registers it via ``message_callback_add``.

    On every successful connect all recorded topics are subscribed again and
    their callbacks re-registered. The connection is started with
    ``connect_async`` followed by ``loop_start`` before the client is
    returned.

    Args:
        host: Host name passed to ``connect_async``.
        keepalive: Keepalive interval passed to ``connect_async``.

    Returns:
        The configured client with its network loop started.
    """
    _logger.debug("Creating MQTT client with keepalive interval %d",
                  keepalive)
    client = Client()
    client.mmcbdata = {}

    def mmcb(topic, cb):
        # Remember the callback so on_connect can restore it later.
        client.mmcbdata[topic] = cb
        client.message_callback_add(topic, cb)

    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            # Connection refused: there is nothing to subscribe on.
            _logger.warning("Connection to mqtt host refused (rc=%s)", rc)
            return
        _logger.debug("Connected to mqtt host")
        # Iterate over a snapshot: this callback runs on the network loop
        # while other code may still be registering topics via mmcb().
        for topic, cb in list(client.mmcbdata.items()):
            _logger.debug("Re-adding subscription for %s", topic)
            client.message_callback_add(topic, cb)
            # Registering the callback alone does not subscribe; without this
            # the topic stays silent after a reconnect that did not resume a
            # session. QoS 2 matches what Value subscribes with.
            client.subscribe(topic, 2)

    def on_disconnect(client, userdata, rc):
        _logger.debug("Disconnected from mqtt host")

    def on_message(client, userdata, msg):
        # Only reached for messages no topic-specific callback matched.
        _logger.debug("Unmatched message on %s: %s", msg.topic, msg.payload)

    client.my_message_callback_add = mmcb
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    # client.enable_logger(_logger.getChild("paho"))
    client.connect_async(host, keepalive=keepalive)
    client.on_message = on_message

    client.loop_start()
    return client