DoorControl/door_pi_control/nfc.py

133 lines
5.4 KiB
Python

import select
import socket
import time
import datetime
from threading import Lock, RLock, Condition, Thread
from typing import Callable, Optional, IO, Container
from . import mqtt, util
from .util import timestamp
class DoorControlNfc(util.Loggable):
"""Validates tokens read from a socket and tells a DoorControl to toggle its target"""
def __init__(self, config, control, mqtt_client = None):
super().__init__("nfc")
self._config = config
self._nfc = self._open_nfc_fifo()
self._control = control
self._stop = True
self._mutex = RLock()
self._cond = Condition(self._mutex)
self._fifo = None
self._mqttc = mqtt_client
self.last_invalid_token = mqtt.Value(self._mqttc,
"door/token/last_invalid", persistent=True)
def _open_nfc_fifo(self) -> IO:
"""Opens config.nfc_fifo as the FIFO through which detected tokens are passed in."""
self._logger().debug(f"Opening FIFO {self._config.nfc_fifo}")
return open(self._config.nfc_fifo, "r")
def _read_valid_tokens(self) -> Container:
"""Refreshes all tokens from config.valid_tokens"""
with self._mutex:
valid = {}
try:
self._logger().info("Loading tokens")
lines =[ s.strip() for s in open(self._config.valid_tokens, "r").readlines() ]
for i, line in enumerate(lines):
l = [s.strip() for s in line.split('|')]
if len(l) == 5:
if not l[0].strip().startswith('#'):
token, name, organization, email, valid_thru = l
try:
if len(valid_thru.strip()) > 0:
valid_thru = datetime.date.fromisoformat(valid_thru)
else:
valid_thru = None
except Exception:
self._logger().exception(f"Could not parse valid thru date {valid_thru} for token {token} in line {i+1}")
valid_thru = None
self._logger().debug(f"Got token {token} associated with {name} <{email}> of {organization}, valid thru {valid_thru}")
if token in valid:
self._logger().warning(f"Overwriting token {token}")
valid[token] = {
'name': name,
'organization': organization,
'email': email,
'valid_thru': valid_thru,
}
else:
self._logger().warning(f"Skipping line {i} ({line}) since it does not contain exactly 5 data field")
except Exception as e:
valid = {}
self._logger().error(f"Error reading token file. Exception: {e}")
self._tokens = valid
def _run(self):
with self._cond:
while not self._stop:
if not self._fifo:
self._read_valid_tokens()
self._fifo = self._open_nfc_fifo()
if not self._fifo:
time.sleep(5)
continue
self._cond.release()
readable, _, _ = select.select(
[self._fifo],
[], [], 1)
self._cond.acquire()
if self._fifo in readable:
token = self._fifo.readline().strip()
if token == "":
self._fifo = None
continue
self._logger().debug(f"Token from nfc_fifo: {token}")
self.handle_token(token)
def handle_token(self, token):
with self._mutex:
if token not in self._tokens:
split_token = token.split(':')[0]
if split_token in self._tokens:
self._logger().warning(f"Old-style token: {split_token} -> {token}")
token = split_token
if token in self._tokens:
data = self._tokens[token]
if data['valid_thru'] is not None:
# if a valid thru date has been set we check if the token is still valid
authorized = datetime.date.today() <= data['valid_thru']
if not authorized:
self._logger().info(f"Expired token: {token} of {data['name']}")
else:
authorized = True
if authorized:
self._logger().info(f"Valid token {token} of {data['name']}")
self._control.toggle()
else:
self._logger().warning(f"Invalid token: {token}")
self.last_invalid_token(f"{timestamp()};{token}")
def start(self):
with self._mutex:
if self._stop:
self._stop = False
self._task = Thread(target=self._run, daemon=True)
self._task.start()
print("Done")
def stop(self):
with self._cond:
if not self._stop:
self._stop = True
self._cond.notify()
self._task.join()