DoorControl/door_pi_control/door/control.py

238 lines
9.5 KiB
Python

from datetime import datetime
import os
from threading import Lock, RLock, Condition, Thread
from .communication import Communication
from .. import util
from .. import mqtt
from .constants import state_names, state
from . import constants
import sys
class Control(util.Loggable):
def __init__(self, config, mqtt_client=None):
super().__init__("door")
self._config = config
self._mutex = RLock()
self._control_update = Condition(self._mutex)
self._comms = Communication(config.serial_port)
self._position_task: Thread = None
self._control_task: Thread = None
def handle_target(client, userdata, msg):
self._logger().debug("Incoming MQTT message on %s: %s", msg.topic, msg.payload)
if msg.topic == "door/state/target" and msg.payload == b'closed':
self.close()
self.target = mqtt.Value(mqtt_client, "door/state/target",
persistent=True,
translate=state_names,
remote_update_callback = handle_target)
self.state = mqtt.Value(mqtt_client, "door/state/value",
persistent=True,
translate=state_names)
self.position = mqtt.Value(mqtt_client, "door/position/value",
persistent=True)
self._speed = 0
self._started = False
self._stop = False
self._idle = True
def _run_position(self):
cond = Condition(self._mutex)
warned = False
def update_position(_, line):
p, s = map(int, line.split())
with cond:
if p != self.position():
self._logger().debug(f"Position: {p}, {s}")
self.position(p)
self._speed = s
cond.notify()
self._comms.subscribe("pos", update_position)
last_update = datetime.now()
last_movement = datetime.now()
while True:
with cond:
if self._stop:
return
if not cond.wait(5):
self._comms.cmd_report()
dt = (datetime.now() - last_update).total_seconds()
if dt > 10 and not warned:
warned = True
self._logger().warn(f"No position for {dt} seconds")
else:
last_update = datetime.now()
warned = False
if self._speed != 0:
last_movement = datetime.now()
mov_dt = (datetime.now() - last_movement).total_seconds()
if mov_dt > constants.MIN_IDLE_TIME:
if self.position() < constants.CLOSED_THRESHOLD:
self.state(state.CLOSE)
self._control_update.notify()
#elif self.position() > constants.ERROR_THRESHOLD:
# self.state(state.ERROR)
# self._control_update.notify()
elif self.position() > constants.OPEN_THRESHOLD:
self.state(state.OPEN)
self._control_update.notify()
elif not self._idle:
self._control_update.notify()
self._idle = True
else:
self._idle = False
def _run_control(self):
# Not controlling the lock
controlling = False
cmd = {
state.RESTART: self._comms.cmd_restart,
state.OPEN: self._comms.cmd_open,
state.OPEN_THEN_CLOSE: self._comms.cmd_open,
state.CLOSE: self._comms.cmd_close,
state.CLOSE_THEN_OPEN: self._comms.cmd_close,
}
with self._control_update:
last_target = state.IDLE
last_state = None
action = None
# When starting, reset the MCU once
self.target(state.CLOSE)
# Starting time of the current action
start_time = datetime.now()
timeouts = 0
while not self._stop:
# Wait for an update
self._control_update.wait(1)
if not self._idle:
# If still moving, continue
self._logger().debug(f"Not idle")
continue
# Update was that the target has changed
if self.target() != last_target:
self._logger().debug(f"Target update: {state_names[self.target()]}")
action = cmd.get(self.target(), None)
last_action = cmd.get(last_target, None)
if action != None and last_action != action:
# We need to send a different command for this
self._logger().debug(f"Calling {action}")
action()
elif action == last_action:
self._logger().debug(f"Same command as {state_names[last_target]}")
# Update last known target and starting time
last_target = self.target()
start_time = datetime.now()
if self.state() != last_state:
# State from position handling differs from last known state
self._logger().debug(f"State update, target is {state_names[self.target()]}")
last_state = self.state()
self._logger().info("Reached state "
f"{state_names.get(last_state, last_state)}")
if action == None and self.state() != state.ERROR:
self._logger().info("Probably somebody using the key")
self.target(last_state)
last_target = last_state
elif last_state == last_target:
# Reached target
timeouts = 0
self._logger().debug("Target reached")
if last_target == state.CLOSE \
and self.position() > constants.CLOSED_WANT:
self._logger().info(
f"Position is {self.position()}, "
"closing some more")
self._comms.cmd_close()
controlling = False
action = None
elif self.state() == state.ERROR:
# Position too high, restart
self._logger().debug("Error")
self._comm.cmd_restart()
sleep(5)
self._comm.cmd_close()
sleep(1)
sys.exit(1)
else:
self._logger().debug("Reached wrong state")
if timeouts < 3:
timeouts += 1
if self.target() == last_target:
# Initially, switch to the other one
# and execute that
self._logger().debug("Switching targets")
last_target = {
state.CLOSE: state.OPEN_THEN_CLOSE,
state.OPEN: state.CLOSE_THEN_OPEN
}.get(last_target, state.RESTART)
cmd[last_target]()
else:
# Then go back
self._logger().debug("Going back to actual target")
last_target = self.target()
cmd[last_target]()
else:
# Tried too often, restart
self._logger().debug("Something's wrong")
self.target(state.RESTART)
def start(self):
with self._mutex:
if not self._started:
self._started = True
self._stop = False
self._comms.start()
self._position_task = Thread(target=self._run_position,
daemon=True)
self._control_task = Thread(target=self._run_control,
daemon=True)
self._position_task.start()
self._control_task.start()
def stop(self):
with self._mutex:
if self._started:
self._started = False
self._stop = True
self._comms.stop()
self._mutex.release()
self._position_task.join()
self._control_task.join()
def open(self):
with self._mutex:
self._logger().info("Opening")
self.target(state.OPEN)
self._control_update.notify()
def close(self):
with self._mutex:
self._logger().info("Closing")
self.target(state.CLOSE)
self._control_update.notify()
def toggle(self):
with self._mutex:
self._logger().debug("Asked to toggle")
if self.target() == state.OPEN:
self.close()
else:
self.open()
def state(self):
with self._mutex:
return self._state