added tests for DoorHandle class

blueprint_refactoring
Simon Pirkelmann 2022-02-02 21:39:52 +01:00
parent 1320fc55ca
commit 47f378fe6e
2 changed files with 177 additions and 12 deletions

View File

@ -4,23 +4,22 @@ from pathlib import Path
import logging
from datetime import datetime
class DoorHandle:
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
def __init__(self, token_file, mqtt_host, mqtt_port=1883,
nfc_socket='/tmp/nfc.sock', logger=None):
self.state = None
self.encoder_position = None
if not Path(token_file).exists():
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
raise FileNotFoundError(
"File with door tokens could not be found at "
f"{Path(token_file).absolute()}"
)
self.token_file = token_file
self.last_invalid = {}
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
if logger:
self.logger = logger
else:
@ -31,19 +30,31 @@ class DoorHandle:
self.nfc_sock.connect(nfc_socket)
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
except Exception as e:
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
print(f"Could not connect to NFC socket at {nfc_socket}. "
f"Exception: {e}")
self.nfc_sock = None
#raise
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
pass
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
self.logger.info("Connected to MQTT broker with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("#")
client.subscribe("door/#")
pass
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
@ -98,13 +109,13 @@ class DoorHandle:
if self.nfc_sock is not None:
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
else:
raise Exception("No connection to NFC socket. Cannot close door!")
raise RuntimeError("No connection to NFC socket. Cannot close door!")
def close_door(self, user=''):
if self.nfc_sock is not None:
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
else:
raise Exception("No connection to NFC socket. Cannot close door!")
raise RuntimeError("No connection to NFC socket. Cannot close door!")
def get_most_recent_token(self):
# read last invalid token from logfile

154
tests/test_door_handle.py Normal file
View File

@ -0,0 +1,154 @@
import datetime
import time
import pytest
from pathlib import Path
from imaginaerraum_door_admin.door_handle import DoorHandle
import paho.mqtt.client as mqtt
@pytest.fixture
def nfc_socket(tmp_path):
# mock up a Unix socket server for testing
import threading
import socketserver
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = str(self.request.recv(1024), 'ascii')
cur_thread = threading.current_thread()
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
try:
self.request.sendall(response)
except BrokenPipeError:
pass
class TreadedUnixServer(socketserver.ThreadingMixIn,
socketserver.UnixStreamServer):
pass
nfc_socket_file = tmp_path / 'nfc.sock'
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
with server:
nfc_socket = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
print("Server loop running in thread:", server_thread.name)
yield nfc_socket
server.shutdown()
def test_create_door_handle(nfc_socket, tmp_path):
# test with invalid token file
with pytest.raises(FileNotFoundError):
door_handle = DoorHandle(token_file='no_such_file',
mqtt_host='test.mosquitto.org')
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle(nfc_socket, tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text(
"""# token | name | organization | email | valid_thru
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
yield door_handle
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle_no_nfc(tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
)
yield door_handle
def test_store_tokens(door_handle):
new_tokens = {
'042979fa186280': {
'name': 'Pippin',
'organization': 'Hobbits',
'email': 'pippin@shire.me',
'valid_thru': None,
'inactive': False
}
}
door_handle.store_tokens(new_tokens)
token_file = Path(door_handle.token_file)
assert token_file.exists()
assert '042979fa186280' in token_file.read_text()
def test_mqtt_messages(door_handle):
# test sending messages to the door_handle via MQTT and see if the state
# gets updated accordingly
client = mqtt.Client()
con = client.connect('test.mosquitto.org', 1883)
client.publish('door/position/value', 10).wait_for_publish(1)
time.sleep(1)
assert door_handle.encoder_position == 10
client.publish('door/state/value', 'open').wait_for_publish(1)
time.sleep(1)
assert door_handle.state == 'open'
timestamp = datetime.datetime.now().replace(microsecond=0)
token = '042979fa186280'
client.publish(
'door/token/last_invalid',
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
).wait_for_publish(1)
time.sleep(1)
most_recent_token = door_handle.get_most_recent_token()
assert most_recent_token['timestamp'] == timestamp
assert most_recent_token['token'] == token
client.disconnect()
def test_open_door(door_handle):
door_handle.open_door()
def test_open_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.open_door()
def test_close_door(door_handle):
door_handle.close_door()
def test_close_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.close_door()