DoorControl/door_pi_control/mqtt.py

107 lines
3.8 KiB
Python

import datetime
import logging
import typing
import time
from typing import Any, Optional, Callable
from paho.mqtt.client import Client as Client
# Module-level logger: the "util" child of the "door_pi_control" logger,
# i.e. records are emitted under the name "door_pi_control.util".
# NOTE(review): the child is named "util" although this module appears to be
# mqtt.py -- confirm whether that name is intentional.
_logger = logging.getLogger("door_pi_control").getChild("util")
class Value:
    """A locally cached value that is mirrored to an MQTT topic.

    Every accepted change is published on ``topic`` with QoS 2, using
    ``persistent`` as the retain flag.  Non-forced publishes are rate limited:
    a change arriving less than ``max_update`` seconds after the previous
    publish is stored in ``value`` but not sent, and nothing in this class
    re-sends it later on its own.

    Attributes:
        client: MQTT client used for publishing; ``update`` skips the publish
            step when it is ``None``.
        topic: Topic the value is published on (and optionally subscribed to).
        persistent: Passed as ``retain`` to ``publish``.
        value: Most recent value handed to ``update``.
        last_update: Wall-clock ``datetime`` of the most recent publish
            (informational only; rate limiting uses a monotonic clock).
        last_update_value: Value that was most recently published.
        max_update: Minimum number of seconds between non-forced publishes.
        translate: Callable turning a value into the published payload.
    """

    def __init__(self,
                 client: "Client",
                 topic: str,
                 start_value: Any = None,
                 *,
                 persistent: bool = False,
                 translate: typing.Union[
                     None,
                     typing.Dict[Any, str],
                     typing.Callable]
                 = None,
                 max_update: float = 0.1,
                 remote_update_callback=None):
        """Set up the value and optionally subscribe to remote changes.

        Args:
            client: MQTT client used to publish (and subscribe).
            topic: MQTT topic for this value.
            start_value: Initial value; published immediately unless ``None``.
            persistent: Publish with the retain flag set.
            translate: ``None`` to publish ``str(value)``, a callable mapping
                a value to its payload, or a dict lookup table (values that
                are not in the table are published as the string "None").
            max_update: Minimum seconds between two non-forced publishes.
            remote_update_callback: If given, it is registered for ``topic``
                through ``client.my_message_callback_add`` and the topic is
                subscribed with QoS 2.  This blocks, retrying every 0.5 s,
                until ``subscribe`` returns a message id.
        """
        self.client = client
        self.topic = topic
        self.persistent = persistent
        self.value = start_value
        self.max_update = max_update
        # Wall-clock stamp kept as a public attribute for readers of it.
        self.last_update = datetime.datetime.now() - datetime.timedelta(
            seconds=max_update)
        # Rate limiting is timed with time.monotonic() so that steps of the
        # system clock (e.g. an NTP correction) can neither suppress nor
        # prematurely release updates.  -inf means "never published", so the
        # very first update is always allowed through.
        self._last_publish = float("-inf")
        self.last_update_value = None

        if translate is None:
            self.translate: Callable = str
        elif callable(translate):
            self.translate = translate
        else:
            # Lookup table; unknown values map to the literal string "None".
            self.translate = lambda x: translate.get(x, "None")

        if start_value is not None:
            self.update(start_value)

        if remote_update_callback is not None:
            _logger.debug("Subscribing to topic %s", self.topic)
            self.client.my_message_callback_add(self.topic,
                                                remote_update_callback)
            # subscribe() returns (result, mid); a mid of None means the
            # request was not sent, so keep retrying until it is.
            while self.client.subscribe(self.topic, 2)[1] is None:
                time.sleep(0.5)
                _logger.warning("Retrying...")

    def update(self, value: Any, *,
               force: bool = False,
               no_update: bool = False) -> None:
        """Store ``value`` and publish it if it is due.

        Nothing happens when ``value`` equals both the stored value and the
        last published value.  Otherwise the value is stored, and it is
        published when it is not ``None`` and either ``force`` is set, or all
        of the following hold: ``no_update`` is false, the value differs from
        the last published one, and at least ``max_update`` seconds have
        passed since the last publish.

        Args:
            value: New value.
            force: Publish regardless of ``no_update`` and the rate limit.
            no_update: Only store the value; do not publish (unless forced).
        """
        if not (value != self.value or value != self.last_update_value):
            return
        self.value = value
        if value is None:
            return

        if not force:
            differs = value != self.last_update_value
            elapsed = time.monotonic() - self._last_publish
            if no_update or not differs or elapsed < self.max_update:
                return

        self.last_update = datetime.datetime.now()
        self._last_publish = time.monotonic()
        self.last_update_value = value
        if self.client is not None:
            self.client.publish(
                self.topic,
                self.translate(value),
                qos=2,
                retain=self.persistent)

    def force_update(self, value) -> None:
        """Shorthand for ``update(value, force=True)``."""
        self.update(value, force=True)

    def __call__(self, value=None, **kwargs) -> typing.Optional[Any]:
        """Read or write the value.

        Called with a non-``None`` ``value`` it forwards to ``update`` (with
        any keyword arguments) and returns that value; called without one it
        returns the stored value.
        """
        if value is not None:
            self.update(value, **kwargs)
            return value
        return self.value

    def str(self):
        """Return the stored value run through ``translate``.

        This is a regular method named ``str``, not ``__str__``.
        """
        return self.translate(self.value)
def reconnecting_client(host: str, *, keepalive: int = 60):
    """Create an MQTT client that restores its subscriptions on reconnect.

    The client connects asynchronously and its network loop is started in a
    background thread before it is returned.  Two extra attributes are
    attached to the returned client:

    * ``mmcbdata``: dict mapping topic -> message callback.
    * ``my_message_callback_add(topic, cb)``: records the callback in
      ``mmcbdata`` and registers it with ``message_callback_add``.

    On every successful (re)connect, each recorded topic has its callback
    registered again and is subscribed again with QoS 2, so that topics keep
    delivering messages after the connection was lost.

    Args:
        host: Hostname or address of the MQTT broker.
        keepalive: Keepalive interval passed to ``connect_async``.

    Returns:
        The started paho ``Client``.
    """
    # QoS used when re-subscribing; matches the QoS Value uses to subscribe.
    resubscribe_qos = 2

    _logger.debug("Creating MQTT client with keepalive interval %d", keepalive)
    client = Client()
    client.mmcbdata = {}

    def mmcb(topic, cb):
        # Remember the callback so on_connect can restore it later.
        client.mmcbdata[topic] = cb
        client.message_callback_add(topic, cb)

    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            # A non-zero result code means the broker refused the connection;
            # there is nothing to subscribe on.
            _logger.warning("Connection to mqtt host refused (rc=%s)", rc)
            return
        _logger.debug("Connected to mqtt host")
        # This runs on the network-loop thread while other threads may still
        # be registering topics, so iterate over a snapshot.
        for topic, cb in list(client.mmcbdata.items()):
            _logger.debug("Re-adding subscription for %s", topic)
            client.message_callback_add(topic, cb)
            # Registering the callback alone does not restore delivery: the
            # broker-side subscription has to be requested again as well.
            client.subscribe(topic, resubscribe_qos)

    def on_message(client, userdata, msg):
        # Fallback for messages no topic-specific callback matched.
        _logger.debug("Unmatched message on %s: %s", msg.topic, msg.payload)

    client.my_message_callback_add = mmcb
    client.on_connect = on_connect
    client.on_disconnect = lambda client, userdata, rc: \
        _logger.debug("Disconnected from mqtt host")
    # client.enable_logger(_logger.getChild("paho"))
    client.connect_async(host, keepalive=keepalive)
    client.on_message = on_message
    client.loop_start()
    return client