Compare commits
No commits in common. "6d9e90631a6dbf323ebef8151b0d271726d29405" and "1320fc55ca05c2738c02f2346391c9fc007dcdf3" have entirely different histories.
6d9e90631a
...
1320fc55ca
72
README.md
72
README.md
|
@ -1,77 +1,7 @@
|
||||||
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
||||||
|
|
||||||
## Installation
|
|
||||||
Clone the repo
|
|
||||||
```shell
|
|
||||||
git clone <path_to_repo>
|
|
||||||
```
|
|
||||||
Install using pip
|
|
||||||
```shell
|
|
||||||
pip install .
|
|
||||||
```
|
|
||||||
|
|
||||||
## Running the app
|
# Development
|
||||||
```shell
|
|
||||||
export FLASK_APP=imaginaerraum_door_admin
|
|
||||||
flask run
|
|
||||||
```
|
|
||||||
|
|
||||||
## Configuration
|
|
||||||
You can set custom configuration options by defining an environment variable
|
|
||||||
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
|
|
||||||
|
|
||||||
For example, consider the following configuration file ``app_config.py``:
|
|
||||||
```
|
|
||||||
# door app configuration
|
|
||||||
SECRET_KEY = 'mysupersecretkey'
|
|
||||||
SECURITY_PASSWORD_SALT = 'saltycaramel'
|
|
||||||
|
|
||||||
TESTING = False
|
|
||||||
DEBUG = False
|
|
||||||
|
|
||||||
TOKEN_FILE = 'door_tokens.txt'
|
|
||||||
ADMIN_FILE = 'admins.txt'
|
|
||||||
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
|
||||||
NFC_LOG = "nfc.log"
|
|
||||||
```
|
|
||||||
To instruct the flask app to use this configuration, use the following commands:
|
|
||||||
```shell
|
|
||||||
export APPLICATION_SETTINGS=app_config.py
|
|
||||||
flask run
|
|
||||||
```
|
|
||||||
|
|
||||||
Below, you can find a list of configuration options.
|
|
||||||
|
|
||||||
### Flask app configuration
|
|
||||||
You can override common Flask configuration variables. In particular, you
|
|
||||||
definitely should set custom values for the ``SECRET_KEY`` and
|
|
||||||
``SECURITY_PASSWORD_SALT``.
|
|
||||||
|
|
||||||
### Token file
|
|
||||||
The token file is an ASCII file which list the IDs of RFID tokens that can be
|
|
||||||
used to unlock the door. You can specify the path to the token file using the
|
|
||||||
``TOKEN_FILE`` variable in the configuration file.
|
|
||||||
Here's an example of a token file (lines starting with ``#`` represent inactive
|
|
||||||
tokens):
|
|
||||||
```
|
|
||||||
# token | name | organization | email | valid_thru
|
|
||||||
#042979fa186280||||
|
|
||||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
|
|
||||||
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
|
|
||||||
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
|
|
||||||
```
|
|
||||||
|
|
||||||
### Admin file
|
|
||||||
``ADMIN_FILE`` -> file to create new super admins
|
|
||||||
|
|
||||||
### User database
|
|
||||||
|
|
||||||
### NFC files
|
|
||||||
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
|
|
||||||
``NFC_LOG = "nfc.log"`` -> log file of door events
|
|
||||||
|
|
||||||
## Development
|
|
||||||
```shell
|
```shell
|
||||||
cd tests
|
cd tests
|
||||||
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
|
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
|
||||||
|
|
|
@ -5,6 +5,7 @@ from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
#from .webapp import door_app
|
||||||
from .door_handle import DoorHandle
|
from .door_handle import DoorHandle
|
||||||
|
|
||||||
security = Security()
|
security = Security()
|
||||||
|
@ -12,85 +13,128 @@ db = SQLAlchemy()
|
||||||
|
|
||||||
|
|
||||||
# create admin users (only if they don't exists already)
|
# create admin users (only if they don't exists already)
|
||||||
def create_super_admins(app, user_datastore):
|
def create_super_admins(app, db, user_datastore, logger):
|
||||||
admin_file = Path(app.config.get('ADMIN_FILE'))
|
|
||||||
|
|
||||||
# setup user database when starting the app
|
# setup user database when starting the app
|
||||||
new_admin_data = []
|
with app.app_context():
|
||||||
if not admin_file.exists():
|
new_admin_data = []
|
||||||
app.logger.warning(
|
if app.config['ADMIN_FILE'] is not None:
|
||||||
f"Admin user creation file not found at path "
|
if not Path(app.config['ADMIN_FILE']).exists():
|
||||||
f"{admin_file.absolute()}."
|
logger.warning(
|
||||||
f"No super admins have been created in the datastore."
|
f"Admin user creation file not found at {app.config['ADMIN_FILE']}")
|
||||||
)
|
else:
|
||||||
|
# store data for new admins in memory s.t. the file can be deleted afterwards
|
||||||
|
with open(app.config['ADMIN_FILE']) as f:
|
||||||
|
for i, line in enumerate(f.readlines()):
|
||||||
|
if not line.strip().startswith('#'):
|
||||||
|
try:
|
||||||
|
user, email, pw = line.split()
|
||||||
|
validate_email(email)
|
||||||
|
new_admin_data.append(
|
||||||
|
{'username': user, 'email': email,
|
||||||
|
'password': pw})
|
||||||
|
except Exception as e:
|
||||||
|
print(
|
||||||
|
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
||||||
|
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
||||||
|
|
||||||
|
db.create_all()
|
||||||
|
super_admin_role = user_datastore.find_or_create_role(
|
||||||
|
'super_admin') # root admin = can create other admins
|
||||||
|
admin_role = user_datastore.find_or_create_role(
|
||||||
|
'admin') # 'normal' admin
|
||||||
|
local_role = user_datastore.find_or_create_role(
|
||||||
|
'local') # LDAP user or local user
|
||||||
|
|
||||||
|
for d in new_admin_data:
|
||||||
|
if user_datastore.find_user(email=d['email'],
|
||||||
|
username=d['username']) is None:
|
||||||
|
roles = [super_admin_role, admin_role]
|
||||||
|
if not d['password'] == 'LDAP':
|
||||||
|
roles.append(local_role)
|
||||||
|
logger.info(
|
||||||
|
f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
|
||||||
|
|
||||||
|
# create new admin (only if admin does not already exist)
|
||||||
|
new_admin = user_datastore.create_user(email=d['email'],
|
||||||
|
username=d[
|
||||||
|
'username'],
|
||||||
|
password=hash_password(
|
||||||
|
d['password']),
|
||||||
|
roles=roles)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
def setup_logging(app):
|
||||||
|
# set up logging for the web app
|
||||||
|
logger = logging.getLogger('webapp')
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
if app.config['LOG_FILE'] is not None:
|
||||||
|
ch = logging.FileHandler(app.config['LOG_FILE'])
|
||||||
|
ch.setLevel(logging.INFO)
|
||||||
else:
|
else:
|
||||||
# store data for new admins in memory s.t. the file can be deleted
|
# create console handler and set level to debug
|
||||||
# afterwards
|
ch = logging.StreamHandler()
|
||||||
admin_data = admin_file.read_text().split('\n')
|
ch.setLevel(logging.DEBUG)
|
||||||
for i, line in enumerate(admin_data):
|
|
||||||
if not line.strip().startswith('#'):
|
|
||||||
try:
|
|
||||||
user, email, pw = line.split()
|
|
||||||
validate_email(email)
|
|
||||||
new_admin_data.append(
|
|
||||||
{'username': user, 'email': email,
|
|
||||||
'password': pw})
|
|
||||||
except Exception as e:
|
|
||||||
app.logger.error(
|
|
||||||
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
|
||||||
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created."
|
|
||||||
)
|
|
||||||
|
|
||||||
with app.app_context():
|
# create formatter
|
||||||
db.create_all()
|
formatter = logging.Formatter(
|
||||||
super_admin_role = user_datastore.find_or_create_role(
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
'super_admin') # root admin = can create other admins
|
# add formatter to ch
|
||||||
admin_role = user_datastore.find_or_create_role(
|
ch.setFormatter(formatter)
|
||||||
'admin') # 'normal' admin
|
# add ch to logger
|
||||||
local_role = user_datastore.find_or_create_role(
|
logger.addHandler(ch)
|
||||||
'local') # LDAP user or local user
|
|
||||||
|
|
||||||
for d in new_admin_data:
|
|
||||||
if user_datastore.find_user(email=d['email'],
|
|
||||||
username=d['username']) is None:
|
|
||||||
roles = [super_admin_role, admin_role]
|
|
||||||
if not d['password'] == 'LDAP':
|
|
||||||
roles.append(local_role)
|
|
||||||
|
|
||||||
# create new admin (only if admin does not already exist)
|
|
||||||
new_admin = user_datastore.create_user(
|
|
||||||
email=d['email'], username=d['username'],
|
|
||||||
password=hash_password(d['password']), roles=roles
|
|
||||||
)
|
|
||||||
app.logger.info(
|
|
||||||
f"New super admin user created with username "
|
|
||||||
f"'{new_admin.username}' and email '{new_admin.email}'"
|
|
||||||
f", roles = {[r.name for r in new_admin.roles]}"
|
|
||||||
)
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
|
return logger
|
||||||
|
|
||||||
def create_app():
|
def create_app():
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
|
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
|
||||||
app.config.from_envvar('APPLICATION_SETTINGS', silent=True)
|
app.config.from_envvar('APPLICATION_SETTINGS')
|
||||||
|
|
||||||
|
logger = setup_logging(app)
|
||||||
|
|
||||||
token_file = Path(app.config.get('TOKEN_FILE'))
|
# do some checks for file existence etc.
|
||||||
if not token_file.exists():
|
try:
|
||||||
app.logger.warning(
|
with open(app.config['KEY_FILE']) as f:
|
||||||
f"Token file not found at {token_file.absolute()}. "
|
data = f.readlines()
|
||||||
"An empty token file will be created."
|
if 'SECRET_KEY' in data[0]:
|
||||||
)
|
secret_key = data[0].split()[-1]
|
||||||
token_file.touch()
|
else:
|
||||||
|
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
||||||
|
if 'SECURITY_PASSWORD_SALT' in data[1]:
|
||||||
|
security_password_salt = data[1].split()[-1]
|
||||||
|
else:
|
||||||
|
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
|
||||||
|
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
|
||||||
|
security_password_salt = '10036796768252925167749545152988277953'
|
||||||
|
|
||||||
# create door objects which provides access to the token file and current
|
if Path(app.config['TEMPLATE_FOLDER']).is_absolute():
|
||||||
# door state via MQTT
|
if not Path(app.config['TEMPLATE_FOLDER']).exists():
|
||||||
app.door = DoorHandle(
|
logger.error(
|
||||||
token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
|
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}")
|
||||||
nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
|
else:
|
||||||
)
|
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists():
|
||||||
|
logger.error(
|
||||||
|
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
|
||||||
|
if Path(app.config['STATIC_FOLDER']).is_absolute():
|
||||||
|
if not Path(app.config['STATIC_FOLDER']).exists():
|
||||||
|
logger.error(
|
||||||
|
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
|
||||||
|
else:
|
||||||
|
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
|
||||||
|
logger.error(
|
||||||
|
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
|
||||||
|
if not Path(app.config['TOKEN_FILE']).exists():
|
||||||
|
logger.warning(
|
||||||
|
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
|
||||||
|
|
||||||
|
# create door objects which provides access to the token file and current door state via MQTT
|
||||||
|
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
|
||||||
|
nfc_socket=app.config['NFC_SOCKET'],
|
||||||
|
logger=logger)
|
||||||
|
|
||||||
# Mail Config
|
# Mail Config
|
||||||
#mail = Mail(app)
|
#mail = Mail(app)
|
||||||
|
@ -107,6 +151,6 @@ def create_app():
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
||||||
|
|
||||||
create_super_admins(app, user_datastore)
|
create_super_admins(app, db, user_datastore, logger)
|
||||||
|
|
||||||
return app
|
return app
|
|
@ -10,7 +10,9 @@ class DefaultConfig(object):
|
||||||
DEBUG = False
|
DEBUG = False
|
||||||
|
|
||||||
SECRET_KEY = 'supersecret'
|
SECRET_KEY = 'supersecret'
|
||||||
SECURITY_PASSWORD_SALT = 'salty'
|
|
||||||
|
TEMPLATE_FOLDER = 'templates'
|
||||||
|
STATIC_FOLDER = 'static'
|
||||||
|
|
||||||
SECURITY_REGISTERABLE = False
|
SECURITY_REGISTERABLE = False
|
||||||
SECURITY_CHANGEABLE = True
|
SECURITY_CHANGEABLE = True
|
||||||
|
@ -45,8 +47,7 @@ class DefaultConfig(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
KEY_FILE = '/root/flask_keys'
|
KEY_FILE = '/root/flask_keys'
|
||||||
TOKEN_FILE = "door_tokens"
|
TOKEN_FILE = "/etc/door_tokens"
|
||||||
ADMIN_FILE = 'admins'
|
|
||||||
|
|
||||||
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
||||||
LDAP_USER_GROUP = 'Users'
|
LDAP_USER_GROUP = 'Users'
|
||||||
|
@ -54,6 +55,6 @@ class DefaultConfig(object):
|
||||||
LDAP_DOMAIN_EXT = 'de'
|
LDAP_DOMAIN_EXT = 'de'
|
||||||
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
NFC_SOCKET = "/tmp/nfc.sock"
|
||||||
LOG_FILE = "webinterface.log"
|
LOG_FILE = "/var/log/webinterface.log"
|
||||||
NFC_LOG = "nfc.log"
|
NFC_LOG = "/var/log/nfc.log"
|
||||||
MQTT_HOST = '10.10.21.2'
|
MQTT_HOST = '10.10.21.2'
|
|
@ -4,22 +4,23 @@ from pathlib import Path
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
class DoorHandle:
|
class DoorHandle:
|
||||||
def __init__(self, token_file, mqtt_host, mqtt_port=1883,
|
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
|
||||||
nfc_socket='/tmp/nfc.sock', logger=None):
|
|
||||||
self.state = None
|
self.state = None
|
||||||
self.encoder_position = None
|
self.encoder_position = None
|
||||||
|
|
||||||
if not Path(token_file).exists():
|
if not Path(token_file).exists():
|
||||||
raise FileNotFoundError(
|
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
|
||||||
"File with door tokens could not be found at "
|
|
||||||
f"{Path(token_file).absolute()}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.token_file = token_file
|
self.token_file = token_file
|
||||||
self.last_invalid = {}
|
self.last_invalid = {}
|
||||||
|
|
||||||
|
self.mqtt_client = mqtt.Client()
|
||||||
|
self.mqtt_client.on_connect = self.on_connect
|
||||||
|
self.mqtt_client.on_message = self.on_message
|
||||||
|
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
||||||
|
self.mqtt_client.loop_start()
|
||||||
|
|
||||||
if logger:
|
if logger:
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
else:
|
else:
|
||||||
|
@ -30,31 +31,19 @@ class DoorHandle:
|
||||||
self.nfc_sock.connect(nfc_socket)
|
self.nfc_sock.connect(nfc_socket)
|
||||||
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
|
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Could not connect to NFC socket at {nfc_socket}. "
|
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
|
||||||
f"Exception: {e}")
|
|
||||||
self.nfc_sock = None
|
self.nfc_sock = None
|
||||||
#raise
|
#raise
|
||||||
|
|
||||||
self.mqtt_client = mqtt.Client()
|
|
||||||
self.mqtt_client.on_connect = self.on_connect
|
|
||||||
self.mqtt_client.on_message = self.on_message
|
|
||||||
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
|
||||||
self.mqtt_client.loop_start()
|
|
||||||
|
|
||||||
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
|
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
|
||||||
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
# The callback for when the client receives a CONNACK response from the server.
|
# The callback for when the client receives a CONNACK response from the server.
|
||||||
def on_connect(self, client, userdata, flags, rc):
|
def on_connect(self, client, userdata, flags, rc):
|
||||||
self.logger.info("Connected to MQTT broker with result code " + str(rc))
|
self.logger.info("Connected to MQTT broker with result code " + str(rc))
|
||||||
|
|
||||||
# Subscribing in on_connect() means that if we lose the connection and
|
# Subscribing in on_connect() means that if we lose the connection and
|
||||||
# reconnect then subscriptions will be renewed.
|
# reconnect then subscriptions will be renewed.
|
||||||
client.subscribe("door/#")
|
client.subscribe("#")
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
# The callback for when a PUBLISH message is received from the server.
|
# The callback for when a PUBLISH message is received from the server.
|
||||||
def on_message(self, client, userdata, msg):
|
def on_message(self, client, userdata, msg):
|
||||||
|
@ -109,13 +98,13 @@ class DoorHandle:
|
||||||
if self.nfc_sock is not None:
|
if self.nfc_sock is not None:
|
||||||
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
|
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("No connection to NFC socket. Cannot close door!")
|
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||||
|
|
||||||
def close_door(self, user=''):
|
def close_door(self, user=''):
|
||||||
if self.nfc_sock is not None:
|
if self.nfc_sock is not None:
|
||||||
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
|
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("No connection to NFC socket. Cannot close door!")
|
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||||
|
|
||||||
def get_most_recent_token(self):
|
def get_most_recent_token(self):
|
||||||
# read last invalid token from logfile
|
# read last invalid token from logfile
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
from flask import flash
|
|
||||||
from flask_wtf import FlaskForm
|
|
||||||
from wtforms.fields import DateField, EmailField
|
|
||||||
from wtforms.fields import StringField, BooleanField
|
|
||||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
|
||||||
from datetime import date
|
|
||||||
|
|
||||||
|
|
||||||
def validate_valid_thru_date(form, field):
|
|
||||||
if form.limit_validity.data:
|
|
||||||
# only check date format if limited validity of token is set
|
|
||||||
try:
|
|
||||||
if not field.data >= date.today():
|
|
||||||
raise ValueError
|
|
||||||
except ValueError as e:
|
|
||||||
flash("Ungültiges Datum")
|
|
||||||
raise ValidationError
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class TokenForm(FlaskForm):
|
|
||||||
name = StringField("Name", validators=[DataRequired()])
|
|
||||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
||||||
organization = StringField("Organization", validators=[DataRequired()])
|
|
||||||
limit_validity = BooleanField("Gültigkeit begrenzen?")
|
|
||||||
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
|
|
||||||
active = BooleanField("Aktiv?")
|
|
||||||
dsgvo = BooleanField(
|
|
||||||
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ConfirmDeleteForm(FlaskForm):
|
|
||||||
name = StringField(
|
|
||||||
"Name",
|
|
||||||
validators=[
|
|
||||||
DataRequired(),
|
|
||||||
EqualTo("name_confirm", "Name stimmt nicht überein"),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
name_confirm = StringField("Name confirm")
|
|
||||||
|
|
||||||
|
|
||||||
class AdminCreationForm(FlaskForm):
|
|
||||||
name = StringField("Name", validators=[DataRequired()])
|
|
||||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
|
@ -1,19 +1,11 @@
|
||||||
from flask import (
|
from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app
|
||||||
render_template,
|
|
||||||
request,
|
|
||||||
flash,
|
|
||||||
redirect,
|
|
||||||
send_file,
|
|
||||||
Blueprint,
|
|
||||||
current_app,
|
|
||||||
)
|
|
||||||
from werkzeug.utils import secure_filename
|
from werkzeug.utils import secure_filename
|
||||||
from flask_security import (
|
from flask_wtf import FlaskForm
|
||||||
auth_required,
|
from wtforms.fields import DateField, EmailField
|
||||||
hash_password,
|
from wtforms.fields import StringField, BooleanField
|
||||||
current_user,
|
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||||
roles_required
|
from flask_security import auth_required, hash_password, \
|
||||||
)
|
current_user, roles_required
|
||||||
from flask_security.views import change_password
|
from flask_security.views import change_password
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
|
|
||||||
|
@ -25,231 +17,207 @@ import tempfile
|
||||||
from datetime import date, datetime, timedelta
|
from datetime import date, datetime, timedelta
|
||||||
|
|
||||||
from imaginaerraum_door_admin import db, security
|
from imaginaerraum_door_admin import db, security
|
||||||
from imaginaerraum_door_admin.forms import (
|
|
||||||
AdminCreationForm,
|
|
||||||
ConfirmDeleteForm,
|
|
||||||
TokenForm,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
door_app = Blueprint("door_app", __name__, template_folder="templates")
|
def validate_valid_thru_date(form, field):
|
||||||
|
if form.limit_validity.data: # only check date format if limited validity of token is set
|
||||||
|
try:
|
||||||
|
if not field.data >= date.today():
|
||||||
|
raise ValueError
|
||||||
|
except ValueError as e:
|
||||||
|
flash("Ungültiges Datum")
|
||||||
|
raise ValidationError
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
# we override the change_password view from flask security to only allow local
|
class TokenForm(FlaskForm):
|
||||||
# users to change their passwords
|
name = StringField('Name', validators=[DataRequired()])
|
||||||
|
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||||
|
organization = StringField('Organization', validators=[DataRequired()])
|
||||||
|
limit_validity = BooleanField('Gültigkeit begrenzen?')
|
||||||
|
valid_thru = DateField('Gültig bis', validators=[validate_valid_thru_date])
|
||||||
|
active = BooleanField('Aktiv?')
|
||||||
|
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
|
||||||
|
|
||||||
|
|
||||||
|
class ConfirmDeleteForm(FlaskForm):
|
||||||
|
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
|
||||||
|
name_confirm = StringField('Name confirm')
|
||||||
|
|
||||||
|
|
||||||
|
class AdminCreationForm(FlaskForm):
|
||||||
|
name = StringField('Name', validators=[DataRequired()])
|
||||||
|
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||||
|
|
||||||
|
door_app = Blueprint('door_app', __name__,
|
||||||
|
template_folder='templates')
|
||||||
|
|
||||||
|
|
||||||
|
# we override the change_password view from flask security to only allow local users to change their passwords
|
||||||
# LDAP users should use the LDAP self service for changing passwords
|
# LDAP users should use the LDAP self service for changing passwords
|
||||||
# this route needs to be defined before the Flask Security setup
|
# this route needs to be defined before the Flask Security setup
|
||||||
@door_app.route("/change", methods=["GET", "POST"])
|
@door_app.route('/change', methods=['GET', 'POST'])
|
||||||
@auth_required()
|
@auth_required()
|
||||||
def change_pw():
|
def change_pw():
|
||||||
if current_user.has_role("local"):
|
if current_user.has_role('local'):
|
||||||
# local users can change their password
|
# local users can change their password
|
||||||
return change_password()
|
return change_password()
|
||||||
else:
|
else:
|
||||||
# LDAP users get redirected to the LDAP self service
|
# LDAP users get redirected to the LDAP self service
|
||||||
return redirect("https://ldap.imaginaerraum.de/")
|
return redirect('https://ldap.imaginaerraum.de/')
|
||||||
|
|
||||||
|
|
||||||
# admin user management
|
# admin user management
|
||||||
@door_app.route("/manage_admins", methods=["GET", "POST"])
|
@door_app.route('/manage_admins', methods=['GET', 'POST'])
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def manage_admins():
|
def manage_admins():
|
||||||
form = AdminCreationForm()
|
form = AdminCreationForm()
|
||||||
if request.method == "GET":
|
if request.method == 'GET':
|
||||||
users = security.datastore.user_model.query.all()
|
users = security.datastore.user_model.query.all()
|
||||||
admin_data = [
|
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
|
||||||
{
|
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
|
||||||
"username": u.username,
|
} for u in users]
|
||||||
"email": u.email,
|
return render_template('admins.html', admin_data=admin_data, form=form)
|
||||||
"active": u.is_active,
|
|
||||||
"admin": u.has_role("admin"),
|
|
||||||
"super_admin": u.has_role("super_admin"),
|
|
||||||
}
|
|
||||||
for u in users
|
|
||||||
]
|
|
||||||
return render_template("admins.html", admin_data=admin_data, form=form)
|
|
||||||
elif form.validate():
|
elif form.validate():
|
||||||
if (
|
if security.datastore.find_user(username=form.name.data) is not None or \
|
||||||
security.datastore.find_user(username=form.name.data) is not None
|
security.datastore.find_user(email=form.email.data) is not None:
|
||||||
or security.datastore.find_user(email=form.email.data) is not None
|
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
|
||||||
):
|
|
||||||
flash(
|
|
||||||
"Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse"
|
|
||||||
" existiert bereits!"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
pw = secrets.token_urlsafe(16)
|
pw = secrets.token_urlsafe(16)
|
||||||
new_user = security.datastore.create_user(
|
new_user = security.datastore.create_user(username=form.name.data, email=form.email.data,
|
||||||
username=form.name.data,
|
password=hash_password(pw))
|
||||||
email=form.email.data,
|
security.datastore.add_role_to_user(new_user, 'local')
|
||||||
password=hash_password(pw),
|
|
||||||
)
|
|
||||||
security.datastore.add_role_to_user(new_user, "local")
|
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f"Super admin {current_user.username} created new user account "
|
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
|
||||||
f"for {new_user.username} <{new_user.email}>"
|
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
|
||||||
)
|
|
||||||
flash(
|
|
||||||
f"Ein Account für den Nutzer {new_user.username} wurde "
|
|
||||||
f"erstellt. Verwende das Passwort {pw} um den Nutzer "
|
|
||||||
f"einzuloggen."
|
|
||||||
)
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
else:
|
else:
|
||||||
flash(f"Ungültige Eingabe: {form.errors}")
|
flash(f"Ungültige Eingabe: {form.errors}")
|
||||||
|
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/delete_admins/<username>", methods=["GET", "POST"])
|
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def delete_admins(username):
|
def delete_admins(username):
|
||||||
user = security.datastore.find_user(username=username)
|
user = security.datastore.find_user(username=username)
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
if user.has_role("super_admin"):
|
if user.has_role('super_admin'):
|
||||||
flash("Super-Admins können nicht gelöscht werden!")
|
flash('Super-Admins können nicht gelöscht werden!')
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
if user.is_active:
|
if user.is_active:
|
||||||
flash(
|
flash('Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer zuerst deaktivieren.')
|
||||||
"Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer "
|
return redirect('/manage_admins')
|
||||||
"zuerst deaktivieren."
|
|
||||||
)
|
|
||||||
return redirect("/manage_admins")
|
|
||||||
|
|
||||||
# set up form for confirming deletion
|
# set up form for confirming deletion
|
||||||
form = ConfirmDeleteForm()
|
form = ConfirmDeleteForm()
|
||||||
form.name_confirm.data = username
|
form.name_confirm.data = username
|
||||||
|
|
||||||
if request.method == "GET":
|
if request.method == 'GET':
|
||||||
# return page asking the user to confirm delete
|
# return page asking the user to confirm delete
|
||||||
return render_template("delete_user.html", username=username, form=form)
|
return render_template('delete_user.html', username=username, form=form)
|
||||||
elif form.validate():
|
elif form.validate():
|
||||||
security.datastore.delete_user(user)
|
security.datastore.delete_user(user)
|
||||||
flash(f"Benutzer {username} wurde gelöscht.")
|
flash(f"Benutzer {username} wurde gelöscht.")
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}")
|
||||||
f"Super admin {current_user.username} deleted admin user {username}"
|
|
||||||
)
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
else:
|
else:
|
||||||
flash(
|
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
|
||||||
"Der eingegebene Nutzername stimmt nicht überein. Der Benutzer "
|
return redirect('/manage_admins')
|
||||||
"wurde nicht gelöscht!"
|
|
||||||
)
|
|
||||||
return redirect("/manage_admins")
|
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/admin_toggle_active/<username>")
|
@door_app.route('/admin_toggle_active/<username>')
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def admin_toggle_active(username):
|
def admin_toggle_active(username):
|
||||||
user = security.datastore.find_user(username=username)
|
user = security.datastore.find_user(username=username)
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
if user.has_role("super_admin"):
|
if user.has_role('super_admin'):
|
||||||
flash("Super-Admins können nicht deaktiviert werden!")
|
flash('Super-Admins können nicht deaktiviert werden!')
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
security.datastore.toggle_active(user)
|
security.datastore.toggle_active(user)
|
||||||
if user.is_active:
|
if user.is_active:
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
|
||||||
f"Super admin {current_user.username} activated access for admin "
|
|
||||||
f"user {username}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
|
||||||
f"Super admin {current_user.username} deactivated access for admin "
|
|
||||||
f"user {username}"
|
|
||||||
)
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/promote_admin/<username>")
|
@door_app.route('/promote_admin/<username>')
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def promote_admin(username):
|
def promote_admin(username):
|
||||||
user = security.datastore.find_user(username=username)
|
user = security.datastore.find_user(username=username)
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
if user.has_role("admin"):
|
if user.has_role('admin'):
|
||||||
flash(f"Benutzer {username} hat bereits Admin-Rechte!")
|
flash(f'Benutzer {username} hat bereits Admin-Rechte!')
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
security.datastore.add_role_to_user(user, "admin")
|
security.datastore.add_role_to_user(user, 'admin')
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
|
||||||
f"Super admin {current_user.username} granted admin privileges to "
|
|
||||||
f"user {username}"
|
|
||||||
)
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/demote_admin/<username>")
|
@door_app.route('/demote_admin/<username>')
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def demote_admin(username):
|
def demote_admin(username):
|
||||||
user = security.datastore.find_user(username=username)
|
user = security.datastore.find_user(username=username)
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
if user.has_role("super_admin"):
|
if user.has_role('super_admin'):
|
||||||
flash(
|
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht '
|
||||||
f"Benutzer {username} hat Super-Admin-Rechte und kann nicht "
|
'verändert werden!')
|
||||||
"verändert werden!"
|
return redirect('/manage_admins')
|
||||||
)
|
if user.has_role('admin'):
|
||||||
return redirect("/manage_admins")
|
security.datastore.remove_role_from_user(user, 'admin')
|
||||||
if user.has_role("admin"):
|
current_app.logger.info(f"Super admin {current_user.username} revoked "
|
||||||
security.datastore.remove_role_from_user(user, "admin")
|
f"admin privileges of user {username}")
|
||||||
current_app.logger.info(
|
|
||||||
f"Super admin {current_user.username} revoked "
|
|
||||||
f"admin privileges of user {username}"
|
|
||||||
)
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
else:
|
else:
|
||||||
flash(f"Benutzer {username} ist bereits kein Admin!")
|
flash(f'Benutzer {username} ist bereits kein Admin!')
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/backup_user_datastore")
|
@door_app.route('/backup_user_datastore')
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def backup_user_datastore():
|
def backup_user_datastore():
|
||||||
# get list of defined admin users for backup
|
# get list of defined admin users for backup
|
||||||
users = security.datastore.user_model.query.all()
|
users = security.datastore.user_model.query.all()
|
||||||
user_data = [
|
user_data = [{'username': u.username, 'email': u.email,
|
||||||
{
|
'active': u.is_active, 'password_hash': u.password,
|
||||||
"username": u.username,
|
'roles': [r.name for r in u.roles]}
|
||||||
"email": u.email,
|
for u in users if not u.has_role('super_admin')]
|
||||||
"active": u.is_active,
|
|
||||||
"password_hash": u.password,
|
|
||||||
"roles": [r.name for r in u.roles],
|
|
||||||
}
|
|
||||||
for u in users
|
|
||||||
if not u.has_role("super_admin")
|
|
||||||
]
|
|
||||||
try:
|
try:
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
file = Path(tmpdir, "user_data.txt")
|
file = Path(tmpdir, 'user_data.txt')
|
||||||
file.write_text(json.dumps(user_data))
|
file.write_text(json.dumps(user_data))
|
||||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return str(e)
|
return str(e)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/restore_user_datastore", methods=["POST"])
|
@door_app.route('/restore_user_datastore', methods=['POST'])
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def restore_user_datastore():
|
def restore_user_datastore():
|
||||||
# check if the post request has the file part
|
# check if the post request has the file part
|
||||||
if "file" not in request.files:
|
if 'file' not in request.files:
|
||||||
flash("Keine Datei ausgewählt!")
|
flash('Keine Datei ausgewählt!')
|
||||||
return redirect(request.url)
|
return redirect(request.url)
|
||||||
file = request.files["file"]
|
file = request.files['file']
|
||||||
# if user does not select file, browser also
|
# if user does not select file, browser also
|
||||||
# submit an empty part without filename
|
# submit an empty part without filename
|
||||||
if file.filename == "":
|
if file.filename == '':
|
||||||
flash("Keine Datei ausgewählt!")
|
flash('Keine Datei ausgewählt!')
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
filename = secure_filename(file.filename)
|
filename = secure_filename(file.filename)
|
||||||
if file and filename.endswith(".txt"):
|
if file and filename.endswith('.txt'):
|
||||||
data = file.stream.read()
|
data = file.stream.read()
|
||||||
try:
|
try:
|
||||||
# check validity of user data
|
# check validity of user data
|
||||||
|
@ -258,97 +226,63 @@ def restore_user_datastore():
|
||||||
valid &= all(type(d) == dict for d in user_data)
|
valid &= all(type(d) == dict for d in user_data)
|
||||||
if valid:
|
if valid:
|
||||||
for d in user_data:
|
for d in user_data:
|
||||||
entry_valid = set(d.keys()) == {
|
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username', 'roles'}
|
||||||
"active",
|
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username'])
|
||||||
"email",
|
entry_valid &= type(d['active']) == bool
|
||||||
"password_hash",
|
entry_valid &= type(d['roles']) == list
|
||||||
"username",
|
validate_email(d['email'])
|
||||||
"roles",
|
|
||||||
}
|
|
||||||
entry_valid &= all(
|
|
||||||
len(d[key]) > 0
|
|
||||||
for key in ["email", "password_hash", "username"]
|
|
||||||
)
|
|
||||||
entry_valid &= type(d["active"]) == bool
|
|
||||||
entry_valid &= type(d["roles"]) == list
|
|
||||||
validate_email(d["email"])
|
|
||||||
if entry_valid:
|
if entry_valid:
|
||||||
existing_user = security.datastore.find_user(
|
existing_user = security.datastore.find_user(username=d['username'], email=d['email'])
|
||||||
username=d["username"], email=d["email"]
|
|
||||||
)
|
|
||||||
if existing_user is None:
|
if existing_user is None:
|
||||||
security.datastore.create_user(
|
security.datastore.create_user(username=d['username'], email=d['email'],
|
||||||
username=d["username"],
|
password=d['password_hash'], active=d['active'],
|
||||||
email=d["email"],
|
roles=d['roles'])
|
||||||
password=d["password_hash"],
|
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
|
||||||
active=d["active"],
|
|
||||||
roles=d["roles"],
|
|
||||||
)
|
|
||||||
flash(
|
|
||||||
f"Account für Benutzer '{d['username']} wurde "
|
|
||||||
f"wiederhergestellt."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
flash(
|
flash(f"Benutzer '{d['username']} existiert bereits. Eintrag wird übersprungen.")
|
||||||
f"Benutzer '{d['username']} existiert bereits."
|
|
||||||
f" Eintrag wird übersprungen."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Ungültige Daten für User Entry {d}")
|
raise ValueError(f"Ungültige Daten für User Entry {d}")
|
||||||
else:
|
else:
|
||||||
raise ValueError("Admin User Datei hat ungültiges Format.")
|
raise ValueError("Admin User Datei hat ungültiges Format.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
|
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
flash("Benutzer aus Datei gelesen.")
|
flash("Benutzer aus Datei gelesen.")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
else:
|
else:
|
||||||
flash("Ungültige Dateiendung")
|
flash("Ungültige Dateiendung")
|
||||||
return redirect("/manage_admins")
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
# main page
|
# main page
|
||||||
@door_app.route("/")
|
@door_app.route('/')
|
||||||
def door_lock():
|
def door_lock():
|
||||||
return render_template(
|
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
|
||||||
"index.html",
|
|
||||||
door_state=current_app.door.state,
|
|
||||||
encoder_position=current_app.door.encoder_position,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# token overview
|
# token overview
|
||||||
@door_app.route("/tokens")
|
@door_app.route('/tokens')
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def list_tokens():
|
def list_tokens():
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
assigned_tokens = {t: data for t, data in tokens.items()
|
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
|
||||||
if not data["inactive"]}
|
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
|
||||||
inactive_tokens = {t: data for t, data in tokens.items()
|
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
|
||||||
if data["inactive"]}
|
|
||||||
return render_template(
|
|
||||||
"tokens.html",
|
|
||||||
assigned_tokens=assigned_tokens,
|
|
||||||
inactive_tokens=inactive_tokens
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/token-log")
|
@door_app.route('/token-log')
|
||||||
@roles_required("super_admin")
|
@roles_required('super_admin')
|
||||||
def token_log():
|
def token_log():
|
||||||
log = []
|
log = []
|
||||||
try:
|
try:
|
||||||
with open(current_app.config["NFC_LOG"]) as f:
|
with open(current_app.config['NFC_LOG']) as f:
|
||||||
log += f.readlines()
|
log += f.readlines()
|
||||||
log.reverse()
|
log.reverse()
|
||||||
log = [l.split(" - ") for l in log]
|
log = [l.split(' - ') for l in log]
|
||||||
return render_template("token_log.html", log=log)
|
return render_template('token_log.html', log=log)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(
|
flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}")
|
||||||
f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} "
|
return redirect('/')
|
||||||
f"konnte nicht gelesen werden. Exception: {e}"
|
|
||||||
)
|
|
||||||
return redirect("/")
|
|
||||||
|
|
||||||
|
|
||||||
def store_token(token_data):
|
def store_token(token_data):
|
||||||
|
@ -357,82 +291,69 @@ def store_token(token_data):
|
||||||
This will use the token id and the associated data and create/modify a
|
This will use the token id and the associated data and create/modify a
|
||||||
token and store the new token file to disk.
|
token and store the new token file to disk.
|
||||||
"""
|
"""
|
||||||
token = token_data["token"]
|
token = token_data['token']
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
tokens[token] = {
|
tokens[token] = {'name': token_data['name'],
|
||||||
"name": token_data["name"],
|
'email': token_data['email'],
|
||||||
"email": token_data["email"],
|
'valid_thru': token_data['valid_thru'],
|
||||||
"valid_thru": token_data["valid_thru"],
|
'inactive': token_data['inactive'],
|
||||||
"inactive": token_data["inactive"],
|
'organization': token_data['organization']}
|
||||||
"organization": token_data["organization"],
|
|
||||||
}
|
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
current_app.door.store_tokens(tokens)
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||||
f"Token {token} stored in database by admin user "
|
|
||||||
f"{current_user.username}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
|
|
||||||
|
|
||||||
# routes for registering, editing and deleting tokens
|
# routes for registering, editing and deleting tokens
|
||||||
@door_app.route("/register-token", methods=["GET", "POST"])
|
@door_app.route('/register-token', methods=['GET', 'POST'])
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def register():
|
def register():
|
||||||
"""Register new token for locking and unlocking the door.
|
"""Register new token for locking and unlocking the door.
|
||||||
|
|
||||||
This route displays the most recently scanned invalid token as reported in
|
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
|
||||||
the logfile and provides a form for entering user info (name, email, valid
|
entering user info (name, email, valid thru date (optional)) for the new token.
|
||||||
thru date (optional)) for the new token.
|
|
||||||
|
|
||||||
If the route is called via POST the provided form data is checked and if
|
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route
|
||||||
the check succeeds the /store-token route will be called which adds the new
|
will be called which adds the new token to the database.
|
||||||
token to the database.
|
|
||||||
"""
|
"""
|
||||||
token = current_app.door.get_most_recent_token()
|
token = current_app.door.get_most_recent_token()
|
||||||
|
|
||||||
recent_token = {}
|
recent_token = {}
|
||||||
if {"token", "timestamp"}.issubset(set(token.keys())):
|
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
||||||
dt = datetime.utcnow() - token["timestamp"]
|
dt = datetime.utcnow() - token['timestamp']
|
||||||
if dt < timedelta(minutes=10):
|
if dt < timedelta(minutes=10):
|
||||||
recent_token = token
|
recent_token = token
|
||||||
recent_token["timedelta_minutes"] = int(dt.total_seconds() / 60.0)
|
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
|
||||||
|
|
||||||
form = TokenForm()
|
form = TokenForm()
|
||||||
if request.method == "GET":
|
if request.method == 'GET':
|
||||||
# set default valid thru date to today to make sure form validity check
|
# set default valid thru date to today to make sure form validity check passes
|
||||||
# passes
|
|
||||||
# (will not be used if limited validity is disabled)
|
# (will not be used if limited validity is disabled)
|
||||||
form.valid_thru.data = date.today()
|
form.valid_thru.data = date.today()
|
||||||
elif request.method == "POST" and form.validate():
|
elif request.method == 'POST' and form.validate():
|
||||||
token_data = {
|
token_data = {
|
||||||
"token": current_app.door.get_most_recent_token()["token"],
|
'token': current_app.door.get_most_recent_token()['token'],
|
||||||
"name": form.name.data,
|
'name': form.name.data, 'email': form.email.data,
|
||||||
"email": form.email.data,
|
'organization': form.organization.data,
|
||||||
"organization": form.organization.data,
|
'inactive': not form.active.data,
|
||||||
"inactive": not form.active.data,
|
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||||
"valid_thru": form.valid_thru.data.isoformat()
|
|
||||||
if form.limit_validity.data
|
|
||||||
else "",
|
|
||||||
}
|
}
|
||||||
store_token(token_data)
|
store_token(token_data)
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
else:
|
else:
|
||||||
flash(f"Token konnte nicht registiert werden. Fehler: {form.errors}")
|
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
|
||||||
return render_template("register.html", token=recent_token, form=form)
|
return render_template('register.html', token=recent_token, form=form)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/edit-token/<token>", methods=["GET", "POST"])
|
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def edit_token(token):
|
def edit_token(token):
|
||||||
"""Edit data in the token file (name, email, valid_thru date, active).
|
"""Edit data in the token file (name, email, valid_thru date, active/inactive).
|
||||||
|
|
||||||
If the route is accessed via GET it will provide a form for editing the
|
If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
|
||||||
currently stored data for the user.
|
If the route is accessed via POST it will check if the form data is good and then store the modified user data in
|
||||||
If the route is accessed via POST it will check if the form data is good
|
the database (by redirecting to the /store-token route)
|
||||||
and then store the modified user data in the database (by redirecting to
|
|
||||||
the /store-token route)
|
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
|
@ -440,60 +361,51 @@ def edit_token(token):
|
||||||
The token for which data should be edited.
|
The token for which data should be edited.
|
||||||
"""
|
"""
|
||||||
form = TokenForm(request.form)
|
form = TokenForm(request.form)
|
||||||
form.dsgvo.validators = (
|
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
|
||||||
[]
|
# to it before
|
||||||
)
|
if request.method == 'GET':
|
||||||
# we skip the validation of the DSGVO checkbox here because we assume
|
|
||||||
# the user agreed to it before
|
|
||||||
if request.method == "GET":
|
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
if token in tokens:
|
if token in tokens:
|
||||||
# set default for form according to values from the token file
|
# set default for form according to values from the token file
|
||||||
et = tokens[token]
|
et = tokens[token]
|
||||||
form.active.data = not et["inactive"]
|
form.active.data = not et['inactive']
|
||||||
form.name.data = et["name"] if et["name"] else ""
|
form.name.data = et['name'] if et['name'] else ''
|
||||||
form.email.data = et["email"] if et["email"] else ""
|
form.email.data = et['email'] if et['email'] else ''
|
||||||
form.organization.data = et["organization"] \
|
form.organization.data = et['organization'] if et['organization'] else ''
|
||||||
if et["organization"] else ""
|
|
||||||
|
|
||||||
# for the valid thru date we use today's date in case there is not
|
# for the valid thru date we use today's date in case there is not valid date in the database
|
||||||
# valid date in the database
|
|
||||||
try:
|
try:
|
||||||
form.valid_thru.data = date.fromisoformat(et["valid_thru"])
|
form.valid_thru.data = date.fromisoformat(et['valid_thru'])
|
||||||
form.limit_validity.data = True
|
form.limit_validity.data = True
|
||||||
except Exception:
|
except Exception:
|
||||||
form.valid_thru.data = date.today()
|
form.valid_thru.data = date.today()
|
||||||
|
|
||||||
return render_template("edit.html", token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
else:
|
else:
|
||||||
# flash an error message if the route is accessed with an invalid
|
# flash an error message if the route is accessed with an invalid token
|
||||||
# token
|
flash(f'Ungültiger Token {token}!')
|
||||||
flash(f"Ungültiger Token {token}!")
|
return redirect('/tokens')
|
||||||
return redirect("/tokens")
|
elif request.method == 'POST':
|
||||||
elif request.method == "POST":
|
|
||||||
if form.validate():
|
if form.validate():
|
||||||
# store data in token_data cookie
|
# store data in token_data cookie
|
||||||
token_data = {
|
token_data = {'token': token,
|
||||||
"token": token,
|
'name': form.name.data,
|
||||||
"name": form.name.data,
|
'organization': form.organization.data,
|
||||||
"organization": form.organization.data,
|
'email': form.email.data,
|
||||||
"email": form.email.data,
|
'inactive': not form.active.data,
|
||||||
"inactive": not form.active.data,
|
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||||
"valid_thru": form.valid_thru.data.isoformat()
|
}
|
||||||
if form.limit_validity.data
|
|
||||||
else "",
|
|
||||||
}
|
|
||||||
store_token(token_data)
|
store_token(token_data)
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
else:
|
else:
|
||||||
flash(f"Token konnte nicht editiert werden. Fehler: {form.errors}")
|
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
|
||||||
return render_template("edit.html", token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/delete-token/<token>", methods=["GET", "POST"])
|
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def delete_token(token):
|
def delete_token(token):
|
||||||
"""Delete the given token from the token file and store the new token file.
|
"""Delete the given token from the token file and store the new token file to disk
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
|
@ -503,31 +415,29 @@ def delete_token(token):
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
|
|
||||||
if token not in tokens:
|
if token not in tokens:
|
||||||
flash(f"Ungültiger Token {token} für Löschung.")
|
flash(f'Ungültiger Token {token} für Löschung.')
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
|
|
||||||
token_to_delete = tokens[token]
|
token_to_delete = tokens[token]
|
||||||
|
|
||||||
# set up form for confirming deletion
|
# set up form for confirming deletion
|
||||||
form = ConfirmDeleteForm()
|
form = ConfirmDeleteForm()
|
||||||
form.name_confirm.data = token_to_delete["name"]
|
form.name_confirm.data = token_to_delete['name']
|
||||||
|
|
||||||
if request.method == "GET":
|
if request.method == 'GET':
|
||||||
# return page asking the user to confirm delete
|
# return page asking the user to confirm delete
|
||||||
return render_template("delete.html", token=token_to_delete, form=form)
|
return render_template('delete.html', token=token_to_delete, form=form)
|
||||||
elif form.validate():
|
elif form.validate():
|
||||||
# form validation successful -> can delete the token
|
# form validation successful -> can delete the token
|
||||||
tokens.pop(token)
|
tokens.pop(token)
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
current_app.door.store_tokens(tokens)
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Token {token} was deleted from database "
|
||||||
f"Token {token} was deleted from database "
|
f"by admin user {current_user.username}")
|
||||||
f"by admin user {current_user.username}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
flash(f"Token {token} wurde gelöscht!")
|
flash(f"Token {token} wurde gelöscht!")
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
else:
|
else:
|
||||||
# form validation failed -> return to token overview and flash message
|
# form validation failed -> return to token overview and flash message
|
||||||
flash(
|
flash(
|
||||||
|
@ -535,15 +445,13 @@ def delete_token(token):
|
||||||
f"Der Token {token} von {token_to_delete['name']} wurde nicht "
|
f"Der Token {token} von {token_to_delete['name']} wurde nicht "
|
||||||
"gelöscht."
|
"gelöscht."
|
||||||
)
|
)
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/deactivate-token/<token>")
|
@door_app.route('/deactivate-token/<token>')
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def deactivate_token(token):
|
def deactivate_token(token):
|
||||||
"""Deactivate access for the given token.
|
"""Deactivate access for the given token. This updates the token file on disk.
|
||||||
|
|
||||||
This updates the token file on disk.
|
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
|
@ -553,54 +461,50 @@ def deactivate_token(token):
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
|
|
||||||
if token not in tokens:
|
if token not in tokens:
|
||||||
flash(f"Ungültiger Token {token} für Deaktivierung.")
|
flash(f'Ungültiger Token {token} für Deaktivierung.')
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
|
|
||||||
tokens[token]["inactive"] = True
|
tokens[token]['inactive'] = True
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
current_app.door.store_tokens(tokens)
|
||||||
current_app.logger.info(
|
current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}")
|
||||||
f"Token {token} deactivated by admin user {current_user.username}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
return redirect("/tokens")
|
return redirect('/tokens')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/backup_tokens")
|
@door_app.route('/backup_tokens')
|
||||||
@roles_required("admin")
|
@roles_required('admin')
|
||||||
def backup_tokens():
|
def backup_tokens():
|
||||||
# get list of defined admin users for backup
|
# get list of defined admin users for backup
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = current_app.door.get_tokens()
|
||||||
try:
|
try:
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
file = Path(tmpdir, "token_data.txt")
|
file = Path(tmpdir, 'token_data.txt')
|
||||||
file.write_text(json.dumps(tokens))
|
file.write_text(json.dumps(tokens))
|
||||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return str(e)
|
return str(e)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route("/open")
|
@door_app.route('/open')
|
||||||
@auth_required()
|
@auth_required()
|
||||||
def open_door():
|
def open_door():
|
||||||
try:
|
try:
|
||||||
current_app.door.open_door(user=current_user.username)
|
current_app.door.open_door(user=current_user.username)
|
||||||
current_app.logger.info(f"Door opened by admin user "
|
current_app.logger.info(f"Door opened by admin user {current_user.username}")
|
||||||
f"{current_user.username}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Could not open door. Exception: {e}")
|
flash(f'Could not open door. Exception: {e}')
|
||||||
return redirect("/")
|
return redirect('/')
|
||||||
|
|
||||||
|
|
||||||
# routes for opening and closing the door via the web interface
|
# routes for opening and closing the door via the web interface
|
||||||
@door_app.route("/close")
|
@door_app.route('/close')
|
||||||
@auth_required()
|
@auth_required()
|
||||||
def close_door():
|
def close_door():
|
||||||
try:
|
try:
|
||||||
current_app.door.close_door(user=current_user.username)
|
current_app.door.close_door(user=current_user.username)
|
||||||
current_app.logger.info(f"Door closed by admin user "
|
current_app.logger.info(f"Door closed by admin user {current_user.username}")
|
||||||
f"{current_user.username}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Could not close door. Exception: {e}")
|
flash(f'Could not close door. Exception: {e}')
|
||||||
return redirect("/")
|
return redirect('/')
|
||||||
|
|
|
@ -1,154 +0,0 @@
|
||||||
import datetime
|
|
||||||
import time
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from pathlib import Path
|
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def nfc_socket(tmp_path):
|
|
||||||
# mock up a Unix socket server for testing
|
|
||||||
import threading
|
|
||||||
import socketserver
|
|
||||||
|
|
||||||
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
|
|
||||||
def handle(self):
|
|
||||||
data = str(self.request.recv(1024), 'ascii')
|
|
||||||
cur_thread = threading.current_thread()
|
|
||||||
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
|
|
||||||
try:
|
|
||||||
self.request.sendall(response)
|
|
||||||
except BrokenPipeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
class TreadedUnixServer(socketserver.ThreadingMixIn,
|
|
||||||
socketserver.UnixStreamServer):
|
|
||||||
pass
|
|
||||||
|
|
||||||
nfc_socket_file = tmp_path / 'nfc.sock'
|
|
||||||
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
|
|
||||||
with server:
|
|
||||||
nfc_socket = server.server_address
|
|
||||||
|
|
||||||
# Start a thread with the server -- that thread will then start one
|
|
||||||
# more thread for each request
|
|
||||||
server_thread = threading.Thread(target=server.serve_forever)
|
|
||||||
# Exit the server thread when the main thread terminates
|
|
||||||
server_thread.daemon = True
|
|
||||||
server_thread.start()
|
|
||||||
print("Server loop running in thread:", server_thread.name)
|
|
||||||
|
|
||||||
yield nfc_socket
|
|
||||||
|
|
||||||
server.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_door_handle(nfc_socket, tmp_path):
|
|
||||||
# test with invalid token file
|
|
||||||
with pytest.raises(FileNotFoundError):
|
|
||||||
door_handle = DoorHandle(token_file='no_such_file',
|
|
||||||
mqtt_host='test.mosquitto.org')
|
|
||||||
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text('')
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
nfc_socket=nfc_socket
|
|
||||||
)
|
|
||||||
door_handle.nfc_sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def door_handle(nfc_socket, tmp_path):
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text(
|
|
||||||
"""# token | name | organization | email | valid_thru
|
|
||||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
|
||||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
|
||||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
nfc_socket=nfc_socket
|
|
||||||
)
|
|
||||||
yield door_handle
|
|
||||||
door_handle.nfc_sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def door_handle_no_nfc(tmp_path):
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text('')
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
)
|
|
||||||
yield door_handle
|
|
||||||
|
|
||||||
|
|
||||||
def test_store_tokens(door_handle):
|
|
||||||
new_tokens = {
|
|
||||||
'042979fa186280': {
|
|
||||||
'name': 'Pippin',
|
|
||||||
'organization': 'Hobbits',
|
|
||||||
'email': 'pippin@shire.me',
|
|
||||||
'valid_thru': None,
|
|
||||||
'inactive': False
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
door_handle.store_tokens(new_tokens)
|
|
||||||
|
|
||||||
token_file = Path(door_handle.token_file)
|
|
||||||
assert token_file.exists()
|
|
||||||
assert '042979fa186280' in token_file.read_text()
|
|
||||||
|
|
||||||
|
|
||||||
def test_mqtt_messages(door_handle):
|
|
||||||
# test sending messages to the door_handle via MQTT and see if the state
|
|
||||||
# gets updated accordingly
|
|
||||||
client = mqtt.Client()
|
|
||||||
|
|
||||||
con = client.connect('test.mosquitto.org', 1883)
|
|
||||||
client.publish('door/position/value', 10).wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
assert door_handle.encoder_position == 10
|
|
||||||
|
|
||||||
client.publish('door/state/value', 'open').wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
assert door_handle.state == 'open'
|
|
||||||
|
|
||||||
timestamp = datetime.datetime.now().replace(microsecond=0)
|
|
||||||
token = '042979fa186280'
|
|
||||||
client.publish(
|
|
||||||
'door/token/last_invalid',
|
|
||||||
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
|
|
||||||
).wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
most_recent_token = door_handle.get_most_recent_token()
|
|
||||||
assert most_recent_token['timestamp'] == timestamp
|
|
||||||
assert most_recent_token['token'] == token
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
|
|
||||||
def test_open_door(door_handle):
|
|
||||||
door_handle.open_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_open_door_broken_socket(door_handle_no_nfc):
|
|
||||||
# test broken nfc_socket connection
|
|
||||||
with pytest.raises(RuntimeError):
|
|
||||||
door_handle_no_nfc.open_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_close_door(door_handle):
|
|
||||||
door_handle.close_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_close_door_broken_socket(door_handle_no_nfc):
|
|
||||||
# test broken nfc_socket connection
|
|
||||||
with pytest.raises(RuntimeError):
|
|
||||||
door_handle_no_nfc.close_door()
|
|
|
@ -18,7 +18,7 @@ def test_login(browser, live_server):
|
||||||
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
response = browser.get(f'http://localhost:{live_server.port}/login')
|
||||||
|
|
||||||
email_form = browser.find_element('id', 'email').send_keys('gandalf')
|
email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me')
|
||||||
password_form = browser.find_element('id', 'password').send_keys('shadowfax')
|
password_form = browser.find_element('id', 'password').send_keys('shadowfax')
|
||||||
submit_button = browser.find_element('id', 'submit').click()
|
submit_button = browser.find_element('id', 'submit').click()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user