107 lines
3.8 KiB
Python
107 lines
3.8 KiB
Python
import datetime
|
|
import logging
|
|
import typing
|
|
import time
|
|
from typing import Any, Optional, Callable
|
|
|
|
from paho.mqtt.client import Client as Client
|
|
|
|
_logger = logging.getLogger("door_pi_control").getChild("util")
|
|
|
|
class Value:
|
|
def __init__(self,
|
|
client: Client,
|
|
topic: str,
|
|
start_value: Any = None,
|
|
*,
|
|
persistent: bool = False,
|
|
translate: typing.Union[
|
|
None,
|
|
typing.Dict[Any, str],
|
|
typing.Callable]
|
|
= None,
|
|
max_update: float = 0.1,
|
|
remote_update_callback = None):
|
|
self.client = client
|
|
self.topic = topic
|
|
self.persistent = persistent
|
|
self.value = start_value
|
|
self.last_update = datetime.datetime.now() - datetime.timedelta(
|
|
seconds=max_update)
|
|
self.last_update_value = None
|
|
self.max_update = max_update
|
|
|
|
if translate is None:
|
|
self.translate: Callable = str
|
|
elif callable(translate):
|
|
self.translate = translate
|
|
else:
|
|
self.translate = lambda x: translate.get(x, "None")
|
|
|
|
if start_value is not None:
|
|
self.update(start_value)
|
|
|
|
if remote_update_callback is not None:
|
|
_logger.debug("Subscribing to topic %s", self.topic)
|
|
self.client.my_message_callback_add(self.topic, remote_update_callback)
|
|
while self.client.subscribe(self.topic, 2)[1] is None:
|
|
time.sleep(0.5)
|
|
_logger.warning("Retrying...")
|
|
|
|
def update(self, value: Any, *,
|
|
force: bool = False,
|
|
no_update: bool = False) -> None:
|
|
if value != self.value or value != self.last_update_value:
|
|
self.value = value
|
|
if value is not None and \
|
|
(force
|
|
or (not no_update
|
|
and value != self.last_update_value
|
|
and (datetime.datetime.now()
|
|
- self.last_update
|
|
).total_seconds() >= self.max_update)):
|
|
self.last_update = datetime.datetime.now()
|
|
self.last_update_value = value
|
|
if self.client is not None:
|
|
self.client.publish(
|
|
self.topic,
|
|
self.translate(value),
|
|
qos=2,
|
|
retain=self.persistent)
|
|
|
|
def force_update(self, value) -> None:
|
|
self.update(value, force=True)
|
|
|
|
def __call__(self, value=None, **kwargs) -> typing.Optional[Any]:
|
|
if value is not None:
|
|
self.update(value, **kwargs)
|
|
return value
|
|
return self.value
|
|
|
|
def str(self):
|
|
return self.translate(self.value)
|
|
|
|
def reconnecting_client(host: str, *, keepalive: int = 60):
|
|
_logger.debug("Creating MQTT client with keepalive interval %d", keepalive)
|
|
client = Client()
|
|
client.mmcbdata = {}
|
|
def mmcb(topic, cb):
|
|
client.mmcbdata[topic] = cb
|
|
client.message_callback_add(topic, cb)
|
|
def on_connect(client, userdata, flags, rc):
|
|
_logger.debug("Connected to mqtt host")
|
|
for topic, cb in client.mmcbdata.items():
|
|
_logger.debug("Re-adding subscription for %s", topic);
|
|
client.message_callback_add(topic, cb)
|
|
client.my_message_callback_add = mmcb
|
|
client.on_connect = on_connect
|
|
client.on_disconnect = lambda client, userdata, rc: \
|
|
_logger.debug("Disconnected from mqtt host")
|
|
# client.enable_logger(_logger.getChild("paho"))
|
|
client.connect_async(host, keepalive=keepalive)
|
|
def on_message(client, userdata, msg):
|
|
_logger.debug("Unmatched message on %s: %s", msg.topic, msg.payload)
|
|
client.on_message = on_message
|
|
client.loop_start()
|
|
return client
|