DoorAdmin/door.py

79 lines
2.9 KiB
Python
Raw Normal View History

import paho.mqtt.client as mqtt
import regex as re
class Door:
def __init__(self, host, port=1883):
self.state = None
self.encoder_position = None
self.token_file = 'door_tokens'
self.nfc_logfile = 'nfc.log'
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect(host=host, port=port)
self.mqtt_client.loop_start()
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("#")
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
if msg.topic == 'door/state/value':
self.state = msg.payload.decode()
elif msg.topic == 'door/position/value':
self.encoder_position = int(msg.payload)
def get_tokens(self):
tokens = {}
with open(self.token_file) as f:
token_lines = f.readlines()
for line in token_lines[1:]:
data = line.split('|')
inactive = '#' in data[0]
token = data[0].strip().strip('#')
name = data[1].strip() if len(data) > 1 else None
organization = data[2].strip() if len(data) > 2 else None
email = data[3].strip() if len(data) > 3 else None
valid_thru = data[4].strip() if len(data) > 4 else None
tokens[token] = {'name': name,
'organization': organization,
'email': email,
'valid_thru': valid_thru,
'inactive': inactive}
return tokens
def store_tokens(self, tokens):
output = '# token | ' + ' | '.join(self.data_fields) + '\n'
for t, data in tokens.items():
output += '#' if data['inactive'] else ''
output += t
for key in self.data_fields:
output += '|'
output += data[key] if data[key] else ''
output += '\n'
with open(self.token_file, 'w') as f:
f.write(output)
def get_most_recent_token(self):
# read last invalid token from logfile
with open(self.nfc_logfile) as f:
nfc_log = f.read()
match = re.search(r"(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) Invalid token: (?P<token>[[:xdigit:]]{14})",
nfc_log, re.REVERSE)
return match