155 lines
4.5 KiB
Python
155 lines
4.5 KiB
Python
|
import datetime
|
||
|
import time
|
||
|
|
||
|
import pytest
|
||
|
from pathlib import Path
|
||
|
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||
|
import paho.mqtt.client as mqtt
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def nfc_socket(tmp_path):
|
||
|
# mock up a Unix socket server for testing
|
||
|
import threading
|
||
|
import socketserver
|
||
|
|
||
|
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
|
||
|
def handle(self):
|
||
|
data = str(self.request.recv(1024), 'ascii')
|
||
|
cur_thread = threading.current_thread()
|
||
|
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
|
||
|
try:
|
||
|
self.request.sendall(response)
|
||
|
except BrokenPipeError:
|
||
|
pass
|
||
|
|
||
|
class TreadedUnixServer(socketserver.ThreadingMixIn,
|
||
|
socketserver.UnixStreamServer):
|
||
|
pass
|
||
|
|
||
|
nfc_socket_file = tmp_path / 'nfc.sock'
|
||
|
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
|
||
|
with server:
|
||
|
nfc_socket = server.server_address
|
||
|
|
||
|
# Start a thread with the server -- that thread will then start one
|
||
|
# more thread for each request
|
||
|
server_thread = threading.Thread(target=server.serve_forever)
|
||
|
# Exit the server thread when the main thread terminates
|
||
|
server_thread.daemon = True
|
||
|
server_thread.start()
|
||
|
print("Server loop running in thread:", server_thread.name)
|
||
|
|
||
|
yield nfc_socket
|
||
|
|
||
|
server.shutdown()
|
||
|
|
||
|
|
||
|
def test_create_door_handle(nfc_socket, tmp_path):
|
||
|
# test with invalid token file
|
||
|
with pytest.raises(FileNotFoundError):
|
||
|
door_handle = DoorHandle(token_file='no_such_file',
|
||
|
mqtt_host='test.mosquitto.org')
|
||
|
|
||
|
token_file = tmp_path / 'door_tokens'
|
||
|
token_file.write_text('')
|
||
|
door_handle = DoorHandle(
|
||
|
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||
|
nfc_socket=nfc_socket
|
||
|
)
|
||
|
door_handle.nfc_sock.close()
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def door_handle(nfc_socket, tmp_path):
|
||
|
token_file = tmp_path / 'door_tokens'
|
||
|
token_file.write_text(
|
||
|
"""# token | name | organization | email | valid_thru
|
||
|
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
||
|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
||
|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
||
|
"""
|
||
|
)
|
||
|
door_handle = DoorHandle(
|
||
|
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||
|
nfc_socket=nfc_socket
|
||
|
)
|
||
|
yield door_handle
|
||
|
door_handle.nfc_sock.close()
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def door_handle_no_nfc(tmp_path):
|
||
|
token_file = tmp_path / 'door_tokens'
|
||
|
token_file.write_text('')
|
||
|
door_handle = DoorHandle(
|
||
|
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||
|
)
|
||
|
yield door_handle
|
||
|
|
||
|
|
||
|
def test_store_tokens(door_handle):
|
||
|
new_tokens = {
|
||
|
'042979fa186280': {
|
||
|
'name': 'Pippin',
|
||
|
'organization': 'Hobbits',
|
||
|
'email': 'pippin@shire.me',
|
||
|
'valid_thru': None,
|
||
|
'inactive': False
|
||
|
}
|
||
|
}
|
||
|
|
||
|
door_handle.store_tokens(new_tokens)
|
||
|
|
||
|
token_file = Path(door_handle.token_file)
|
||
|
assert token_file.exists()
|
||
|
assert '042979fa186280' in token_file.read_text()
|
||
|
|
||
|
|
||
|
def test_mqtt_messages(door_handle):
|
||
|
# test sending messages to the door_handle via MQTT and see if the state
|
||
|
# gets updated accordingly
|
||
|
client = mqtt.Client()
|
||
|
|
||
|
con = client.connect('test.mosquitto.org', 1883)
|
||
|
client.publish('door/position/value', 10).wait_for_publish(1)
|
||
|
time.sleep(1)
|
||
|
assert door_handle.encoder_position == 10
|
||
|
|
||
|
client.publish('door/state/value', 'open').wait_for_publish(1)
|
||
|
time.sleep(1)
|
||
|
assert door_handle.state == 'open'
|
||
|
|
||
|
timestamp = datetime.datetime.now().replace(microsecond=0)
|
||
|
token = '042979fa186280'
|
||
|
client.publish(
|
||
|
'door/token/last_invalid',
|
||
|
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
|
||
|
).wait_for_publish(1)
|
||
|
time.sleep(1)
|
||
|
most_recent_token = door_handle.get_most_recent_token()
|
||
|
assert most_recent_token['timestamp'] == timestamp
|
||
|
assert most_recent_token['token'] == token
|
||
|
|
||
|
client.disconnect()
|
||
|
|
||
|
|
||
|
def test_open_door(door_handle):
|
||
|
door_handle.open_door()
|
||
|
|
||
|
|
||
|
def test_open_door_broken_socket(door_handle_no_nfc):
|
||
|
# test broken nfc_socket connection
|
||
|
with pytest.raises(RuntimeError):
|
||
|
door_handle_no_nfc.open_door()
|
||
|
|
||
|
|
||
|
def test_close_door(door_handle):
|
||
|
door_handle.close_door()
|
||
|
|
||
|
|
||
|
def test_close_door_broken_socket(door_handle_no_nfc):
|
||
|
# test broken nfc_socket connection
|
||
|
with pytest.raises(RuntimeError):
|
||
|
door_handle_no_nfc.close_door()
|