479 lines
18 KiB
Python
479 lines
18 KiB
Python
#!/usr/bin/python
|
|
import argparse
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import select
|
|
import socket
|
|
|
|
import paho.mqtt.client as mqcl
|
|
import serial
|
|
|
|
UPDATE_RATE = 2
|
|
MAX_UPDATE_RATE = 20
|
|
ERROR_THRESHOLD = 250
|
|
OPEN_THRESHOLD = 190
|
|
CLOSED_THRESHOLD = 180
|
|
CLOSED_WANT = 50
|
|
MIN_IDLE_TIME = 1
|
|
COMMAND_IDLE_TIME = 1.5
|
|
|
|
def timestamp():
|
|
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
class MqttValue:
|
|
def __init__(self, client, topic, persistent = False, start_value = None, *, translate = None, max_update = 0.1):
|
|
self.client = client
|
|
self.topic = topic
|
|
self.persistent = persistent
|
|
self.value = start_value
|
|
self.last_update = datetime.datetime.now() - datetime.timedelta(seconds = max_update)
|
|
self.last_update_value = None
|
|
self.max_update = max_update
|
|
|
|
if translate == None:
|
|
self.translate = str
|
|
elif callable(translate):
|
|
self.translate = translate
|
|
else:
|
|
self.translate = lambda x: translate[x]
|
|
|
|
if start_value != None:
|
|
self.update(start_value)
|
|
|
|
def update(self, value, force = False, no_update = False):
|
|
if value != self.value or value != self.last_update_value:
|
|
self.value = value
|
|
if force or (not no_update and value != self.last_update_value and (datetime.datetime.now() - self.last_update).total_seconds() >= self.max_update):
|
|
self.last_update = datetime.datetime.now()
|
|
self.last_update_value = value
|
|
self.client.publish(self.topic, self.translate(value), qos = 2, retain = self.persistent)
|
|
|
|
def force_update(self, value):
|
|
self.update(value, force = True)
|
|
|
|
def __call__(self, value = None, **kwargs):
|
|
if value != None:
|
|
self.update(value, **kwargs)
|
|
else:
|
|
return self.value
|
|
|
|
class DoorControl:
|
|
# Actions
|
|
IDLE, CLOSE, OPEN_THEN_CLOSE, OPEN, CLOSE_THEN_OPEN, ERROR = range(6)
|
|
state_names = {
|
|
OPEN: "open",
|
|
CLOSE: "close",
|
|
ERROR: "error",
|
|
}
|
|
action_names = {
|
|
IDLE: "idling",
|
|
OPEN: "waiting for open door",
|
|
CLOSE: "waiting for closed door",
|
|
ERROR: "error",
|
|
}
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
self.mqttc = mqcl.Client()
|
|
self.logger = self._setup_logging(config)
|
|
self._open_serial_port(config)
|
|
self.valid_tokens = self._read_valid_tokens(config)
|
|
self.nfc_fifo = self._open_nfc_fifo(config)
|
|
self.control_socket, self.comm_channels = self._open_control_socket(config)
|
|
|
|
self.mqttc.on_connect = lambda client, userdata, flags, rc: self.logger.info(f"Connected to mqtt host with result {rc}")
|
|
self.mqttc.connect_async(config.mqtt_host, keepalive=60)
|
|
self.mqttc.loop_start()
|
|
|
|
# Current door state
|
|
self.state = MqttValue(self.mqttc, "door/state/value", True, translate = self.state_names)
|
|
self.target_state = MqttValue(self.mqttc, "door/state/target", True, DoorControl.CLOSE, translate = self.state_names, max_update = 0)
|
|
self.state_pos = MqttValue(self.mqttc, "door/position/value", True, 0)
|
|
self.last_invalid_token = MqttValue(self.mqttc, "door/token/last_invalid", True)
|
|
self.speed = MqttValue(self.mqttc, "door/position/speed")
|
|
# Current target action
|
|
self.action = MqttValue(self.mqttc, "door/state/action", True, DoorControl.CLOSE, translate = self.action_names, max_update = 0)
|
|
# Start time of the current action
|
|
self.start_time = None
|
|
# How often the action was repeated
|
|
self.repeats = 0
|
|
|
|
self.last_handled_state = self.OPEN
|
|
self.last_door_pos_time = datetime.datetime.now() - datetime.timedelta(minutes = 10)
|
|
self.last_position = 0
|
|
self.last_handle_state_timestamp = datetime.datetime.now()
|
|
self.idle_start_time = None
|
|
self.last_command_time = datetime.datetime.now() - datetime.timedelta(COMMAND_IDLE_TIME)
|
|
|
|
def idle_time(self):
|
|
if self.idle_start_time != None:
|
|
return (datetime.datetime.now() - self.idle_start_time).total_seconds()
|
|
else:
|
|
return 0
|
|
|
|
def _send_door_cmd(self, cmd: bytes):
|
|
"""Send a command to the door."""
|
|
if cmd != b'R': self.logger.debug(f"Sending {cmd}")
|
|
return self.serial_port.write(cmd)
|
|
|
|
def _read_door_line(self):
|
|
"""Read a single line from the serial port"""
|
|
return self.serial_port.readline().decode('ascii')
|
|
|
|
def _setup_logging(self, config):
|
|
"""Set up logging"""
|
|
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
logging.basicConfig(force = True,
|
|
level = logging.DEBUG,
|
|
format = log_format)
|
|
|
|
# create logger
|
|
logger = logging.getLogger('nfc_log')
|
|
|
|
# create console handler and set level to debug
|
|
fh = logging.FileHandler(config.log_file)
|
|
fh.setLevel(logging.DEBUG)
|
|
|
|
# create formatter
|
|
formatter = logging.Formatter(log_format)
|
|
fh.setFormatter(formatter)
|
|
logger.addHandler(fh)
|
|
|
|
return logger
|
|
|
|
def _open_control_socket(self, config):
|
|
"""(Re-)creates and opens the control socket. Config must have a control_socket member."""
|
|
self.logger.debug("Opening control socket")
|
|
if os.path.exists(config.control_socket):
|
|
os.unlink(config.control_socket)
|
|
|
|
control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
control_socket.bind(config.control_socket)
|
|
control_socket.listen(5)
|
|
comm_channels = []
|
|
|
|
return control_socket, comm_channels
|
|
|
|
def _open_serial_port(self, config):
|
|
"""Opens the serial port controlling the door opener. config must have a serial_port member."""
|
|
try:
|
|
self.serial_port = serial.Serial(config.serial_port, timeout=2)
|
|
self._send_door_cmd(b'>r1\n')
|
|
except:
|
|
self.serial_port = None
|
|
return self.serial_port
|
|
|
|
def _open_nfc_fifo(self, config):
|
|
"""Opens config.nfc_fifo as the FIFO through which detected tokens are passed in."""
|
|
nfc_fifo = open(config.nfc_fifo, "r")
|
|
return nfc_fifo
|
|
|
|
def _read_valid_tokens(self, config):
|
|
"""Refreshes all tokens from config.valid_tokens"""
|
|
valid = {}
|
|
try:
|
|
self.logger.info("Loading tokens")
|
|
lines =[ s.strip() for s in open(config.valid_tokens, "r").readlines() ]
|
|
for i, line in enumerate(lines):
|
|
l = line.split('|')
|
|
if len(l) == 5:
|
|
if not l[0].strip().startswith('#'):
|
|
token, name, organization, email, valid_thru = l
|
|
try:
|
|
if len(valid_thru.strip()) > 0:
|
|
valid_thru = datetime.date.fromisoformat(valid_thru)
|
|
else:
|
|
valid_thru = None
|
|
except Exception:
|
|
logging.error(f"Could not parse valid thru date for token {token} in line {i}")
|
|
valid_thru = None
|
|
logging.debug(f"Got token {token} associated with {name} <{email}> of {organization}, valid thru {valid_thru}")
|
|
if token in valid:
|
|
logging.warning(f"Overwriting token {token}")
|
|
valid[token] = {'name': name, 'organization': organization, 'email': email,
|
|
'valid_thru': valid_thru}
|
|
else:
|
|
logging.warning(f"Skipping line {i} ({line}) since it does not contain exactly 5 data field")
|
|
except Exception as e:
|
|
valid = {}
|
|
logging.error(f"Error reading token file. Exception: {e}")
|
|
|
|
return valid
|
|
|
|
def _set_position(self, data: str):
|
|
"""Set a new door position"""
|
|
pos, speed = [ int(x) for x in data.strip().split() ]
|
|
self.state_pos(pos)
|
|
self.speed(speed)
|
|
|
|
if self.speed() != 0 and self.idle_start_time != None:
|
|
self.idle_start_time = None
|
|
elif self.speed() == 0 and self.idle_start_time == None:
|
|
self.idle_start_time = datetime.datetime.now()
|
|
|
|
if pos > ERROR_THRESHOLD and self.state() != DoorControl.ERROR:
|
|
self.logger.error("Invalid position:", state)
|
|
self.state(DoorControl.ERROR)
|
|
elif pos > OPEN_THRESHOLD and self.state() != DoorControl.OPEN:
|
|
self.state(DoorControl.OPEN)
|
|
elif pos < CLOSED_THRESHOLD and self.state() != DoorControl.CLOSE:
|
|
self.state(DoorControl.CLOSE)
|
|
|
|
def _check_reporting(self, current: int):
|
|
if current == 0:
|
|
self.logger.info("Turning position reporting on")
|
|
self._send_door_cmd(b'>r1\n')
|
|
else:
|
|
self.logger.info("Position reporting is on")
|
|
|
|
def handle_door_line(self):
|
|
"""Reads a single line from the serial port and handles it"""
|
|
data = self._read_door_line().strip()
|
|
|
|
handling = {
|
|
"pos": (str, self._set_position),
|
|
"pos reporting": (int, self._check_reporting),
|
|
}
|
|
|
|
try:
|
|
prefix, data = data.split(':')
|
|
if prefix in handling:
|
|
data_type, fun = handling[prefix]
|
|
fun(data_type(data))
|
|
except:
|
|
pass
|
|
|
|
def poll_door_state(self):
|
|
"""Checks the door state if the last polling was a while ago, and returns the current state"""
|
|
t = (datetime.datetime.now() - self.last_door_pos_time).total_seconds()
|
|
if t >= 0.5:
|
|
self.last_door_pos_time = datetime.datetime.now()
|
|
self._send_door_cmd(b"R")
|
|
self.handle_door_line()
|
|
return self.state()
|
|
|
|
def handle_door_state(self):
|
|
"""Checks the door state and executes any actions necessary to reach the target state"""
|
|
# Commands associated with each target state
|
|
target_state_cmd = {
|
|
DoorControl.OPEN: b'O',
|
|
DoorControl.CLOSE: b'C'
|
|
}
|
|
|
|
now = datetime.datetime.now()
|
|
|
|
old_state = self.last_handled_state
|
|
delta_t = (now - self.last_handle_state_timestamp).total_seconds()
|
|
|
|
# If no serial port, try to open it or return
|
|
if not self.serial_port:
|
|
try:
|
|
self._open_serial_port(self.config)
|
|
except:
|
|
return
|
|
|
|
self.poll_door_state()
|
|
|
|
if (self.idle_time() < MIN_IDLE_TIME) and delta_t < 5:
|
|
return
|
|
|
|
# Get new state
|
|
self.last_handled_state = self.state()
|
|
self.last_handle_state_timestamp = now
|
|
|
|
if self.state() == DoorControl.ERROR:
|
|
self.logger.error("Restarting the MCU and exiting.")
|
|
self.send_door_cmd(b'S')
|
|
return
|
|
|
|
# Idle + change = key?
|
|
if self.action() == DoorControl.IDLE:
|
|
if self.state() != old_state:
|
|
self.logger.info(f"Door changed unexpectedly: {DoorControl.state_names[self.state()]}")
|
|
self.target_state(self.state())
|
|
self.start_time = now
|
|
if self.state() == DoorControl.CLOSE \
|
|
and self.start_time \
|
|
and (now - self.start_time).total_seconds() >= self.config.state_timeout:
|
|
self.start_time = None
|
|
if self.state_pos() <= CLOSED_THRESHOLD and self.state_pos() > CLOSED_WANT:
|
|
self.logger.info("Closing door a bit more")
|
|
self._send_door_cmd(target_state_cmd[DoorControl.CLOSE])
|
|
return
|
|
|
|
if self.start_time is None:
|
|
self.start_time = now
|
|
self._send_door_cmd(target_state_cmd[self.action()])
|
|
|
|
# Target state reached
|
|
if self.state() == self.target_state():
|
|
self.action(DoorControl.IDLE)
|
|
self.logger.debug(f"Reached target position: {DoorControl.state_names[self.state()]}")
|
|
self.repeats = 0
|
|
self.start_time = now
|
|
return
|
|
elif self.state() != old_state:
|
|
# Changed state, try to go to the target state again
|
|
self.start_time = now
|
|
self.action(old_state)
|
|
self._send_door_cmd(target_state_cmd[self.action()])
|
|
|
|
# Execution time
|
|
t = (now - self.start_time).total_seconds()
|
|
if t >= self.config.state_timeout:
|
|
# Timeout -> switch to timeout action
|
|
if self.action() == DoorControl.OPEN:
|
|
self.action(DoorControl.CLOSE)
|
|
else:
|
|
self.action(DoorControl.OPEN)
|
|
self.start_time = None
|
|
self.repeats = 0
|
|
self.logger.debug(f"Timeout. Switching to {DoorControl.action_names[self.action()]}")
|
|
elif t >= (1 + self.repeats) * self.config.repeat_time:
|
|
# Repeat every couple of seconds
|
|
self.repeats += 1
|
|
self.logger.debug(f"Repeating command: {target_state_cmd[self.action()]}")
|
|
self._send_door_cmd(target_state_cmd[self.action()])
|
|
|
|
def open_door(self):
|
|
self.logger.info("Opening the door")
|
|
self.target_state(DoorControl.OPEN)
|
|
if self.action() == DoorControl.IDLE:
|
|
self.action(DoorControl.OPEN)
|
|
self.start_time = None
|
|
self.handle_door_state()
|
|
|
|
def close_door(self):
|
|
self.logger.info("Closing the door")
|
|
self.target_state(DoorControl.CLOSE)
|
|
if self.action() == DoorControl.IDLE:
|
|
self.action(DoorControl.CLOSE)
|
|
self.start_time = None
|
|
self.handle_door_state()
|
|
|
|
def toggle_door_state(self):
|
|
if self.action() != DoorControl.IDLE:
|
|
return
|
|
if self.target_state() == DoorControl.CLOSE:
|
|
self.open_door()
|
|
else:
|
|
self.close_door()
|
|
|
|
def handle_nfc_token(self, token = None):
|
|
if not token:
|
|
token = self.nfc_fifo.readline().strip()
|
|
self.logger.debug(f"Token from nfc_fifo: {token}")
|
|
if token == "":
|
|
self.logger.debug("Opening nfc_fifo")
|
|
self.nfc_fifo = self._open_nfc_fifo(self.config)
|
|
return
|
|
|
|
token = token.strip()
|
|
if token in self.valid_tokens:
|
|
data = self.valid_tokens[token]
|
|
|
|
if data['valid_thru'] is not None:
|
|
# if a valid thru date has been set we check if the token is still valid
|
|
authorized = datetime.date.today() <= data['valid_thru']
|
|
else:
|
|
# otherwise we don't need to check
|
|
authorized = True
|
|
|
|
if authorized:
|
|
self.logger.info(f"Valid token {token} of {data['name']}")
|
|
self.toggle_door_state()
|
|
else:
|
|
self.logger.warning(f"Token {token} of {data['name']} expired on {data['valid_thru']}")
|
|
else:
|
|
self.logger.warning(f"Invalid token: {token}")
|
|
self.last_invalid_token(f"{timestamp()};{token}")
|
|
|
|
class LineBuffer(object):
|
|
def __init__(self, f, handler):
|
|
self.data = b''
|
|
self.f = f
|
|
self.handler = handler
|
|
|
|
def update(self):
|
|
data = self.f.recv(1024)
|
|
if not data:
|
|
return False
|
|
self.data += data
|
|
d = self.data.split(b'\n')
|
|
d, self.data = d[:-1], d[-1]
|
|
for i in d:
|
|
self.handler(self.f, i)
|
|
return True
|
|
|
|
def handle_cmd(self, comm, data):
|
|
cmd = data.decode('utf8').split()
|
|
cmd, args = cmd[0], cmd[1:]
|
|
self.logger.debug(f"Got command: {data}")
|
|
|
|
send = lambda x: comm.send(x.encode('utf8'))
|
|
|
|
if cmd == 'fake':
|
|
self.logger.debug(f"Faking token {args[0]}")
|
|
send("Handling token\n")
|
|
self.handle_nfc_token(args[0])
|
|
elif cmd == 'open':
|
|
self.logger.debug("Control socket opening door")
|
|
send("Opening door")
|
|
self.open_door()
|
|
elif cmd == 'close':
|
|
self.logger.debug("Control socket closing door")
|
|
send("Closing door")
|
|
self.close_door()
|
|
elif cmd == 'rld':
|
|
self.logger.debug("Reloading tokens")
|
|
send("Reloading tokens")
|
|
self.valid_tokens = self._read_valid_tokens(self.config)
|
|
elif cmd == 'stat':
|
|
send("Door status is %s, position is %d. Current action: %s (%g seconds ago)\n" % (
|
|
DoorControl.state_names.get(self.state, "None"),
|
|
self.state_pos.value,
|
|
DoorControl.action_names[self.action()],
|
|
(datetime.datetime.now() - self.start_time).total_seconds()))
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--serial_port", default="/dev/serial/by-id/usb-Imaginaerraum.de_DoorControl_43363220195053573A002C0-if01")
|
|
parser.add_argument("--nfc_fifo", default="/tmp/nfc_fifo")
|
|
parser.add_argument("--control_socket", default="/tmp/nfc.sock")
|
|
parser.add_argument("--valid_tokens", default="/etc/door_tokens")
|
|
parser.add_argument("--log_file", default="/tmp/nfc.log")
|
|
parser.add_argument("--state_timeout", type=float, default=10)
|
|
parser.add_argument("--state_timeout_speed", type=float, default=3)
|
|
parser.add_argument("--repeat_time", type=float, default=COMMAND_IDLE_TIME)
|
|
parser.add_argument("--mqtt_host", default="10.10.21.2")
|
|
|
|
config = parser.parse_args()
|
|
|
|
dc = DoorControl(config)
|
|
|
|
buffers = {}
|
|
while True:
|
|
readable = select.select([ dc.serial_port.fileno(), dc.nfc_fifo, dc.control_socket ] + dc.comm_channels, [], [], 1 / UPDATE_RATE)[0]
|
|
|
|
for c in readable:
|
|
if c == dc.serial_port.fileno():
|
|
dc.handle_door_line()
|
|
elif c == dc.nfc_fifo:
|
|
dc.handle_nfc_token()
|
|
elif c == dc.control_socket:
|
|
dc.logger.info("Got connection")
|
|
sock = dc.control_socket.accept()[0]
|
|
buffers[sock] = dc.LineBuffer(sock, dc.handle_cmd)
|
|
dc.comm_channels += [sock]
|
|
else:
|
|
if not buffers[c].update():
|
|
dc.logger.info("Lost connection")
|
|
del buffers[c]
|
|
dc.comm_channels.remove(c)
|
|
dc.handle_door_state()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|