Compare commits

...

10 Commits

8 changed files with 701 additions and 369 deletions

View File

@ -1,7 +1,77 @@
Flask-based web interface for user token adminstration of our hackerspace's door lock. Flask-based web interface for user token adminstration of our hackerspace's door lock.
## Installation
Clone the repo
```shell
git clone <path_to_repo>
```
Install using pip
```shell
pip install .
```
# Development ## Running the app
```shell
export FLASK_APP=imaginaerraum_door_admin
flask run
```
## Configuration
You can set custom configuration options by defining an environment variable
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
For example, consider the following configuration file ``app_config.py``:
```
# door app configuration
SECRET_KEY = 'mysupersecretkey'
SECURITY_PASSWORD_SALT = 'saltycaramel'
TESTING = False
DEBUG = False
TOKEN_FILE = 'door_tokens.txt'
ADMIN_FILE = 'admins.txt'
NFC_SOCKET = "/tmp/nfc.sock"
NFC_LOG = "nfc.log"
```
To instruct the flask app to use this configuration, use the following commands:
```shell
export APPLICATION_SETTINGS=app_config.py
flask run
```
Below, you can find a list of configuration options.
### Flask app configuration
You can override common Flask configuration variables. In particular, you
definitely should set custom values for the ``SECRET_KEY`` and
``SECURITY_PASSWORD_SALT``.
### Token file
The token file is an ASCII file which list the IDs of RFID tokens that can be
used to unlock the door. You can specify the path to the token file using the
``TOKEN_FILE`` variable in the configuration file.
Here's an example of a token file (lines starting with ``#`` represent inactive
tokens):
```
# token | name | organization | email | valid_thru
#042979fa186280||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
```
### Admin file
``ADMIN_FILE`` -> file to create new super admins
### User database
### NFC files
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
``NFC_LOG = "nfc.log"`` -> log file of door events
## Development
```shell ```shell
cd tests cd tests
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py

View File

@ -5,7 +5,6 @@ from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email from email_validator import validate_email
from pathlib import Path from pathlib import Path
#from .webapp import door_app
from .door_handle import DoorHandle from .door_handle import DoorHandle
security = Security() security = Security()
@ -13,128 +12,85 @@ db = SQLAlchemy()
# create admin users (only if they don't exists already) # create admin users (only if they don't exists already)
def create_super_admins(app, db, user_datastore, logger): def create_super_admins(app, user_datastore):
admin_file = Path(app.config.get('ADMIN_FILE'))
# setup user database when starting the app # setup user database when starting the app
with app.app_context(): new_admin_data = []
new_admin_data = [] if not admin_file.exists():
if app.config['ADMIN_FILE'] is not None: app.logger.warning(
if not Path(app.config['ADMIN_FILE']).exists(): f"Admin user creation file not found at path "
logger.warning( f"{admin_file.absolute()}."
f"Admin user creation file not found at {app.config['ADMIN_FILE']}") f"No super admins have been created in the datastore."
else: )
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(app.config['ADMIN_FILE']) as f:
for i, line in enumerate(f.readlines()):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
db.create_all()
super_admin_role = user_datastore.find_or_create_role(
'super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role(
'admin') # 'normal' admin
local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
logger.info(
f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'],
username=d[
'username'],
password=hash_password(
d['password']),
roles=roles)
db.session.commit()
def setup_logging(app):
# set up logging for the web app
logger = logging.getLogger('webapp')
logger.setLevel(logging.INFO)
if app.config['LOG_FILE'] is not None:
ch = logging.FileHandler(app.config['LOG_FILE'])
ch.setLevel(logging.INFO)
else: else:
# create console handler and set level to debug # store data for new admins in memory s.t. the file can be deleted
ch = logging.StreamHandler() # afterwards
ch.setLevel(logging.DEBUG) admin_data = admin_file.read_text().split('\n')
for i, line in enumerate(admin_data):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
app.logger.error(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created."
)
# create formatter with app.app_context():
formatter = logging.Formatter( db.create_all()
'%(asctime)s - %(name)s - %(levelname)s - %(message)s') super_admin_role = user_datastore.find_or_create_role(
# add formatter to ch 'super_admin') # root admin = can create other admins
ch.setFormatter(formatter) admin_role = user_datastore.find_or_create_role(
# add ch to logger 'admin') # 'normal' admin
logger.addHandler(ch) local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(
email=d['email'], username=d['username'],
password=hash_password(d['password']), roles=roles
)
app.logger.info(
f"New super admin user created with username "
f"'{new_admin.username}' and email '{new_admin.email}'"
f", roles = {[r.name for r in new_admin.roles]}"
)
db.session.commit()
return logger
def create_app(): def create_app():
app = Flask(__name__) app = Flask(__name__)
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig') app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
app.config.from_envvar('APPLICATION_SETTINGS') app.config.from_envvar('APPLICATION_SETTINGS', silent=True)
logger = setup_logging(app)
# do some checks for file existence etc. token_file = Path(app.config.get('TOKEN_FILE'))
try: if not token_file.exists():
with open(app.config['KEY_FILE']) as f: app.logger.warning(
data = f.readlines() f"Token file not found at {token_file.absolute()}. "
if 'SECRET_KEY' in data[0]: "An empty token file will be created."
secret_key = data[0].split()[-1] )
else: token_file.touch()
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(app.config['TEMPLATE_FOLDER']).is_absolute(): # create door objects which provides access to the token file and current
if not Path(app.config['TEMPLATE_FOLDER']).exists(): # door state via MQTT
logger.error( app.door = DoorHandle(
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}") token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
else: nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists(): )
logger.error(
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
if Path(app.config['STATIC_FOLDER']).is_absolute():
if not Path(app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
if not Path(app.config['TOKEN_FILE']).exists():
logger.warning(
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'],
logger=logger)
# Mail Config # Mail Config
#mail = Mail(app) #mail = Mail(app)
@ -151,6 +107,6 @@ def create_app():
user_datastore = SQLAlchemyUserDatastore(db, User, Role) user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm) security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
create_super_admins(app, db, user_datastore, logger) create_super_admins(app, user_datastore)
return app return app

View File

@ -10,9 +10,7 @@ class DefaultConfig(object):
DEBUG = False DEBUG = False
SECRET_KEY = 'supersecret' SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
TEMPLATE_FOLDER = 'templates'
STATIC_FOLDER = 'static'
SECURITY_REGISTERABLE = False SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True SECURITY_CHANGEABLE = True
@ -47,7 +45,8 @@ class DefaultConfig(object):
} }
KEY_FILE = '/root/flask_keys' KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens" TOKEN_FILE = "door_tokens"
ADMIN_FILE = 'admins'
LDAP_URL = "ldaps://ldap.imaginaerraum.de" LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users' LDAP_USER_GROUP = 'Users'
@ -55,6 +54,6 @@ class DefaultConfig(object):
LDAP_DOMAIN_EXT = 'de' LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock" NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log" LOG_FILE = "webinterface.log"
NFC_LOG = "/var/log/nfc.log" NFC_LOG = "nfc.log"
MQTT_HOST = '10.10.21.2' MQTT_HOST = '10.10.21.2'

View File

@ -4,23 +4,22 @@ from pathlib import Path
import logging import logging
from datetime import datetime from datetime import datetime
class DoorHandle: class DoorHandle:
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None): def __init__(self, token_file, mqtt_host, mqtt_port=1883,
nfc_socket='/tmp/nfc.sock', logger=None):
self.state = None self.state = None
self.encoder_position = None self.encoder_position = None
if not Path(token_file).exists(): if not Path(token_file).exists():
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}") raise FileNotFoundError(
"File with door tokens could not be found at "
f"{Path(token_file).absolute()}"
)
self.token_file = token_file self.token_file = token_file
self.last_invalid = {} self.last_invalid = {}
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
if logger: if logger:
self.logger = logger self.logger = logger
else: else:
@ -31,19 +30,31 @@ class DoorHandle:
self.nfc_sock.connect(nfc_socket) self.nfc_sock.connect(nfc_socket)
self.logger.info(f"Connected to NFC socket at {nfc_socket}.") self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
except Exception as e: except Exception as e:
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}") print(f"Could not connect to NFC socket at {nfc_socket}. "
f"Exception: {e}")
self.nfc_sock = None self.nfc_sock = None
#raise #raise
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
self.data_fields = ['name', 'organization', 'email', 'valid_thru'] self.data_fields = ['name', 'organization', 'email', 'valid_thru']
pass
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc): def on_connect(self, client, userdata, flags, rc):
self.logger.info("Connected to MQTT broker with result code " + str(rc)) self.logger.info("Connected to MQTT broker with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and # Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed. # reconnect then subscriptions will be renewed.
client.subscribe("#") client.subscribe("door/#")
pass
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
@ -98,13 +109,13 @@ class DoorHandle:
if self.nfc_sock is not None: if self.nfc_sock is not None:
self.nfc_sock.send(b'open ' + user.encode() + b'\n') self.nfc_sock.send(b'open ' + user.encode() + b'\n')
else: else:
raise Exception("No connection to NFC socket. Cannot close door!") raise RuntimeError("No connection to NFC socket. Cannot close door!")
def close_door(self, user=''): def close_door(self, user=''):
if self.nfc_sock is not None: if self.nfc_sock is not None:
self.nfc_sock.send(b'close ' + user.encode() + b'\n') self.nfc_sock.send(b'close ' + user.encode() + b'\n')
else: else:
raise Exception("No connection to NFC socket. Cannot close door!") raise RuntimeError("No connection to NFC socket. Cannot close door!")
def get_most_recent_token(self): def get_most_recent_token(self):
# read last invalid token from logfile # read last invalid token from logfile

View File

@ -0,0 +1,46 @@
from flask import flash
from flask_wtf import FlaskForm
from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from datetime import date
def validate_valid_thru_date(form, field):
if form.limit_validity.data:
# only check date format if limited validity of token is set
try:
if not field.data >= date.today():
raise ValueError
except ValueError as e:
flash("Ungültiges Datum")
raise ValidationError
return True
class TokenForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])
organization = StringField("Organization", validators=[DataRequired()])
limit_validity = BooleanField("Gültigkeit begrenzen?")
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
active = BooleanField("Aktiv?")
dsgvo = BooleanField(
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
)
class ConfirmDeleteForm(FlaskForm):
name = StringField(
"Name",
validators=[
DataRequired(),
EqualTo("name_confirm", "Name stimmt nicht überein"),
],
)
name_confirm = StringField("Name confirm")
class AdminCreationForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])

View File

@ -1,11 +1,19 @@
from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app from flask import (
render_template,
request,
flash,
redirect,
send_file,
Blueprint,
current_app,
)
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm from flask_security import (
from wtforms.fields import DateField, EmailField auth_required,
from wtforms.fields import StringField, BooleanField hash_password,
from wtforms.validators import DataRequired, ValidationError, EqualTo current_user,
from flask_security import auth_required, hash_password, \ roles_required
current_user, roles_required )
from flask_security.views import change_password from flask_security.views import change_password
from email_validator import validate_email from email_validator import validate_email
@ -17,207 +25,231 @@ import tempfile
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from imaginaerraum_door_admin import db, security from imaginaerraum_door_admin import db, security
from imaginaerraum_door_admin.forms import (
AdminCreationForm,
ConfirmDeleteForm,
TokenForm,
)
def validate_valid_thru_date(form, field): door_app = Blueprint("door_app", __name__, template_folder="templates")
if form.limit_validity.data: # only check date format if limited validity of token is set
try:
if not field.data >= date.today():
raise ValueError
except ValueError as e:
flash("Ungültiges Datum")
raise ValidationError
return True
class TokenForm(FlaskForm): # we override the change_password view from flask security to only allow local
name = StringField('Name', validators=[DataRequired()]) # users to change their passwords
email = EmailField('E-Mail', validators=[DataRequired()])
organization = StringField('Organization', validators=[DataRequired()])
limit_validity = BooleanField('Gültigkeit begrenzen?')
valid_thru = DateField('Gültig bis', validators=[validate_valid_thru_date])
active = BooleanField('Aktiv?')
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
class ConfirmDeleteForm(FlaskForm):
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
name_confirm = StringField('Name confirm')
class AdminCreationForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
email = EmailField('E-Mail', validators=[DataRequired()])
door_app = Blueprint('door_app', __name__,
template_folder='templates')
# we override the change_password view from flask security to only allow local users to change their passwords
# LDAP users should use the LDAP self service for changing passwords # LDAP users should use the LDAP self service for changing passwords
# this route needs to be defined before the Flask Security setup # this route needs to be defined before the Flask Security setup
@door_app.route('/change', methods=['GET', 'POST']) @door_app.route("/change", methods=["GET", "POST"])
@auth_required() @auth_required()
def change_pw(): def change_pw():
if current_user.has_role('local'): if current_user.has_role("local"):
# local users can change their password # local users can change their password
return change_password() return change_password()
else: else:
# LDAP users get redirected to the LDAP self service # LDAP users get redirected to the LDAP self service
return redirect('https://ldap.imaginaerraum.de/') return redirect("https://ldap.imaginaerraum.de/")
# admin user management # admin user management
@door_app.route('/manage_admins', methods=['GET', 'POST']) @door_app.route("/manage_admins", methods=["GET", "POST"])
@roles_required('super_admin') @roles_required("super_admin")
def manage_admins(): def manage_admins():
form = AdminCreationForm() form = AdminCreationForm()
if request.method == 'GET': if request.method == "GET":
users = security.datastore.user_model.query.all() users = security.datastore.user_model.query.all()
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, admin_data = [
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'), {
} for u in users] "username": u.username,
return render_template('admins.html', admin_data=admin_data, form=form) "email": u.email,
"active": u.is_active,
"admin": u.has_role("admin"),
"super_admin": u.has_role("super_admin"),
}
for u in users
]
return render_template("admins.html", admin_data=admin_data, form=form)
elif form.validate(): elif form.validate():
if security.datastore.find_user(username=form.name.data) is not None or \ if (
security.datastore.find_user(email=form.email.data) is not None: security.datastore.find_user(username=form.name.data) is not None
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!") or security.datastore.find_user(email=form.email.data) is not None
):
flash(
"Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse"
" existiert bereits!"
)
else: else:
pw = secrets.token_urlsafe(16) pw = secrets.token_urlsafe(16)
new_user = security.datastore.create_user(username=form.name.data, email=form.email.data, new_user = security.datastore.create_user(
password=hash_password(pw)) username=form.name.data,
security.datastore.add_role_to_user(new_user, 'local') email=form.email.data,
password=hash_password(pw),
)
security.datastore.add_role_to_user(new_user, "local")
current_app.logger.info( current_app.logger.info(
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>") f"Super admin {current_user.username} created new user account "
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.") f"for {new_user.username} <{new_user.email}>"
)
flash(
f"Ein Account für den Nutzer {new_user.username} wurde "
f"erstellt. Verwende das Passwort {pw} um den Nutzer "
f"einzuloggen."
)
db.session.commit() db.session.commit()
else: else:
flash(f"Ungültige Eingabe: {form.errors}") flash(f"Ungültige Eingabe: {form.errors}")
return redirect('/manage_admins') return redirect("/manage_admins")
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST']) @door_app.route("/delete_admins/<username>", methods=["GET", "POST"])
@roles_required('super_admin') @roles_required("super_admin")
def delete_admins(username): def delete_admins(username):
user = security.datastore.find_user(username=username) user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect("/manage_admins")
if user.has_role('super_admin'): if user.has_role("super_admin"):
flash('Super-Admins können nicht gelöscht werden!') flash("Super-Admins können nicht gelöscht werden!")
return redirect('/manage_admins') return redirect("/manage_admins")
if user.is_active: if user.is_active:
flash('Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer zuerst deaktivieren.') flash(
return redirect('/manage_admins') "Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer "
"zuerst deaktivieren."
)
return redirect("/manage_admins")
# set up form for confirming deletion # set up form for confirming deletion
form = ConfirmDeleteForm() form = ConfirmDeleteForm()
form.name_confirm.data = username form.name_confirm.data = username
if request.method == 'GET': if request.method == "GET":
# return page asking the user to confirm delete # return page asking the user to confirm delete
return render_template('delete_user.html', username=username, form=form) return render_template("delete_user.html", username=username, form=form)
elif form.validate(): elif form.validate():
security.datastore.delete_user(user) security.datastore.delete_user(user)
flash(f"Benutzer {username} wurde gelöscht.") flash(f"Benutzer {username} wurde gelöscht.")
current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}") current_app.logger.info(
f"Super admin {current_user.username} deleted admin user {username}"
)
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect("/manage_admins")
else: else:
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!") flash(
return redirect('/manage_admins') "Der eingegebene Nutzername stimmt nicht überein. Der Benutzer "
"wurde nicht gelöscht!"
)
return redirect("/manage_admins")
@door_app.route('/admin_toggle_active/<username>') @door_app.route("/admin_toggle_active/<username>")
@roles_required('super_admin') @roles_required("super_admin")
def admin_toggle_active(username): def admin_toggle_active(username):
user = security.datastore.find_user(username=username) user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect("/manage_admins")
if user.has_role('super_admin'): if user.has_role("super_admin"):
flash('Super-Admins können nicht deaktiviert werden!') flash("Super-Admins können nicht deaktiviert werden!")
return redirect('/manage_admins') return redirect("/manage_admins")
security.datastore.toggle_active(user) security.datastore.toggle_active(user)
if user.is_active: if user.is_active:
current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}") current_app.logger.info(
f"Super admin {current_user.username} activated access for admin "
f"user {username}"
)
else: else:
current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}") current_app.logger.info(
f"Super admin {current_user.username} deactivated access for admin "
f"user {username}"
)
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect("/manage_admins")
@door_app.route('/promote_admin/<username>') @door_app.route("/promote_admin/<username>")
@roles_required('super_admin') @roles_required("super_admin")
def promote_admin(username): def promote_admin(username):
user = security.datastore.find_user(username=username) user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect("/manage_admins")
if user.has_role('admin'): if user.has_role("admin"):
flash(f'Benutzer {username} hat bereits Admin-Rechte!') flash(f"Benutzer {username} hat bereits Admin-Rechte!")
return redirect('/manage_admins') return redirect("/manage_admins")
security.datastore.add_role_to_user(user, 'admin') security.datastore.add_role_to_user(user, "admin")
current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}") current_app.logger.info(
f"Super admin {current_user.username} granted admin privileges to "
f"user {username}"
)
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect("/manage_admins")
@door_app.route('/demote_admin/<username>') @door_app.route("/demote_admin/<username>")
@roles_required('super_admin') @roles_required("super_admin")
def demote_admin(username): def demote_admin(username):
user = security.datastore.find_user(username=username) user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect("/manage_admins")
if user.has_role('super_admin'): if user.has_role("super_admin"):
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht ' flash(
'verändert werden!') f"Benutzer {username} hat Super-Admin-Rechte und kann nicht "
return redirect('/manage_admins') "verändert werden!"
if user.has_role('admin'): )
security.datastore.remove_role_from_user(user, 'admin') return redirect("/manage_admins")
current_app.logger.info(f"Super admin {current_user.username} revoked " if user.has_role("admin"):
f"admin privileges of user {username}") security.datastore.remove_role_from_user(user, "admin")
current_app.logger.info(
f"Super admin {current_user.username} revoked "
f"admin privileges of user {username}"
)
db.session.commit() db.session.commit()
else: else:
flash(f'Benutzer {username} ist bereits kein Admin!') flash(f"Benutzer {username} ist bereits kein Admin!")
return redirect('/manage_admins') return redirect("/manage_admins")
@door_app.route('/backup_user_datastore') @door_app.route("/backup_user_datastore")
@roles_required('super_admin') @roles_required("super_admin")
def backup_user_datastore(): def backup_user_datastore():
# get list of defined admin users for backup # get list of defined admin users for backup
users = security.datastore.user_model.query.all() users = security.datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, user_data = [
'active': u.is_active, 'password_hash': u.password, {
'roles': [r.name for r in u.roles]} "username": u.username,
for u in users if not u.has_role('super_admin')] "email": u.email,
"active": u.is_active,
"password_hash": u.password,
"roles": [r.name for r in u.roles],
}
for u in users
if not u.has_role("super_admin")
]
try: try:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'user_data.txt') file = Path(tmpdir, "user_data.txt")
file.write_text(json.dumps(user_data)) file.write_text(json.dumps(user_data))
return send_file(file, as_attachment=True, cache_timeout=-1) return send_file(file, as_attachment=True, cache_timeout=-1)
except Exception as e: except Exception as e:
return str(e) return str(e)
@door_app.route('/restore_user_datastore', methods=['POST']) @door_app.route("/restore_user_datastore", methods=["POST"])
@roles_required('super_admin') @roles_required("super_admin")
def restore_user_datastore(): def restore_user_datastore():
# check if the post request has the file part # check if the post request has the file part
if 'file' not in request.files: if "file" not in request.files:
flash('Keine Datei ausgewählt!') flash("Keine Datei ausgewählt!")
return redirect(request.url) return redirect(request.url)
file = request.files['file'] file = request.files["file"]
# if user does not select file, browser also # if user does not select file, browser also
# submit an empty part without filename # submit an empty part without filename
if file.filename == '': if file.filename == "":
flash('Keine Datei ausgewählt!') flash("Keine Datei ausgewählt!")
return redirect('/manage_admins') return redirect("/manage_admins")
filename = secure_filename(file.filename) filename = secure_filename(file.filename)
if file and filename.endswith('.txt'): if file and filename.endswith(".txt"):
data = file.stream.read() data = file.stream.read()
try: try:
# check validity of user data # check validity of user data
@ -226,63 +258,97 @@ def restore_user_datastore():
valid &= all(type(d) == dict for d in user_data) valid &= all(type(d) == dict for d in user_data)
if valid: if valid:
for d in user_data: for d in user_data:
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username', 'roles'} entry_valid = set(d.keys()) == {
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username']) "active",
entry_valid &= type(d['active']) == bool "email",
entry_valid &= type(d['roles']) == list "password_hash",
validate_email(d['email']) "username",
"roles",
}
entry_valid &= all(
len(d[key]) > 0
for key in ["email", "password_hash", "username"]
)
entry_valid &= type(d["active"]) == bool
entry_valid &= type(d["roles"]) == list
validate_email(d["email"])
if entry_valid: if entry_valid:
existing_user = security.datastore.find_user(username=d['username'], email=d['email']) existing_user = security.datastore.find_user(
username=d["username"], email=d["email"]
)
if existing_user is None: if existing_user is None:
security.datastore.create_user(username=d['username'], email=d['email'], security.datastore.create_user(
password=d['password_hash'], active=d['active'], username=d["username"],
roles=d['roles']) email=d["email"],
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.") password=d["password_hash"],
active=d["active"],
roles=d["roles"],
)
flash(
f"Account für Benutzer '{d['username']} wurde "
f"wiederhergestellt."
)
else: else:
flash(f"Benutzer '{d['username']} existiert bereits. Eintrag wird übersprungen.") flash(
f"Benutzer '{d['username']} existiert bereits."
f" Eintrag wird übersprungen."
)
else: else:
raise ValueError(f"Ungültige Daten für User Entry {d}") raise ValueError(f"Ungültige Daten für User Entry {d}")
else: else:
raise ValueError("Admin User Datei hat ungültiges Format.") raise ValueError("Admin User Datei hat ungültiges Format.")
except Exception as e: except Exception as e:
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}") flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
return redirect('/manage_admins') return redirect("/manage_admins")
flash("Benutzer aus Datei gelesen.") flash("Benutzer aus Datei gelesen.")
db.session.commit() db.session.commit()
else: else:
flash("Ungültige Dateiendung") flash("Ungültige Dateiendung")
return redirect('/manage_admins') return redirect("/manage_admins")
# main page # main page
@door_app.route('/') @door_app.route("/")
def door_lock(): def door_lock():
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position) return render_template(
"index.html",
door_state=current_app.door.state,
encoder_position=current_app.door.encoder_position,
)
# token overview # token overview
@door_app.route('/tokens') @door_app.route("/tokens")
@roles_required('admin') @roles_required("admin")
def list_tokens(): def list_tokens():
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']} assigned_tokens = {t: data for t, data in tokens.items()
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} if not data["inactive"]}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) inactive_tokens = {t: data for t, data in tokens.items()
if data["inactive"]}
return render_template(
"tokens.html",
assigned_tokens=assigned_tokens,
inactive_tokens=inactive_tokens
)
@door_app.route('/token-log') @door_app.route("/token-log")
@roles_required('super_admin') @roles_required("super_admin")
def token_log(): def token_log():
log = [] log = []
try: try:
with open(current_app.config['NFC_LOG']) as f: with open(current_app.config["NFC_LOG"]) as f:
log += f.readlines() log += f.readlines()
log.reverse() log.reverse()
log = [l.split(' - ') for l in log] log = [l.split(" - ") for l in log]
return render_template('token_log.html', log=log) return render_template("token_log.html", log=log)
except Exception as e: except Exception as e:
flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}") flash(
return redirect('/') f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} "
f"konnte nicht gelesen werden. Exception: {e}"
)
return redirect("/")
def store_token(token_data): def store_token(token_data):
@ -291,69 +357,82 @@ def store_token(token_data):
This will use the token id and the associated data and create/modify a This will use the token id and the associated data and create/modify a
token and store the new token file to disk. token and store the new token file to disk.
""" """
token = token_data['token'] token = token_data["token"]
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
tokens[token] = {'name': token_data['name'], tokens[token] = {
'email': token_data['email'], "name": token_data["name"],
'valid_thru': token_data['valid_thru'], "email": token_data["email"],
'inactive': token_data['inactive'], "valid_thru": token_data["valid_thru"],
'organization': token_data['organization']} "inactive": token_data["inactive"],
"organization": token_data["organization"],
}
try: try:
current_app.door.store_tokens(tokens) current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}") current_app.logger.info(
f"Token {token} stored in database by admin user "
f"{current_user.username}"
)
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
# routes for registering, editing and deleting tokens # routes for registering, editing and deleting tokens
@door_app.route('/register-token', methods=['GET', 'POST']) @door_app.route("/register-token", methods=["GET", "POST"])
@roles_required('admin') @roles_required("admin")
def register(): def register():
"""Register new token for locking and unlocking the door. """Register new token for locking and unlocking the door.
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for This route displays the most recently scanned invalid token as reported in
entering user info (name, email, valid thru date (optional)) for the new token. the logfile and provides a form for entering user info (name, email, valid
thru date (optional)) for the new token.
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route If the route is called via POST the provided form data is checked and if
will be called which adds the new token to the database. the check succeeds the /store-token route will be called which adds the new
token to the database.
""" """
token = current_app.door.get_most_recent_token() token = current_app.door.get_most_recent_token()
recent_token = {} recent_token = {}
if {'token', 'timestamp'}.issubset(set(token.keys())): if {"token", "timestamp"}.issubset(set(token.keys())):
dt = datetime.utcnow() - token['timestamp'] dt = datetime.utcnow() - token["timestamp"]
if dt < timedelta(minutes=10): if dt < timedelta(minutes=10):
recent_token = token recent_token = token
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0) recent_token["timedelta_minutes"] = int(dt.total_seconds() / 60.0)
form = TokenForm() form = TokenForm()
if request.method == 'GET': if request.method == "GET":
# set default valid thru date to today to make sure form validity check passes # set default valid thru date to today to make sure form validity check
# passes
# (will not be used if limited validity is disabled) # (will not be used if limited validity is disabled)
form.valid_thru.data = date.today() form.valid_thru.data = date.today()
elif request.method == 'POST' and form.validate(): elif request.method == "POST" and form.validate():
token_data = { token_data = {
'token': current_app.door.get_most_recent_token()['token'], "token": current_app.door.get_most_recent_token()["token"],
'name': form.name.data, 'email': form.email.data, "name": form.name.data,
'organization': form.organization.data, "email": form.email.data,
'inactive': not form.active.data, "organization": form.organization.data,
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else '' "inactive": not form.active.data,
"valid_thru": form.valid_thru.data.isoformat()
if form.limit_validity.data
else "",
} }
store_token(token_data) store_token(token_data)
return redirect('/tokens') return redirect("/tokens")
else: else:
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}') flash(f"Token konnte nicht registiert werden. Fehler: {form.errors}")
return render_template('register.html', token=recent_token, form=form) return render_template("register.html", token=recent_token, form=form)
@door_app.route('/edit-token/<token>', methods=['GET', 'POST']) @door_app.route("/edit-token/<token>", methods=["GET", "POST"])
@roles_required('admin') @roles_required("admin")
def edit_token(token): def edit_token(token):
"""Edit data in the token file (name, email, valid_thru date, active/inactive). """Edit data in the token file (name, email, valid_thru date, active).
If the route is accessed via GET it will provide a form for editing the currently stored data for the user. If the route is accessed via GET it will provide a form for editing the
If the route is accessed via POST it will check if the form data is good and then store the modified user data in currently stored data for the user.
the database (by redirecting to the /store-token route) If the route is accessed via POST it will check if the form data is good
and then store the modified user data in the database (by redirecting to
the /store-token route)
Parameters Parameters
---------- ----------
@ -361,51 +440,60 @@ def edit_token(token):
The token for which data should be edited. The token for which data should be edited.
""" """
form = TokenForm(request.form) form = TokenForm(request.form)
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed form.dsgvo.validators = (
# to it before []
if request.method == 'GET': )
# we skip the validation of the DSGVO checkbox here because we assume
# the user agreed to it before
if request.method == "GET":
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens: if token in tokens:
# set default for form according to values from the token file # set default for form according to values from the token file
et = tokens[token] et = tokens[token]
form.active.data = not et['inactive'] form.active.data = not et["inactive"]
form.name.data = et['name'] if et['name'] else '' form.name.data = et["name"] if et["name"] else ""
form.email.data = et['email'] if et['email'] else '' form.email.data = et["email"] if et["email"] else ""
form.organization.data = et['organization'] if et['organization'] else '' form.organization.data = et["organization"] \
if et["organization"] else ""
# for the valid thru date we use today's date in case there is not valid date in the database # for the valid thru date we use today's date in case there is not
# valid date in the database
try: try:
form.valid_thru.data = date.fromisoformat(et['valid_thru']) form.valid_thru.data = date.fromisoformat(et["valid_thru"])
form.limit_validity.data = True form.limit_validity.data = True
except Exception: except Exception:
form.valid_thru.data = date.today() form.valid_thru.data = date.today()
return render_template('edit.html', token=token, form=form) return render_template("edit.html", token=token, form=form)
else: else:
# flash an error message if the route is accessed with an invalid token # flash an error message if the route is accessed with an invalid
flash(f'Ungültiger Token {token}!') # token
return redirect('/tokens') flash(f"Ungültiger Token {token}!")
elif request.method == 'POST': return redirect("/tokens")
elif request.method == "POST":
if form.validate(): if form.validate():
# store data in token_data cookie # store data in token_data cookie
token_data = {'token': token, token_data = {
'name': form.name.data, "token": token,
'organization': form.organization.data, "name": form.name.data,
'email': form.email.data, "organization": form.organization.data,
'inactive': not form.active.data, "email": form.email.data,
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else '' "inactive": not form.active.data,
} "valid_thru": form.valid_thru.data.isoformat()
if form.limit_validity.data
else "",
}
store_token(token_data) store_token(token_data)
return redirect('/tokens') return redirect("/tokens")
else: else:
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}') flash(f"Token konnte nicht editiert werden. Fehler: {form.errors}")
return render_template('edit.html', token=token, form=form) return render_template("edit.html", token=token, form=form)
@door_app.route('/delete-token/<token>', methods=['GET', 'POST']) @door_app.route("/delete-token/<token>", methods=["GET", "POST"])
@roles_required('admin') @roles_required("admin")
def delete_token(token): def delete_token(token):
"""Delete the given token from the token file and store the new token file to disk """Delete the given token from the token file and store the new token file.
Parameters Parameters
---------- ----------
@ -415,29 +503,31 @@ def delete_token(token):
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
if token not in tokens: if token not in tokens:
flash(f'Ungültiger Token {token} für Löschung.') flash(f"Ungültiger Token {token} für Löschung.")
return redirect('/tokens') return redirect("/tokens")
token_to_delete = tokens[token] token_to_delete = tokens[token]
# set up form for confirming deletion # set up form for confirming deletion
form = ConfirmDeleteForm() form = ConfirmDeleteForm()
form.name_confirm.data = token_to_delete['name'] form.name_confirm.data = token_to_delete["name"]
if request.method == 'GET': if request.method == "GET":
# return page asking the user to confirm delete # return page asking the user to confirm delete
return render_template('delete.html', token=token_to_delete, form=form) return render_template("delete.html", token=token_to_delete, form=form)
elif form.validate(): elif form.validate():
# form validation successful -> can delete the token # form validation successful -> can delete the token
tokens.pop(token) tokens.pop(token)
try: try:
current_app.door.store_tokens(tokens) current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} was deleted from database " current_app.logger.info(
f"by admin user {current_user.username}") f"Token {token} was deleted from database "
f"by admin user {current_user.username}"
)
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
flash(f"Token {token} wurde gelöscht!") flash(f"Token {token} wurde gelöscht!")
return redirect('/tokens') return redirect("/tokens")
else: else:
# form validation failed -> return to token overview and flash message # form validation failed -> return to token overview and flash message
flash( flash(
@ -445,13 +535,15 @@ def delete_token(token):
f"Der Token {token} von {token_to_delete['name']} wurde nicht " f"Der Token {token} von {token_to_delete['name']} wurde nicht "
"gelöscht." "gelöscht."
) )
return redirect('/tokens') return redirect("/tokens")
@door_app.route('/deactivate-token/<token>') @door_app.route("/deactivate-token/<token>")
@roles_required('admin') @roles_required("admin")
def deactivate_token(token): def deactivate_token(token):
"""Deactivate access for the given token. This updates the token file on disk. """Deactivate access for the given token.
This updates the token file on disk.
Parameters Parameters
---------- ----------
@ -461,50 +553,54 @@ def deactivate_token(token):
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
if token not in tokens: if token not in tokens:
flash(f'Ungültiger Token {token} für Deaktivierung.') flash(f"Ungültiger Token {token} für Deaktivierung.")
return redirect('/tokens') return redirect("/tokens")
tokens[token]['inactive'] = True tokens[token]["inactive"] = True
try: try:
current_app.door.store_tokens(tokens) current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}") current_app.logger.info(
f"Token {token} deactivated by admin user {current_user.username}"
)
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect("/tokens")
@door_app.route('/backup_tokens') @door_app.route("/backup_tokens")
@roles_required('admin') @roles_required("admin")
def backup_tokens(): def backup_tokens():
# get list of defined admin users for backup # get list of defined admin users for backup
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
try: try:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'token_data.txt') file = Path(tmpdir, "token_data.txt")
file.write_text(json.dumps(tokens)) file.write_text(json.dumps(tokens))
return send_file(file, as_attachment=True, cache_timeout=-1) return send_file(file, as_attachment=True, cache_timeout=-1)
except Exception as e: except Exception as e:
return str(e) return str(e)
@door_app.route('/open') @door_app.route("/open")
@auth_required() @auth_required()
def open_door(): def open_door():
try: try:
current_app.door.open_door(user=current_user.username) current_app.door.open_door(user=current_user.username)
current_app.logger.info(f"Door opened by admin user {current_user.username}") current_app.logger.info(f"Door opened by admin user "
f"{current_user.username}")
except Exception as e: except Exception as e:
flash(f'Could not open door. Exception: {e}') flash(f"Could not open door. Exception: {e}")
return redirect('/') return redirect("/")
# routes for opening and closing the door via the web interface # routes for opening and closing the door via the web interface
@door_app.route('/close') @door_app.route("/close")
@auth_required() @auth_required()
def close_door(): def close_door():
try: try:
current_app.door.close_door(user=current_user.username) current_app.door.close_door(user=current_user.username)
current_app.logger.info(f"Door closed by admin user {current_user.username}") current_app.logger.info(f"Door closed by admin user "
f"{current_user.username}")
except Exception as e: except Exception as e:
flash(f'Could not close door. Exception: {e}') flash(f"Could not close door. Exception: {e}")
return redirect('/') return redirect("/")

154
tests/test_door_handle.py Normal file
View File

@ -0,0 +1,154 @@
import datetime
import time
import pytest
from pathlib import Path
from imaginaerraum_door_admin.door_handle import DoorHandle
import paho.mqtt.client as mqtt
@pytest.fixture
def nfc_socket(tmp_path):
# mock up a Unix socket server for testing
import threading
import socketserver
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = str(self.request.recv(1024), 'ascii')
cur_thread = threading.current_thread()
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
try:
self.request.sendall(response)
except BrokenPipeError:
pass
class TreadedUnixServer(socketserver.ThreadingMixIn,
socketserver.UnixStreamServer):
pass
nfc_socket_file = tmp_path / 'nfc.sock'
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
with server:
nfc_socket = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
print("Server loop running in thread:", server_thread.name)
yield nfc_socket
server.shutdown()
def test_create_door_handle(nfc_socket, tmp_path):
# test with invalid token file
with pytest.raises(FileNotFoundError):
door_handle = DoorHandle(token_file='no_such_file',
mqtt_host='test.mosquitto.org')
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle(nfc_socket, tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text(
"""# token | name | organization | email | valid_thru
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
yield door_handle
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle_no_nfc(tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
)
yield door_handle
def test_store_tokens(door_handle):
new_tokens = {
'042979fa186280': {
'name': 'Pippin',
'organization': 'Hobbits',
'email': 'pippin@shire.me',
'valid_thru': None,
'inactive': False
}
}
door_handle.store_tokens(new_tokens)
token_file = Path(door_handle.token_file)
assert token_file.exists()
assert '042979fa186280' in token_file.read_text()
def test_mqtt_messages(door_handle):
# test sending messages to the door_handle via MQTT and see if the state
# gets updated accordingly
client = mqtt.Client()
con = client.connect('test.mosquitto.org', 1883)
client.publish('door/position/value', 10).wait_for_publish(1)
time.sleep(1)
assert door_handle.encoder_position == 10
client.publish('door/state/value', 'open').wait_for_publish(1)
time.sleep(1)
assert door_handle.state == 'open'
timestamp = datetime.datetime.now().replace(microsecond=0)
token = '042979fa186280'
client.publish(
'door/token/last_invalid',
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
).wait_for_publish(1)
time.sleep(1)
most_recent_token = door_handle.get_most_recent_token()
assert most_recent_token['timestamp'] == timestamp
assert most_recent_token['token'] == token
client.disconnect()
def test_open_door(door_handle):
door_handle.open_door()
def test_open_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.open_door()
def test_close_door(door_handle):
door_handle.close_door()
def test_close_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.close_door()

View File

@ -18,7 +18,7 @@ def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}/login') response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me') email_form = browser.find_element('id', 'email').send_keys('gandalf')
password_form = browser.find_element('id', 'password').send_keys('shadowfax') password_form = browser.find_element('id', 'password').send_keys('shadowfax')
submit_button = browser.find_element('id', 'submit').click() submit_button = browser.find_element('id', 'submit').click()