Compare commits

..

15 Commits

Author SHA1 Message Date
2879a69445 make database and security objects global so we can access them in routes 2022-01-29 23:48:58 +01:00
97957e389c added tests for admin management operations, streamlined testing a bit 2022-01-29 23:46:20 +01:00
8f8bdb8cc3 added instructions for running tests 2022-01-29 23:45:31 +01:00
b64b0c7bb6 access config from current_app object 2022-01-27 23:57:06 +01:00
b3c585bd27 updated routes for blueprint based app 2022-01-27 23:56:48 +01:00
42345273dd removed requirements.txt now that we have setup.cfg 2022-01-27 23:49:32 +01:00
ace5868571 added test configuration fixtures 2022-01-27 23:48:31 +01:00
a104a3d00f attach door object to flask application and use application's logger 2022-01-27 23:46:45 +01:00
ff9d21bcd5 moved creation of initial admin user to separate function 2022-01-27 23:45:17 +01:00
38164aca4b started refactoring:
- use blueprint
- read configuration from file (default_app_config.py) and additional file specified by APPLICATION_SETTINGS environment variable
2022-01-25 21:42:35 +01:00
ba9379449a added tests for opening and closing the door via button 2022-01-25 19:38:19 +01:00
bb022fd1ce added login tests (headless and with selenium) 2022-01-25 00:09:14 +01:00
e0a22f770d added testing dependencies 2022-01-25 00:08:25 +01:00
03e7425b2a newer versions of wtforms use html5 by default 2022-01-23 12:33:57 +01:00
b4f9e4525b use setup.cfg based package setup script 2022-01-23 12:33:20 +01:00
15 changed files with 953 additions and 703 deletions

View File

@ -1 +1,9 @@
Flask-based web interface for user token adminstration of our hackerspace's door lock. Flask-based web interface for user token adminstration of our hackerspace's door lock.
# Development
```shell
cd tests
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
```

View File

@ -0,0 +1,170 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
import ldap3
from pathlib import Path
#from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exists already)
def create_super_admins(app, db, user_datastore, logger):
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if app.config['ADMIN_FILE'] is not None:
if not Path(app.config['ADMIN_FILE']).exists():
logger.warning(
f"Admin user creation file not found at {app.config['ADMIN_FILE']}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(app.config['ADMIN_FILE']) as f:
for i, line in enumerate(f.readlines()):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
db.create_all()
super_admin_role = user_datastore.find_or_create_role(
'super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role(
'admin') # 'normal' admin
local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
logger.info(
f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'],
username=d[
'username'],
password=hash_password(
d['password']),
roles=roles)
db.session.commit()
def setup_logging(app):
# set up logging for the web app
logger = logging.getLogger('webapp')
logger.setLevel(logging.INFO)
if app.config['LOG_FILE'] is not None:
ch = logging.FileHandler(app.config['LOG_FILE'])
ch.setLevel(logging.INFO)
else:
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
return logger
def create_app():
app = Flask(__name__)
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
app.config.from_envvar('APPLICATION_SETTINGS')
logger = setup_logging(app)
# do some checks for file existence etc.
try:
with open(app.config['KEY_FILE']) as f:
data = f.readlines()
if 'SECRET_KEY' in data[0]:
secret_key = data[0].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(app.config['TEMPLATE_FOLDER']).is_absolute():
if not Path(app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
if Path(app.config['STATIC_FOLDER']).is_absolute():
if not Path(app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
if not Path(app.config['TOKEN_FILE']).exists():
logger.warning(
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'],
logger=logger)
# Mail Config
#mail = Mail(app)
# Create database connection object
db.init_app(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
create_super_admins(app, db, user_datastore, logger)
return app

View File

@ -0,0 +1,112 @@
from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized:
# if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data

View File

@ -0,0 +1,55 @@
print("loading default config")
import bleach
from flask_security import uia_email_mapper
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
SECRET_KEY = 'supersecret'
TEMPLATE_FOLDER = 'templates'
STATIC_FOLDER = 'static'
SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True
SECURITY_RECOVERABLE = True
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
SECURITY_POST_LOGIN_VIEW = '/'
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
SECURITY_PASSWORD_LENGTH_MIN = 10
SECURITY_USER_IDENTITY_ATTRIBUTES = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
# mail configuration
MAIL_SERVER = ''
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ''
MAIL_PASSWORD = ''
MAIL_DEFAULT_SENDER = ''
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_pre_ping": True,
}
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log"
MQTT_HOST = '10.10.21.2'

View File

@ -27,15 +27,15 @@
{% endfor %} {% endfor %}
<td> <td>
{% if not data['super_admin'] %} {% if not data['super_admin'] %}
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a> <a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
{% endif %} {% endif %}
{% if data['admin'] %} {% if data['admin'] %}
{% if not data['super_admin'] %} {% if not data['super_admin'] %}
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a> <a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
{% endif %} {% endif %}
{% else %} {% else %}
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a> <a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
{% endif %} {% endif %}
</td> </td>
</tr> </tr>
@ -68,14 +68,14 @@
</div> </div>
<div class="p-2 bg-light border"> <div class="p-2 bg-light border">
<h3>Nutzerdaten sichern:</h3> <h3>Nutzerdaten sichern:</h3>
<form action="{{ url_for('backup_user_datastore') }}" method="get"> <form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
<input type="submit" value="Download"> <input type="submit" value="Download">
</form> </form>
</div> </div>
<div class="p-2 bg-light border"> <div class="p-2 bg-light border">
<h3>Nutzerdaten wiederherstellen:</h3> <h3>Nutzerdaten wiederherstellen:</h3>
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data> <form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file> <input type=file name=file>
<input type=submit value="Abschicken"> <input type=submit value="Abschicken">
</form> </form>

View File

@ -13,7 +13,7 @@
<body> <body>
<nav class="navbar navbar-expand-lg navbar-light bg-light"> <nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container-fluid"> <div class="container-fluid">
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a> <a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation"> aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span> <span class="navbar-toggler-icon"></span>
@ -29,10 +29,10 @@
Tokens Tokens
</a> </a>
<div class="dropdown-menu" aria-labelledby="navbarDropdown"> <div class="dropdown-menu" aria-labelledby="navbarDropdown">
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a> <a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a> <a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
{% if current_user.has_role('super_admin') %} {% if current_user.has_role('super_admin') %}
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a> <a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
{% endif %} {% endif %}
</div> </div>
</li> </li>
@ -40,7 +40,7 @@
{% if current_user.has_role('super_admin') %} {% if current_user.has_role('super_admin') %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a> <a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
</li> </li>
{% endif %} {% endif %}

View File

@ -24,11 +24,11 @@
{% if current_user.is_authenticated %} {% if current_user.is_authenticated %}
<div class="row"> <div class="row">
<div class="col-3"> <div class="col-3">
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a> <a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
</div> </div>
<div class="col-1"></div> <div class="col-1"></div>
<div class="col-3"> <div class="col-3">
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a> <a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
</div> </div>
<div class="col-5"></div> <div class="col-5"></div>
</div> </div>

View File

@ -23,7 +23,7 @@
und diesen zustimmt. und diesen zustimmt.
</li> </li>
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden! <li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen. (<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
</li> </li>
<li>Jetzt kann der Token verwendet werden.</li> <li>Jetzt kann der Token verwendet werden.</li>

View File

@ -29,9 +29,9 @@
<td>{{ data[field] if data[field] }}</td> <td>{{ data[field] if data[field] }}</td>
{% endfor %} {% endfor %}
<td> <td>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a> <a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a> <a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td> </td>
</tr> </tr>
{% endfor %} {% endfor %}
@ -47,14 +47,14 @@
<td>{{ data[field] if data[field] }}</td> <td>{{ data[field] if data[field] }}</td>
{% endfor %} {% endfor %}
<td> <td>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a> <a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a> <a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td> </td>
</tr> </tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
</table> </table>
<form action="{{ url_for('backup_tokens') }}" method="get"> <form action="{{ url_for('door_app.backup_tokens') }}" method="get">
<input type="submit" value="Token Daten sichern"> <input type="submit" value="Token Daten sichern">
</form> </form>
{% endblock %} {% endblock %}

View File

@ -1,30 +1,22 @@
from flask import Flask, render_template, request, flash, redirect, session, send_file from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields.html5 import DateField, EmailField from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy from flask_security import auth_required, hash_password, \
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
current_user, roles_required current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
from flask_security.views import change_password from flask_security.views import change_password
from flask_mail import Mail
from email_validator import validate_email from email_validator import validate_email
import json import json
import secrets import secrets
import bleach
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path from pathlib import Path
import logging
import tempfile import tempfile
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from .door_handle import DoorHandle
from imaginaerraum_door_admin import db, security
def validate_valid_thru_date(form, field): def validate_valid_thru_date(form, field):
@ -57,227 +49,16 @@ class AdminCreationForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()]) name = StringField('Name', validators=[DataRequired()])
email = EmailField('E-Mail', validators=[DataRequired()]) email = EmailField('E-Mail', validators=[DataRequired()])
door_app = Blueprint('door_app', __name__,
def uia_username_mapper(identity): template_folder='templates')
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
def create_application(config): # we override the change_password view from flask security to only allow local users to change their passwords
# set up logging for the web app # LDAP users should use the LDAP self service for changing passwords
logger = logging.getLogger('webapp') # this route needs to be defined before the Flask Security setup
logger.setLevel(logging.INFO) @door_app.route('/change', methods=['GET', 'POST'])
@auth_required()
if config.log_file is not None: def change_pw():
ch = logging.FileHandler(config.log_file)
ch.setLevel(logging.INFO)
else:
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
# do some checks for file existence etc.
try:
with open(config.key_file) as f:
data = f.readlines()
if 'SECRET_KEY' in data[0]:
secret_key = data[0].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(f"Flask keys could not be read from file at {Path(config.key_file).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(config.template_folder).is_absolute():
if not Path(config.template_folder).exists():
logger.error(f'Flask template folder not found at {Path(config.template_folder).absolute()}')
else:
if not (Path(__file__).parent / config.template_folder).exists():
logger.error(f'Flask template folder not found at {(Path(__file__).parent / config.template_folder).absolute()}')
if Path(config.static_folder).is_absolute():
if not Path(config.static_folder).exists():
logger.error(f'Flask static folder not found at {Path(config.static_folder).absolute()}')
else:
if not (Path(__file__).parent / config.static_folder).exists():
logger.error(f'Flask static folder not found at {(Path(__file__).parent / config.static_folder).absolute()}')
if not Path(config.token_file).exists():
logger.warning(f"Token file not found at {Path(config.token_file).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
logger=logger)
app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)
# Generate a nice key using secrets.token_urlsafe()
app.config['SECRET_KEY'] = secret_key
# Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
# Generate a good salt using: secrets.SystemRandom().getrandbits(128)
app.config['SECURITY_PASSWORD_SALT'] = security_password_salt
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
app.config['SECURITY_CHANGEABLE'] = True
app.config['SECURITY_RECOVERABLE'] = True
app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False
# Mail Config
app.config['MAIL_SERVER'] = config.mail_server
app.config['MAIL_PORT'] = config.mail_port
app.config['MAIL_USE_TLS'] = config.mail_use_tls
app.config['MAIL_USE_SSL'] = config.mail_use_ssl
app.config['MAIL_USERNAME'] = config.mail_username
app.config['MAIL_PASSWORD'] = config.mail_password
app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
mail = Mail(app)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_pre_ping": True,
}
# Create database connection object
db = SQLAlchemy(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
# LDAP
ldap_server = ldap3.Server(config.ldap_url)
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized:
# if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit()
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
# we override the change_password view from flask security to only allow local users to change their passwords
# LDAP users should use the LDAP self service for changing passwords
# this route needs to be defined before the Flask Security setup
@app.route('/change', methods=['GET', 'POST'])
@auth_required()
def change_pw():
if current_user.has_role('local'): if current_user.has_role('local'):
# local users can change their password # local users can change their password
return change_password() return change_password()
@ -285,41 +66,41 @@ def create_application(config):
# LDAP users get redirected to the LDAP self service # LDAP users get redirected to the LDAP self service
return redirect('https://ldap.imaginaerraum.de/') return redirect('https://ldap.imaginaerraum.de/')
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
# admin user management # admin user management
@app.route('/manage_admins', methods=['GET', 'POST']) @door_app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin') @roles_required('super_admin')
def manage_admins(): def manage_admins():
form = AdminCreationForm() form = AdminCreationForm()
if request.method == 'GET': if request.method == 'GET':
users = user_datastore.user_model.query.all() users = security.datastore.user_model.query.all()
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'), 'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
} for u in users] } for u in users]
return render_template('admins.html', admin_data=admin_data, form=form) return render_template('admins.html', admin_data=admin_data, form=form)
elif form.validate(): elif form.validate():
if user_datastore.find_user(username=form.name.data) is not None or \ if security.datastore.find_user(username=form.name.data) is not None or \
user_datastore.find_user(email=form.email.data) is not None: security.datastore.find_user(email=form.email.data) is not None:
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!") flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
return redirect('/manage_admins')
else: else:
pw = secrets.token_urlsafe(16) pw = secrets.token_urlsafe(16)
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, new_user = security.datastore.create_user(username=form.name.data, email=form.email.data,
password=hash_password(pw)) password=hash_password(pw))
user_datastore.add_role_to_user(new_user, 'local') security.datastore.add_role_to_user(new_user, 'local')
logger.info( current_app.logger.info(
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>") f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.") flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
db.session.commit() db.session.commit()
else:
flash(f"Ungültige Eingabe: {form.errors}")
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@roles_required('super_admin') @door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
def delete_admins(username): @roles_required('super_admin')
user = user_datastore.find_user(username=username) def delete_admins(username):
user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect('/manage_admins')
@ -338,52 +119,54 @@ def create_application(config):
# return page asking the user to confirm delete # return page asking the user to confirm delete
return render_template('delete_user.html', username=username, form=form) return render_template('delete_user.html', username=username, form=form)
elif form.validate(): elif form.validate():
user_datastore.delete_user(user) security.datastore.delete_user(user)
flash(f"Benutzer {username} wurde gelöscht.") flash(f"Benutzer {username} wurde gelöscht.")
logger.info(f"Super admin {current_user.username} deleted admin user {username}") current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}")
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
else: else:
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!") flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/admin_toggle_active/<username>')
@roles_required('super_admin') @door_app.route('/admin_toggle_active/<username>')
def admin_toggle_active(username): @roles_required('super_admin')
user = user_datastore.find_user(username=username) def admin_toggle_active(username):
user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect('/manage_admins')
if user.has_role('super_admin'): if user.has_role('super_admin'):
flash('Super-Admins können nicht deaktiviert werden!') flash('Super-Admins können nicht deaktiviert werden!')
return redirect('/manage_admins') return redirect('/manage_admins')
user_datastore.toggle_active(user) security.datastore.toggle_active(user)
if user.is_active: if user.is_active:
logger.info(f"Super admin {current_user.username} activated access for admin user {username}") current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
else: else:
logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}") current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/promote_admin/<username>')
@roles_required('super_admin') @door_app.route('/promote_admin/<username>')
def promote_admin(username): @roles_required('super_admin')
user = user_datastore.find_user(username=username) def promote_admin(username):
user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect('/manage_admins')
if user.has_role('admin'): if user.has_role('admin'):
flash(f'Benutzer {username} hat bereits Admin-Rechte!') flash(f'Benutzer {username} hat bereits Admin-Rechte!')
return redirect('/manage_admins') return redirect('/manage_admins')
user_datastore.add_role_to_user(user, 'admin') security.datastore.add_role_to_user(user, 'admin')
logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}") current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/demote_admin/<username>') @door_app.route('/demote_admin/<username>')
@roles_required('super_admin') @roles_required('super_admin')
def demote_admin(username): def demote_admin(username):
user = user_datastore.find_user(username=username) user = security.datastore.find_user(username=username)
if user is None: if user is None:
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect('/manage_admins')
@ -391,18 +174,19 @@ def create_application(config):
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!') flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!')
return redirect('/manage_admins') return redirect('/manage_admins')
if user.has_role('admin'): if user.has_role('admin'):
user_datastore.remove_role_from_user(user, 'admin') security.datastore.remove_role_from_user(user, 'admin')
logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}") current_app.logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}")
db.session.commit() db.session.commit()
else: else:
flash(f'Benutzer {username} ist bereits kein Admin!') flash(f'Benutzer {username} ist bereits kein Admin!')
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/backup_user_datastore')
@roles_required('super_admin') @door_app.route('/backup_user_datastore')
def backup_user_datastore(): @roles_required('super_admin')
def backup_user_datastore():
# get list of defined admin users for backup # get list of defined admin users for backup
users = user_datastore.user_model.query.all() users = security.datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password, user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
'roles': [r.name for r in u.roles]} 'roles': [r.name for r in u.roles]}
for u in users if not u.has_role('super_admin')] for u in users if not u.has_role('super_admin')]
@ -414,9 +198,10 @@ def create_application(config):
except Exception as e: except Exception as e:
return str(e) return str(e)
@app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin') @door_app.route('/restore_user_datastore', methods=['POST'])
def restore_user_datastore(): @roles_required('super_admin')
def restore_user_datastore():
# check if the post request has the file part # check if the post request has the file part
if 'file' not in request.files: if 'file' not in request.files:
flash('Keine Datei ausgewählt!') flash('Keine Datei ausgewählt!')
@ -443,9 +228,9 @@ def create_application(config):
entry_valid &= type(d['roles']) == list entry_valid &= type(d['roles']) == list
validate_email(d['email']) validate_email(d['email'])
if entry_valid: if entry_valid:
existing_user = user_datastore.find_user(username=d['username'], email=d['email']) existing_user = security.datastore.find_user(username=d['username'], email=d['email'])
if existing_user is None: if existing_user is None:
user_datastore.create_user(username=d['username'], email=d['email'], security.datastore.create_user(username=d['username'], email=d['email'],
password=d['password_hash'], active=d['active'], password=d['password_hash'], active=d['active'],
roles=d['roles']) roles=d['roles'])
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.") flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
@ -463,38 +248,43 @@ def create_application(config):
else: else:
flash("Ungültige Dateiendung") flash("Ungültige Dateiendung")
return redirect('/manage_admins') return redirect('/manage_admins')
# main page
@app.route('/')
def door_lock():
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
# token overview
@app.route('/tokens') # main page
@roles_required('admin') @door_app.route('/')
def list_tokens(): def door_lock():
tokens = door.get_tokens() return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
# token overview
@door_app.route('/tokens')
@roles_required('admin')
def list_tokens():
tokens = current_app.door.get_tokens()
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']} assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
@app.route('/token-log')
@roles_required('super_admin') @door_app.route('/token-log')
def token_log(): @roles_required('super_admin')
def token_log():
log = [] log = []
try: try:
with open(config.nfc_log) as f: with open(current_app.config['NFC_LOG']) as f:
log += f.readlines() log += f.readlines()
log.reverse() log.reverse()
log = [l.split(' - ') for l in log] log = [l.split(' - ') for l in log]
return render_template('token_log.html', log=log) return render_template('token_log.html', log=log)
except Exception as e: except Exception as e:
flash(f"NFC logfile {Path(config.nfc_log).absolute()} konnte nicht gelesen werden. Exception: {e}") flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}")
return redirect('/') return redirect('/')
# routes for registering, editing and deleting tokens
@app.route('/register-token', methods=['GET', 'POST']) # routes for registering, editing and deleting tokens
@roles_required('admin') @door_app.route('/register-token', methods=['GET', 'POST'])
def register(): @roles_required('admin')
def register():
"""Register new token for locking and unlocking the door. """Register new token for locking and unlocking the door.
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
@ -503,7 +293,7 @@ def create_application(config):
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route
will be called which adds the new token to the database. will be called which adds the new token to the database.
""" """
token = door.get_most_recent_token() token = current_app.door.get_most_recent_token()
recent_token = {} recent_token = {}
if {'token', 'timestamp'}.issubset(set(token.keys())): if {'token', 'timestamp'}.issubset(set(token.keys())):
@ -521,7 +311,7 @@ def create_application(config):
return render_template('register.html', token=recent_token, form=form) return render_template('register.html', token=recent_token, form=form)
elif request.method == 'POST' and form.validate(): elif request.method == 'POST' and form.validate():
# store data in session cookie # store data in session cookie
session['token'] = door.get_most_recent_token()['token'] session['token'] = current_app.door.get_most_recent_token()['token']
session['name'] = form.name.data session['name'] = form.name.data
session['email'] = form.email.data session['email'] = form.email.data
session['organization'] = form.organization.data session['organization'] = form.organization.data
@ -534,9 +324,10 @@ def create_application(config):
else: else:
return render_template('register.html', token=recent_token, form=form) return render_template('register.html', token=recent_token, form=form)
@app.route('/edit-token/<token>', methods=['GET', 'POST'])
@roles_required('admin') @door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
def edit_token(token): @roles_required('admin')
def edit_token(token):
"""Edit data in the token file (name, email, valid_thru date, active/inactive). """Edit data in the token file (name, email, valid_thru date, active/inactive).
If the route is accessed via GET it will provide a form for editing the currently stored data for the user. If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
@ -552,7 +343,7 @@ def create_application(config):
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
# to it before # to it before
if request.method == 'GET': if request.method == 'GET':
tokens = door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens: if token in tokens:
# set default for form according to values from the token file # set default for form according to values from the token file
et = tokens[token] et = tokens[token]
@ -589,31 +380,33 @@ def create_application(config):
else: else:
return render_template('edit.html', token=token, form=form) return render_template('edit.html', token=token, form=form)
@app.route('/store-token')
@roles_required('admin') @door_app.route('/store-token')
def store_token(): @roles_required('admin')
def store_token():
"""Store token to the token file on disk. """Store token to the token file on disk.
This will use the token id and the associated data stored in the session cookie (filled by register_token() or This will use the token id and the associated data stored in the session cookie (filled by register_token() or
edit_token()) and create/modify a token and store the new token file to disk. edit_token()) and create/modify a token and store the new token file to disk.
""" """
token = session['token'] token = session['token']
tokens = door.get_tokens() tokens = current_app.door.get_tokens()
tokens[token] = {'name': session['name'], tokens[token] = {'name': session['name'],
'email': session['email'], 'email': session['email'],
'valid_thru': session['valid_thru'], 'valid_thru': session['valid_thru'],
'inactive': session['inactive'], 'inactive': session['inactive'],
'organization': session['organization']} 'organization': session['organization']}
try: try:
door.store_tokens(tokens) current_app.door.store_tokens(tokens)
logger.info(f"Token {token} stored in database by admin user {current_user.username}") current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect('/tokens')
@app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin') @door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
def delete_token(token): @roles_required('admin')
def delete_token(token):
"""Delete the given token from the token file and store the new token file to disk """Delete the given token from the token file and store the new token file to disk
Parameters Parameters
@ -621,7 +414,7 @@ def create_application(config):
token : str token : str
The token to delete from the database. The token to delete from the database.
""" """
tokens = door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens: if token in tokens:
token_to_delete = tokens[token] token_to_delete = tokens[token]
@ -637,8 +430,8 @@ def create_application(config):
# form validation successful -> can delete the token # form validation successful -> can delete the token
tokens.pop(token) tokens.pop(token)
try: try:
door.store_tokens(tokens) current_app.door.store_tokens(tokens)
logger.info(f"Token {token} was deleted from database by admin user {current_user.username}") current_app.logger.info(f"Token {token} was deleted from database by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
flash(f"Token {token} wurde gelöscht!") flash(f"Token {token} wurde gelöscht!")
@ -652,9 +445,10 @@ def create_application(config):
flash(f'Ungültiger Token {token} für Löschung.') flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens') return redirect('/tokens')
@app.route('/deactivate-token/<token>')
@roles_required('admin') @door_app.route('/deactivate-token/<token>')
def deactivate_token(token): @roles_required('admin')
def deactivate_token(token):
"""Deactivate access for the given token. This updates the token file on disk. """Deactivate access for the given token. This updates the token file on disk.
Parameters Parameters
@ -662,21 +456,22 @@ def create_application(config):
token : str token : str
The token to deactivate. The token to deactivate.
""" """
tokens = door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens: if token in tokens:
tokens[token]['inactive'] = True tokens[token]['inactive'] = True
try: try:
door.store_tokens(tokens) current_app.door.store_tokens(tokens)
logger.info(f"Token {token} deactivated by admin user {current_user.username}") current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect('/tokens')
@app.route('/backup_tokens')
@roles_required('admin') @door_app.route('/backup_tokens')
def backup_tokens(): @roles_required('admin')
def backup_tokens():
# get list of defined admin users for backup # get list of defined admin users for backup
tokens = door.get_tokens() tokens = current_app.door.get_tokens()
try: try:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'token_data.txt') file = Path(tmpdir, 'token_data.txt')
@ -685,68 +480,25 @@ def create_application(config):
except Exception as e: except Exception as e:
return str(e) return str(e)
@app.route('/open')
@auth_required()
def open_door():
@door_app.route('/open')
@auth_required()
def open_door():
try: try:
door.open_door(user=current_user.username) current_app.door.open_door(user=current_user.username)
logger.info(f"Door opened by admin user {current_user.username}") current_app.logger.info(f"Door opened by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f'Could not open door. Exception: {e}') flash(f'Could not open door. Exception: {e}')
return redirect('/') return redirect('/')
# routes for opening and closing the door via the web interface
@app.route('/close') # routes for opening and closing the door via the web interface
@auth_required() @door_app.route('/close')
def close_door(): @auth_required()
def close_door():
try: try:
door.close_door(user=current_user.username) current_app.door.close_door(user=current_user.username)
logger.info(f"Door closed by admin user {current_user.username}") current_app.logger.info(f"Door closed by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f'Could not close door. Exception: {e}') flash(f'Could not close door. Exception: {e}')
return redirect('/') return redirect('/')
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if config.admin_file is not None:
if not Path(config.admin_file).exists():
logger.warning(f"Admin user creation file not found at {config.admin_file}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(config.admin_file) as f:
for i, line in enumerate(f.readlines()):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append({'username': user, 'email': email, 'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
# create admin users (only if they don't exists already)
def create_super_admins(new_admin_data):
db.create_all()
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
local_role = user_datastore.find_or_create_role('local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
password=hash_password(d['password']),
roles=roles)
db.session.commit()
create_super_admins(new_admin_data)
return app

View File

@ -1,5 +0,0 @@
flask~=1.1.2
Flask-Security-Too~=4.0.0
WTForms~=2.3.3
paho-mqtt~=1.5.1
bleach~=3.3.0

View File

@ -15,3 +15,35 @@ classifiers =
Programming Language :: Python :: 3 Programming Language :: Python :: 3
License :: OSI Approved :: GNU General Public License v3 (GPLv3) License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Operating System :: OS Independent Operating System :: OS Independent
[options]
python_requires = >=3.8
install_requires =
bleach
Flask
Flask-Mail
Flask-Security-Too
Flask-SQLAlchemy
Flask-WTF
email_validator
paho-mqtt
ldap3
wtforms
include_package_data = True
packages = find:
setup_requires =
wheel
tests_require = pytest>=3
zip_safe = False
scripts= bin/launch_webadmin
[options.extras_require]
dev =
pytest
pytest-cov
pytest-flask
pytest-mock
flake8
selenium
beautifulsoup4

View File

@ -1,17 +1,3 @@
from setuptools import setup from setuptools import setup
setup(install_requires=[ setup()
'bleach',
'Flask',
'Flask-Mail',
'Flask-Security-Too',
'Flask-SQLAlchemy',
'Flask-WTF',
'email_validator',
'paho-mqtt',
'ldap3',
],
include_package_data=True,
scripts=['bin/launch_webadmin'],
packages=['imaginaerraum_door_admin'],
zip_safe=False)

21
tests/conftest.py Normal file
View File

@ -0,0 +1,21 @@
import pytest
from selenium.webdriver import Chrome
from imaginaerraum_door_admin import create_app
@pytest.fixture(scope='session')
def app():
"""Fixture to launch the webapp"""
app = create_app()
return app
@pytest.fixture
def browser():
"""Fixture for a selenium browser to access the webapp."""
driver = Chrome()
driver.implicitly_wait(10)
yield driver
driver.quit()

119
tests/test_webinterface.py Normal file
View File

@ -0,0 +1,119 @@
import pytest
from bs4 import BeautifulSoup
from imaginaerraum_door_admin.door_handle import DoorHandle
import re
def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}')
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
submit_button = browser.find_element_by_id('submit').click()
assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response):
soup = BeautifulSoup(response.data)
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
# extract csrf token from the login page source
response = client.get('/login')
csrf_token = extract_csrf_token(response)
# send login information
payload = {
'csrf_token': csrf_token,
'email': user,
'password': password
}
return client.post('/login', data=payload, follow_redirects=True)
def test_login_headless(client):
response = headless_login(client)
soup = BeautifulSoup(response.data)
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
@pytest.fixture
def client_authenticated(client):
# log in using admin account for testing
headless_login(client)
yield client
def test_open_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
# visit route for open
client_authenticated.get('/open')
# make sure the open method was called
DoorHandle.open_door.assert_called_once_with(user='gandalf')
def test_close_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
# visit route for open
client_authenticated.get('/close')
# make sure the open method was called
DoorHandle.close_door.assert_called_once_with(user='gandalf')
def test_manage_admins(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
assert "Nutzer Übersicht" in response.data.decode()
assert "gandalf" in response.data.decode()
assert "gandalf@shire.me" in response.data.decode()
def test_create_admin(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response)
# post data for creating a new admin
payload = {'name': 'bilbo',
'email': 'bilbo@shire.me',
'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True)
# after the new admin user is created, we should have been redirected to the
# /manage_admin page. there, the password for login is displayed
# we test if the newly created user can log in with that password
# extract password displayed on the page
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
assert match is not None
extracted_password = match['password']
# log out current user
response = client_authenticated.get('/logout')
# try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo',
password=extracted_password)
# - see if it works
soup = BeautifulSoup(response.data)
# make sure login succeeded
# -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])