Compare commits

..

No commits in common. "blueprint_refactoring" and "master" have entirely different histories.

21 changed files with 794 additions and 1979 deletions

View File

@ -1,79 +1 @@
Flask-based web interface for user token administration of our hackerspace's door lock. Flask-based web interface for user token adminstration of our hackerspace's door lock.
## Installation
Clone the repo
```shell
git clone <path_to_repo>
```
Install using pip
```shell
pip install .
```
## Running the app
```shell
export FLASK_APP=imaginaerraum_door_admin
flask run
```
## Configuration
You can set custom configuration options by defining an environment variable
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
For example, consider the following configuration file ``app_config.py``:
```
# door app configuration
SECRET_KEY = 'mysupersecretkey'
SECURITY_PASSWORD_SALT = 'saltycaramel'
TESTING = False
DEBUG = False
TOKEN_FILE = 'door_tokens.txt'
ADMIN_FILE = 'admins.txt'
NFC_SOCKET = "/tmp/nfc.sock"
NFC_LOG = "nfc.log"
```
To instruct the flask app to use this configuration, use the following commands:
```shell
export APPLICATION_SETTINGS=app_config.py
flask run
```
Below, you can find a list of configuration options.
### Flask app configuration
You can override common Flask configuration variables. In particular, you
definitely should set custom values for the ``SECRET_KEY`` and
``SECURITY_PASSWORD_SALT``.
### Token file
The token file is an ASCII file which lists the IDs of RFID tokens that can be
used to unlock the door. You can specify the path to the token file using the
``TOKEN_FILE`` variable in the configuration file.
Here's an example of a token file (lines starting with ``#`` represent inactive
tokens):
```
# token | name | organization | email | valid_thru
#042979fa186280||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
```
### Admin file
``ADMIN_FILE`` -> file to create new super admins
### User database
### NFC files
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
``NFC_LOG = "nfc.log"`` -> log file of door events
## Development
```shell
cd tests
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
```

28
bin/launch_webadmin Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
import argparse
from imaginaerraum_door_admin.webapp import create_application
parser = argparse.ArgumentParser()
parser.add_argument("--key_file", default='/root/flask_keys', help="Path to file with Flask SECRET_KEY and SECURITY_PASSWORD_SALT")
parser.add_argument("--token_file", default="/etc/door_tokens", help="path to the file with door tokens and users")
parser.add_argument("--nfc_socket", default="/tmp/nfc.sock", help="socket for handling NFC reader commands")
parser.add_argument("--template_folder", default="templates", help="path to Flask templates folder")
parser.add_argument("--static_folder", default="static", help="path to Flask static folder")
parser.add_argument("--admin_file", default="/etc/admins.conf", help="Path to file for creating super admin users")
parser.add_argument("--log_file", default="/var/log/webinterface.log", help="Path to flask log file")
parser.add_argument("--nfc_log", default="/var/log/nfc.log", help="Path to nfc log file")
parser.add_argument("--ldap_url", default="ldaps://ldap.imaginaerraum.de",
help="URL for LDAP server for alternative user authorization")
parser.add_argument("--mqtt_host", default="10.10.21.2", help="IP address of MQTT broker")
parser.add_argument("--flask_port", default=80, help="Port for running the Flask server")
parser.add_argument("--mail_server", default="smtp.googlemail.com", help="email server for sending security messages")
parser.add_argument("--mail_port", default=465, help="port for security email server")
parser.add_argument("--mail_use_tls", default=False, help="use TLS for security emails")
parser.add_argument("--mail_use_ssl", default=True, help="use SSL for security emails")
parser.add_argument("--mail_username", default="admin@example.com", help="email account for sending security messages")
parser.add_argument("--mail_password", default="password", help="Password for email account")
config = parser.parse_args()
app = create_application(config)
app.run(host='0.0.0.0', port=config.flask_port)

View File

@ -1,117 +0,0 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
from pathlib import Path
from .door_handle import DoorHandle
security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exist already)
def create_super_admins(app, user_datastore):
admin_file = Path(app.config.get('ADMIN_FILE'))
# setup user database when starting the app
new_admin_data = []
if not admin_file.exists():
app.logger.warning(
f"Admin user creation file not found at path "
f"{admin_file.absolute()}."
f"No super admins have been created in the datastore."
)
else:
# store data for new admins in memory s.t. the file can be deleted
# afterwards
admin_data = admin_file.read_text().split('\n')
for i, line in enumerate(admin_data):
if len(line.strip()) > 0 and not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
app.logger.error(
f"Error while parsing line {i} in admin config file. "
f"Config file should contain lines of <username> "
f"<email> <password>\\n'\n "
f"Exception: {e}\nAdmin account could not be created."
)
with app.app_context():
db.create_all()
super_admin_role = user_datastore.find_or_create_role(
'super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role(
'admin') # 'normal' admin
local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(
email=d['email'], username=d['username'],
password=hash_password(d['password']), roles=roles
)
app.logger.info(
f"New super admin user created with username "
f"'{new_admin.username}' and email '{new_admin.email}'"
f", roles = {[r.name for r in new_admin.roles]}"
)
db.session.commit()
def create_app():
app = Flask(__name__)
app.config.from_object(
'imaginaerraum_door_admin.default_app_config.DefaultConfig'
)
app.config.from_envvar('APPLICATION_SETTINGS')
logging.basicConfig(filename=app.config['LOG_FILE'], level=logging.INFO)
token_file = Path(app.config.get('TOKEN_FILE'))
if not token_file.exists():
app.logger.warning(
f"Token file not found at {token_file.absolute()}. "
"An empty token file will be created."
)
token_file.touch()
# create door objects which provides access to the token file and current
# door state via MQTT
app.door = DoorHandle(
token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
)
# Mail Config
#mail = Mail(app)
# Create database connection object
db.init_app(app)
from . webapp import door_app
app.register_blueprint(door_app)
# Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
create_super_admins(app, user_datastore)
return app

View File

@ -1,163 +0,0 @@
from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import lookup_identity
from flask_security.models import fsqla_v2 as fsqla
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from imaginaerraum_door_admin import db, security
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = lookup_identity(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through local database"
)
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc.
authorized, new_user_data = self.validate_ldap()
if authorized:
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
security.datastore.add_role_to_user(user, role)
security.datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = self.validate_ldap()
if authorized:
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(
username=new_user_data['username'],
email=new_user_data['email'],
password=new_user_data['password'],
roles=new_user_data['roles']
)
security.datastore.commit()
current_app.logger.info(
f"New admin user '{new_user_data['username']} "
f"<{new_user_data['email']}>' created after successful "
f"LDAP authorization"
)
# if any of the authorization methods is successful we authorize
# the user
return authorized
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password
is authorized. Then the permissions and additional information of the
user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update
the entry for the user in the local database.
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success,
False = failure)
dict : dictionary with information about an authorized user
(contains username, email, hashed password, roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
username = self.email.data
password = self.password.data
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
f"dc={ldap_domain_ext}"
con = ldap3.Connection(
ldap_server,
user=user,
password=password,
auto_bind=True
)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from
# local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
f'dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data

View File

@ -1,11 +0,0 @@
#!/usr/bin/env python3
from imaginaerraum_door_admin import create_app
def main():
app = create_app()
app.run(host='0.0.0.0', port=app.config.get('PORT', 80))
if __name__ == "__main__":
main()

View File

@ -1,61 +0,0 @@
import bleach
from flask_security import uia_email_mapper
print("loading default config")
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
PORT = 80
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True
SECURITY_RECOVERABLE = True
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
SECURITY_POST_LOGIN_VIEW = '/'
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
SECURITY_PASSWORD_LENGTH_MIN = 10
SECURITY_USER_IDENTITY_ATTRIBUTES = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
# mail configuration
MAIL_SERVER = ''
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ''
MAIL_PASSWORD = ''
MAIL_DEFAULT_SENDER = ''
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_pre_ping": True,
}
TOKEN_FILE = "door_tokens"
ADMIN_FILE = 'admins'
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "webinterface.log"
NFC_LOG = "nfc.log"
MQTT_HOST = '10.10.21.2'

View File

@ -4,22 +4,23 @@ from pathlib import Path
import logging import logging
from datetime import datetime from datetime import datetime
class DoorHandle: class DoorHandle:
def __init__(self, token_file, mqtt_host, mqtt_port=1883, def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
nfc_socket='/tmp/nfc.sock', logger=None):
self.state = None self.state = None
self.encoder_position = None self.encoder_position = None
if not Path(token_file).exists(): if not Path(token_file).exists():
raise FileNotFoundError( raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
"File with door tokens could not be found at "
f"{Path(token_file).absolute()}"
)
self.token_file = token_file self.token_file = token_file
self.last_invalid = {} self.last_invalid = {}
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
if logger: if logger:
self.logger = logger self.logger = logger
else: else:
@ -30,18 +31,10 @@ class DoorHandle:
self.nfc_sock.connect(nfc_socket) self.nfc_sock.connect(nfc_socket)
self.logger.info(f"Connected to NFC socket at {nfc_socket}.") self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
except Exception as e: except Exception as e:
self.logger.error(f"Could not connect to NFC socket at {nfc_socket}. " print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
f"Exception: {e}")
self.nfc_sock = None self.nfc_sock = None
#raise #raise
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
self.logger.info(f"Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
self.data_fields = ['name', 'organization', 'email', 'valid_thru'] self.data_fields = ['name', 'organization', 'email', 'valid_thru']
# The callback for when the client receives a CONNACK response from the server. # The callback for when the client receives a CONNACK response from the server.
@ -50,7 +43,7 @@ class DoorHandle:
# Subscribing in on_connect() means that if we lose the connection and # Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed. # reconnect then subscriptions will be renewed.
client.subscribe("door/#") client.subscribe("#")
# The callback for when a PUBLISH message is received from the server. # The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
@ -77,13 +70,11 @@ class DoorHandle:
email = data[3].strip() if len(data) > 3 else None email = data[3].strip() if len(data) > 3 else None
valid_thru = data[4].strip() if len(data) > 4 else None valid_thru = data[4].strip() if len(data) > 4 else None
tokens[token] = { tokens[token] = {'name': name,
'name': name,
'organization': organization, 'organization': organization,
'email': email, 'email': email,
'valid_thru': valid_thru, 'valid_thru': valid_thru,
'inactive': inactive 'inactive': inactive}
}
return tokens return tokens
def store_tokens(self, tokens): def store_tokens(self, tokens):
@ -107,15 +98,13 @@ class DoorHandle:
if self.nfc_sock is not None: if self.nfc_sock is not None:
self.nfc_sock.send(b'open ' + user.encode() + b'\n') self.nfc_sock.send(b'open ' + user.encode() + b'\n')
else: else:
self.logger.error("No connection to NFC socket. Cannot open door!") raise Exception("No connection to NFC socket. Cannot close door!")
raise RuntimeError("No connection to NFC socket. Cannot open door!")
def close_door(self, user=''): def close_door(self, user=''):
if self.nfc_sock is not None: if self.nfc_sock is not None:
self.nfc_sock.send(b'close ' + user.encode() + b'\n') self.nfc_sock.send(b'close ' + user.encode() + b'\n')
else: else:
self.logger.error("No connection to NFC socket. Cannot close door!") raise Exception("No connection to NFC socket. Cannot close door!")
raise RuntimeError("No connection to NFC socket. Cannot close door!")
def get_most_recent_token(self): def get_most_recent_token(self):
# read last invalid token from logfile # read last invalid token from logfile

View File

@ -1,46 +0,0 @@
from flask import flash
from flask_wtf import FlaskForm
from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from datetime import date
def validate_valid_thru_date(form, field):
if form.limit_validity.data:
# only check date format if limited validity of token is set
try:
if not field.data >= date.today():
raise ValueError
except ValueError as e:
flash("Ungültiges Datum")
raise ValidationError
return True
class TokenForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])
organization = StringField("Organization", validators=[DataRequired()])
limit_validity = BooleanField("Gültigkeit begrenzen?")
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
active = BooleanField("Aktiv?")
dsgvo = BooleanField(
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
)
class ConfirmDeleteForm(FlaskForm):
name = StringField(
"Name",
validators=[
DataRequired(),
EqualTo("name_confirm", "Name stimmt nicht überein"),
],
)
name_confirm = StringField("Name confirm")
class AdminCreationForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])

View File

@ -27,15 +27,15 @@
{% endfor %} {% endfor %}
<td> <td>
{% if not data['super_admin'] %} {% if not data['super_admin'] %}
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a> <a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
{% endif %} {% endif %}
{% if data['admin'] %} {% if data['admin'] %}
{% if not data['super_admin'] %} {% if not data['super_admin'] %}
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a> <a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
{% endif %} {% endif %}
{% else %} {% else %}
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a> <a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
{% endif %} {% endif %}
</td> </td>
</tr> </tr>
@ -68,14 +68,14 @@
</div> </div>
<div class="p-2 bg-light border"> <div class="p-2 bg-light border">
<h3>Nutzerdaten sichern:</h3> <h3>Nutzerdaten sichern:</h3>
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get"> <form action="{{ url_for('backup_user_datastore') }}" method="get">
<input type="submit" value="Download"> <input type="submit" value="Download">
</form> </form>
</div> </div>
<div class="p-2 bg-light border"> <div class="p-2 bg-light border">
<h3>Nutzerdaten wiederherstellen:</h3> <h3>Nutzerdaten wiederherstellen:</h3>
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data> <form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file> <input type=file name=file>
<input type=submit value="Abschicken"> <input type=submit value="Abschicken">
</form> </form>

View File

@ -13,7 +13,7 @@
<body> <body>
<nav class="navbar navbar-expand-lg navbar-light bg-light"> <nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container-fluid"> <div class="container-fluid">
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a> <a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation"> aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span> <span class="navbar-toggler-icon"></span>
@ -29,10 +29,10 @@
Tokens Tokens
</a> </a>
<div class="dropdown-menu" aria-labelledby="navbarDropdown"> <div class="dropdown-menu" aria-labelledby="navbarDropdown">
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a> <a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a> <a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
{% if current_user.has_role('super_admin') %} {% if current_user.has_role('super_admin') %}
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a> <a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
{% endif %} {% endif %}
</div> </div>
</li> </li>
@ -40,7 +40,7 @@
{% if current_user.has_role('super_admin') %} {% if current_user.has_role('super_admin') %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a> <a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
</li> </li>
{% endif %} {% endif %}

View File

@ -24,11 +24,11 @@
{% if current_user.is_authenticated %} {% if current_user.is_authenticated %}
<div class="row"> <div class="row">
<div class="col-3"> <div class="col-3">
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a> <a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
</div> </div>
<div class="col-1"></div> <div class="col-1"></div>
<div class="col-3"> <div class="col-3">
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a> <a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
</div> </div>
<div class="col-5"></div> <div class="col-5"></div>
</div> </div>

View File

@ -23,7 +23,7 @@
und diesen zustimmt. und diesen zustimmt.
</li> </li>
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden! <li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen. (<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
</li> </li>
<li>Jetzt kann der Token verwendet werden.</li> <li>Jetzt kann der Token verwendet werden.</li>

View File

@ -29,9 +29,9 @@
<td>{{ data[field] if data[field] }}</td> <td>{{ data[field] if data[field] }}</td>
{% endfor %} {% endfor %}
<td> <td>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a> <a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a> <a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td> </td>
</tr> </tr>
{% endfor %} {% endfor %}
@ -47,14 +47,14 @@
<td>{{ data[field] if data[field] }}</td> <td>{{ data[field] if data[field] }}</td>
{% endfor %} {% endfor %}
<td> <td>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a> <a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td> </td>
</tr> </tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
</table> </table>
<form action="{{ url_for('door_app.backup_tokens') }}" method="get"> <form action="{{ url_for('backup_tokens') }}" method="get">
<input type="submit" value="Token Daten sichern"> <input type="submit" value="Token Daten sichern">
</form> </form>
{% endblock %} {% endblock %}

File diff suppressed because it is too large Load Diff

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
flask~=1.1.2
Flask-Security-Too~=4.0.0
WTForms~=2.3.3
paho-mqtt~=1.5.1
bleach~=3.3.0

View File

@ -15,38 +15,3 @@ classifiers =
Programming Language :: Python :: 3 Programming Language :: Python :: 3
License :: OSI Approved :: GNU General Public License v3 (GPLv3) License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Operating System :: OS Independent Operating System :: OS Independent
[options]
python_requires = >=3.8
install_requires =
bleach
Flask
Flask-Mail
Flask-Security-Too>=5.0.1
Flask-SQLAlchemy
Flask-WTF
email_validator
paho-mqtt
ldap3
wtforms
include_package_data = True
packages = find:
setup_requires =
wheel
tests_require = pytest>=3
zip_safe = False
[options.entry_points]
console_scripts =
launch_webinterface = imaginaerraum_door_admin.bin.launch_webadmin:main
[options.extras_require]
dev =
pytest
pytest-cov
pytest-flask
pytest-mock
flake8
selenium
beautifulsoup4

View File

@ -1,3 +1,17 @@
from setuptools import setup from setuptools import setup
setup() setup(install_requires=[
'bleach',
'Flask',
'Flask-Mail',
'Flask-Security-Too',
'Flask-SQLAlchemy',
'Flask-WTF',
'email_validator',
'paho-mqtt',
'ldap3',
],
include_package_data=True,
scripts=['bin/launch_webadmin'],
packages=['imaginaerraum_door_admin'],
zip_safe=False)

View File

@ -1,24 +0,0 @@
import pytest
import os
from selenium.webdriver import Chrome
from imaginaerraum_door_admin import create_app
os.environ['APPLICATION_SETTINGS'] = os.getcwd() + '/debug_app_config.py'
@pytest.fixture(scope='session')
def app():
"""Fixture to launch the webapp"""
app = create_app()
return app
@pytest.fixture
def browser():
"""Fixture for a selenium browser to access the webapp."""
driver = Chrome()
driver.implicitly_wait(10)
yield driver
driver.quit()

View File

@ -1,56 +0,0 @@
import tempfile
from pathlib import Path
TESTING = True
DEBUG = True
LOG_FILE = 'webapp.log'
token_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_file).write_text(
"""# token | name | organization | email | valid_thru
#042979fa186280||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
TOKEN_FILE = str(token_file)
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
admin_file = tempfile.NamedTemporaryFile(delete=False).name
Path(admin_file).write_text(
"""# create new super-admin by putting the following data in this file:
# username email ldap/password
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
# examples:
gandalf gandalf@shire.me shadowfax
"""
)
ADMIN_FILE = str(admin_file)
db_file = tempfile.NamedTemporaryFile(delete=False).name
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_log_file).write_text(
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
"""
)
NFC_LOG = str(token_log_file)

View File

@ -1,154 +0,0 @@
import datetime
import time
import pytest
from pathlib import Path
from imaginaerraum_door_admin.door_handle import DoorHandle
import paho.mqtt.client as mqtt
@pytest.fixture
def nfc_socket(tmp_path):
# mock up a Unix socket server for testing
import threading
import socketserver
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = str(self.request.recv(1024), 'ascii')
cur_thread = threading.current_thread()
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
try:
self.request.sendall(response)
except BrokenPipeError:
pass
class TreadedUnixServer(socketserver.ThreadingMixIn,
socketserver.UnixStreamServer):
pass
nfc_socket_file = tmp_path / 'nfc.sock'
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
with server:
nfc_socket = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
print("Server loop running in thread:", server_thread.name)
yield nfc_socket
server.shutdown()
def test_create_door_handle(nfc_socket, tmp_path):
# test with invalid token file
with pytest.raises(FileNotFoundError):
door_handle = DoorHandle(token_file='no_such_file',
mqtt_host='test.mosquitto.org')
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle(nfc_socket, tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text(
"""# token | name | organization | email | valid_thru
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
yield door_handle
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle_no_nfc(tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
)
yield door_handle
def test_store_tokens(door_handle):
new_tokens = {
'042979fa186280': {
'name': 'Pippin',
'organization': 'Hobbits',
'email': 'pippin@shire.me',
'valid_thru': None,
'inactive': False
}
}
door_handle.store_tokens(new_tokens)
token_file = Path(door_handle.token_file)
assert token_file.exists()
assert '042979fa186280' in token_file.read_text()
def test_mqtt_messages(door_handle):
# test sending messages to the door_handle via MQTT and see if the state
# gets updated accordingly
client = mqtt.Client()
con = client.connect('test.mosquitto.org', 1883)
client.publish('door/position/value', 10).wait_for_publish(1)
time.sleep(1)
assert door_handle.encoder_position == 10
client.publish('door/state/value', 'open').wait_for_publish(1)
time.sleep(1)
assert door_handle.state == 'open'
timestamp = datetime.datetime.now().replace(microsecond=0)
token = '042979fa186280'
client.publish(
'door/token/last_invalid',
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
).wait_for_publish(1)
time.sleep(1)
most_recent_token = door_handle.get_most_recent_token()
assert most_recent_token['timestamp'] == timestamp
assert most_recent_token['token'] == token
client.disconnect()
def test_open_door(door_handle):
door_handle.open_door()
def test_open_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.open_door()
def test_close_door(door_handle):
door_handle.close_door()
def test_close_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.close_door()

View File

@ -1,622 +0,0 @@
import datetime
import pytest
from bs4 import BeautifulSoup
from flask_security.utils import lookup_identity
from imaginaerraum_door_admin.door_handle import DoorHandle
from imaginaerraum_door_admin.auth import ExtendedLoginForm
import re
import secrets
import pathlib
import json
def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}')
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element_by_xpath('//input[@id="email"]')
email_form.send_keys('gandalf@shire.me')
password_form = browser.find_element_by_xpath('//input[@id="password"]')
password_form.send_keys('shadowfax')
submit_button = browser.find_element_by_xpath('//input[@id="submit"]')
submit_button.click()
assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response):
soup = BeautifulSoup(response.data, 'html.parser')
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
# extract csrf token from the login page source
response = client.get('/login', follow_redirects=True)
csrf_token = extract_csrf_token(response)
# send login information
payload = {
'csrf_token': csrf_token,
'email': user,
'password': password
}
return client.post('/login', data=payload, follow_redirects=True)
def test_login_headless(client):
response = headless_login(client)
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_validate_ldap(client, mocker):
# mock ldap Connection object to simulate successful authentication
import ldap3
def init_success(self, *args, **kwargs):
pass
def init_LDAPBindError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPBindError()
def init_LDAPSocketOpenError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPSocketOpenError()
def init_Exception(self, *args, **kwargs):
raise Exception()
def search_success(self, *args, **kwargs):
return True
def search_failure(self, *args, **kwargs):
return False
mocker.patch.object(ldap3.Connection, '__init__', init_success)
mocker.patch.object(ldap3.Connection, 'search', search_success)
mock_entries = mocker.MagicMock()
mock_entries[0].mail.value = 'user@example.com'
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
with client.application.app_context():
# test successful login
form = ExtendedLoginForm()
form.email.data = 'user'
form.password.data = 'password'
result = form.validate_ldap()
assert result[0]
assert result[1]['username'] == 'user'
assert result[1]['email'] == 'user@example.com'
assert result[1]['roles'] == ['admin']
# test failing ldap search
mocker.patch.object(ldap3.Connection, 'search', search_failure)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
# test some errors in ldap Connection and authentication
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
def test_login_ldap(client, temp_user, mocker):
# mock ldap validation for admin user
def mock_validate(self):
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
user_data = {'username': temp_user['username'],
'email': temp_user['email'],
'roles': ['admin'],
'password': temp_user['password']}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
user = lookup_identity(temp_user['username'])
# remove local role so that ldap authentication is the default
user.roles.pop(0)
# log out admin user
client.get('/logout')
# log in temp user using ldap
response = headless_login(client, user=temp_user['username'],
password=temp_user['password'])
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'],
'role': 'button'})])
def test_login_ldap_new_user(client, mocker):
# mock ldap validation for admin user
def mock_validate(self):
auth = True
user_data = {'username': 'Balrog',
'email': 'balrog@moria.me',
'roles': ['admin'],
'password': 'youshallnotpass'}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
# initially, the Balrog user should not exist
user = lookup_identity('Balrog')
assert user is None
# log in temp user using ldap -> this will succeed and create a local user
response = headless_login(client, user='Balrog',
password='youshallnotpass')
soup = BeautifulSoup(response.data, 'html.parser')
# make sure user is now created locally
user = lookup_identity('Balrog')
assert user is not None
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'],
'role': 'button'})])
@pytest.fixture
def client_authenticated(client):
# log in using admin account for testing
headless_login(client)
yield client
@pytest.mark.parametrize("url,function", [('/open', 'open_door'),
('/close', 'close_door')])
def test_access_door_button(client_authenticated, mocker, url, function):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open
client_authenticated.get(url)
# make sure the open method was called
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
def test_access_door_unauthenticated(client, mocker, url, function):
# test for trying to visit opening/closing door while not logged in
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open
response = client.get(url, follow_redirects=True)
# we should get redirected to login page
assert 'login' in response.request.url
# the open door function should not be called
getattr(DoorHandle, function).assert_not_called()
def test_manage_admins(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
assert "Nutzer Übersicht" in response.data.decode()
assert "gandalf" in response.data.decode()
assert "gandalf@shire.me" in response.data.decode()
def create_user(client_authenticated, username, email):
# visit admin management page
response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response)
# post data for creating a new admin
payload = {'name': username,
'email': email,
'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True)
# after the new admin user is created, we should have been redirected to the
# /manage_admin page. there, the password for login is displayed
# we test if the newly created user can log in with that password
# extract password displayed on the page
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
assert match is not None
extracted_password = match['password']
return extracted_password
@pytest.fixture
def temp_user(client_authenticated):
"""Fixture for creating a temporary user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
return {'username': username,
'email': email,
'password': password}
@pytest.fixture
def temp_admin(client_authenticated):
"""Fixture for creating a temporary admin user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
response = client_authenticated.get(
f"/promote_admin/{username}",
follow_redirects=True)
user = lookup_identity(username)
assert user.has_role('admin')
return {'username': username,
'email': email,
'password': password}
def test_backup_users(client_authenticated, temp_user):
# test with invalid token
response = client_authenticated.get("/backup_user_datastore",
follow_redirects=True)
user_data = json.loads(response.data)
users = [d['username'] for d in user_data]
emails = [d['email'] for d in user_data]
assert temp_user['username'] in users
assert temp_user['email'] in emails
def test_create_admin(client_authenticated):
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
# log out current user
response = client_authenticated.get('/logout')
# try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo@shire.me',
password=password)
# - see if it works
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded
# -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_activate_deactivate_user(temp_user, client_authenticated):
response = client_authenticated.get(
'/admin_toggle_active/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# deactivate the user
response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# make sure the user is now inactive
user = lookup_identity(temp_user['username'])
assert user is not None
assert not user.active
# activate the user again
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# now the user should be active again
user = lookup_identity(temp_user['username'])
assert user is not None
assert user.active
# test deactivating super admin
response = client_authenticated.get(f"/admin_toggle_active/gandalf",
follow_redirects=True)
assert 'Super-Admins können nicht deaktiviert werden!' \
in response.data.decode()
def test_delete_admin(temp_user, client_authenticated):
# first we test deleting a non-existing user
response = client_authenticated.post(
'/delete_admins/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# next, we create a temporary user and try to delete that one
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
# we need to deactivate the user first
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
# make sure the user still exists
user = lookup_identity(temp_user['username'])
assert user is not None
# deactivate the user and try deleting it again
response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# try deleting it without filling in the confirmation form
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
assert 'Der eingegebene Nutzername stimmt nicht überein' \
in response.data.decode()
# make sure the user still exists
user = lookup_identity(temp_user['username'])
assert user is not None
# now we send the confirmation data with the request
response = client_authenticated.get(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
data=payload,
follow_redirects=True)
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
# make sure the user now is gone
user = lookup_identity(temp_user['username'])
assert user is None
def test_promote_user(temp_user, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get(
'/promote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = lookup_identity(temp_user['username'])
assert user is not None
assert not user.has_role('admin')
# grant admin permissions to test user
response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert user.has_role('admin')
# try granting admin permissions again
response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
assert user.has_role('admin')
def test_demote_user(temp_admin, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get(
'/demote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = lookup_identity(temp_admin['username'])
assert user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert not user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \
in response.data.decode()
assert not user.has_role('admin')
# try removing admin permissions from superadmin
response = client_authenticated.get(
f"/demote_admin/gandalf",
follow_redirects=True)
assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \
in response.data.decode()
def test_list_tokens(client_authenticated):
response = client_authenticated.get(f"/tokens", follow_redirects=True)
# make sure the names for the test tokens are displayed
assert all([user in response.data.decode()
for user in ['Gandalf', 'Bilbo', 'Gimli']])
def test_token_log(client_authenticated):
response = client_authenticated.get(f"/token-log", follow_redirects=True)
page_src = response.data.decode()
# perform some checks for displayed log data
assert all([msg in page_src for msg in
['INFO', 'DEBUG', 'ERROR']])
assert "Valid token 04538cfa186280 of Gandalf" in page_src
assert "2021-04-17 13:09:06,207" in page_src
def test_backup_tokens(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/backup_tokens",
follow_redirects=True)
token_data = json.loads(response.data)
assert {'04387cfa186280', '043a81fa186280', '04538cfa186280',
'042979fa186280'}.issubset(token_data.keys())
def test_register_token(client_authenticated, mocker):
# test to make sure message is displayed when no tokens were recently
# scanned
response = client_authenticated.get(f"/register-token",
follow_redirects=True)
page_src = response.data.decode()
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
# mockup scanned token
mocker.patch(
'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
lambda x: {'timestamp': datetime.datetime.now(),
'token': '042979fa181280'})
response = client_authenticated.get(f"/register-token", follow_redirects=True)
csrf_token = extract_csrf_token(response)
page_src = response.data.decode()
assert 'Unregistrierter Token gelesen' in page_src
assert '042979fa181280' in page_src
# try registering with incomplete data
response = client_authenticated.post(f"/register-token", data={},
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht registiert werden' in page_src
# register the mocked token
payload = {'name': 'Legolas',
'organization': 'Elves',
'email': 'legolas@mirkwood.me',
'dsgvo': True,
'csrf_token': csrf_token}
response = client_authenticated.post(f"/register-token", data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the user info for the new token is displayed
assert '042979fa181280' in page_src
assert 'Legolas' in page_src
assert 'Elves' in page_src
assert 'legolas@mirkwood.me' in page_src
# check that the token is created in the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '042979fa181280' in token_data
assert 'Legolas' in token_data
def test_edit_token(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
response = client_authenticated.post(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht editiert werden' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/edit-token/04538cfa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {
'name': 'Balin',
'organization': 'Dwarves',
'email': 'balin@erebor.me',
'active': True,
'limit_validity': False,
'valid_thru': datetime.date.today(),
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/edit-token/04538cfa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the new user info for the token is displayed
assert '04538cfa186280' in page_src
assert 'Balin' in page_src
assert 'Dwarves' in page_src
assert 'balin@erebor.me' in page_src
# check that the token is changed in the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
assert 'Balin' in token_data
def test_delete_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/delete-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/delete-token/043a81fa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
# try deleting without form data
response = client_authenticated.post(f"/delete-token/043a81fa186280",
follow_redirects=True)
page_src = response.data.decode()
assert "wurde nicht gelöscht" in page_src
payload = {
'name': 'Bilbo',
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/delete-token/043a81fa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
print(page_src)
assert "wurde gelöscht" in page_src
# check that the token is now gone from the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '043a81fa186280' not in token_data
def test_deactivate_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04387cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/deactivate-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# deactivate token
response = client_authenticated.get(f"/deactivate-token/04387cfa186280",
follow_redirects=True)
# check that the token is now gone from the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '#04387cfa186280' in token_data