DoorControl/door_pi_control/door/control.py

219 lines
8.6 KiB
Python

from datetime import datetime
import os
from threading import Lock, RLock, Condition, Thread
from .communication import Communication
from .. import util
from .. import mqtt
from .constants import state_names, state
from . import constants
class Control(util.Loggable):
def __init__(self, config, mqtt_client=None):
self._config = config
self._mutex = RLock()
self._control_update = Condition(self._mutex)
self._comms = Communication(config.serial_port)
self._position_task: Thread = None
self._control_task: Thread = None
self.target = mqtt.Value(mqtt_client, "door/state/target",
persistent=True,
translate=state_names)
self.state = mqtt.Value(mqtt_client, "door/state/value",
persistent=True,
translate=state_names)
self.position = mqtt.Value(mqtt_client, "door/position/value",
persistent=True)
self._speed = 0
self._started = False
self._stop = False
self._idle = True
def _run_position(self):
cond = Condition(self._mutex)
warned = False
def update_position(_, line):
p, s = map(int, line.split())
with cond:
if p != self.position():
self._logger().debug(f"Position: {p}, {s}")
self.position(p)
self._speed = s
cond.notify()
self._comms.subscribe("pos", update_position)
last_update = datetime.now()
last_movement = datetime.now()
while True:
with cond:
if self._stop:
return
if not cond.wait(5):
self._comms.cmd_report()
dt = (datetime.now() - last_update).total_seconds()
if dt > 10 and not warned:
warned = True
self._logger().warn(f"No position for {dt} seconds")
else:
last_update = datetime.now()
warned = False
if self._speed != 0:
last_movement = datetime.now()
mov_dt = (datetime.now() - last_movement).total_seconds()
if mov_dt > constants.MIN_IDLE_TIME:
if self.position() < constants.CLOSED_THRESHOLD:
self.state(state.CLOSE)
self._control_update.notify()
elif self.position() > constants.ERROR_THRESHOLD:
self.state(state.ERROR)
self._control_update.notify()
elif self.position() > constants.OPEN_THRESHOLD:
self.state(state.OPEN)
self._control_update.notify()
elif not self._idle:
self._control_update.notify()
self._idle = True
else:
self._idle = False
def _run_control(self):
# Last known state
st = state.IDLE
# Not controlling the lock
controlling = False
cmd = {
state.RESTART: self._comms.cmd_restart,
state.OPEN: self._comms.cmd_open,
state.OPEN_THEN_CLOSE: self._comms.cmd_open,
state.CLOSE: self._comms.cmd_close,
state.CLOSE_THEN_OPEN: self._comms.cmd_close,
}
with self._control_update:
last_target = state.IDLE
last_state = None
action = None
# When starting, reset the MCU once
self.target(state.CLOSE)
# Starting time of the current action
start_time = datetime.now()
timeouts = 0
while not self._stop:
# Wait for an update
self._control_update.wait(1)
if not self._idle:
# If still moving, continue
self._logger().debug(f"Not idle")
continue
# Update was that the target has changed
if self.target() != last_target:
self._logger().debug(f"Target update: {state_names[self.target()]}")
action = cmd.get(self.target(), None)
last_action = cmd.get(last_target, None)
if action != None and last_action != action:
# We need to send a different command for this
self._logger().debug(f"Calling {action}")
action()
elif action == last_action:
self._logger().debug(f"Same command as {state_names[last_target]}")
# Update last known target and starting time
last_target = self.target()
start_time = datetime.now()
if self.state() != last_state:
# State from position handling differs from last known state
self._logger().debug(f"State update, target is {state_names[self.target()]}")
last_state = self.state()
self._logger().info("Reached state "
f"{state_names.get(st, st)}")
if action == None and self.state() != state.ERROR:
self._logger().info("Probably somebody using the key")
self.target(last_state)
last_target = last_state
elif last_state == last_target:
# Reached target
timeouts = 0
if last_target == state.CLOSE \
and self.position() > constants.CLOSED_WANT:
self._logger().info(
f"Position is {self.position()}, "
"closing some more")
self._comms.cmd_close()
controlling = False
elif self.state() == state.ERROR:
# Position too high, restart
self._comm.cmd_restart()
self.target(state.CLOSE)
else:
if timeouts < 3:
timeouts += 1
if self.target() == last_target:
# Initially, switch to the other one
# and execute that
last_target = {
state.CLOSE: state.OPEN_THEN_CLOSE,
state.OPEN: state.CLOSE_THEN_OPEN
}.get(last_target, state.RESTART)
cmd[last_target]()
else:
# Then go back
last_target = self.target()
cmd[last_target]()
else:
# Tried too often, restart
self.target(state.RESTART)
def start(self):
with self._mutex:
if not self._started:
self._started = True
self._stop = False
self._comms.start()
self._position_task = Thread(target=self._run_position,
daemon=True)
self._control_task = Thread(target=self._run_control,
daemon=True)
self._position_task.start()
self._control_task.start()
def stop(self):
with self._mutex:
if self._started:
self._started = False
self._stop = True
self._comms.stop()
self._mutex.release()
self._position_task.join()
self._control_task.join()
def open(self):
with self._mutex:
self.target(state.OPEN)
self._control_update.notify()
def close(self):
with self._mutex:
self.target(state.CLOSE)
self._control_update.notify()
def toggle(self):
with self._mutex:
self._logger().debug("Asked to toggle")
if self.target() == state.OPEN:
self.close()
else:
self.open()
def state(self):
with self._mutex:
return self._state