Compare commits

...

5 Commits

5 changed files with 331 additions and 58 deletions

View File

@ -0,0 +1,57 @@
{% extends 'base.html' %}
{% block header %}
{% block title %}<h1>Admin Übersicht</h1>{% endblock %}
<script src="../static/jquery-3.6.0.js"></script>
{% endblock %}
{% block content %}
<table border="1">
<td>Username</td>
<td>E-Mail</td>
<td>Aktiv</td>
<td>Aktionen</td>
{% for data in admin_data %}
<tr>
{% for field in ['username', 'email', 'active'] %}
<td>{{ data[field] if data[field] }}</td>
{% endfor %}
<td>
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td>
</tr>
{% endfor %}
</table>
<div>
<p>
Neuen Admin erstellen:
</p>
<form method="POST">
<table>
{{ form.csrf_token }}
<tr>
<td>{{ form.name.label }}</td>
<td>{{ form.name(size=20) }}</td>
</tr>
<tr>
<td>{{ form.email.label }}</td>
<td>{{ form.email(size=20) }}</td>
</tr>
<tr>
<td></td>
<td>
<input type="submit" value="Abschicken">
</td>
</tr>
</table>
</form>
</div>
<form action="{{ url_for('backup_user_datastore') }}" method="get">
<input type="submit" value="Admin Daten sichern">
</form>
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value="Admin Daten wiederherstellen">
</form>
{% endblock %}

View File

@ -8,6 +8,9 @@
<li><a href="{{ url_for('list_tokens') }}">Token Übersicht</a> <li><a href="{{ url_for('list_tokens') }}">Token Übersicht</a>
<li><a href="{{ url_for('open_door') }}">Tür öffnen</a> <li><a href="{{ url_for('open_door') }}">Tür öffnen</a>
<li><a href="{{ url_for('close_door') }}">Tür schließen</a> <li><a href="{{ url_for('close_door') }}">Tür schließen</a>
{% if current_user.has_role('super_admin') %}
<li><a href="{{ url_for('manage_admins') }}">Admins verwalten</a>
{% endif %}
{% if current_user.is_authenticated %} {% if current_user.is_authenticated %}
<li><a href="{{ url_for('security.change_password') }}">Passwort ändern</a> <li><a href="{{ url_for('security.change_password') }}">Passwort ändern</a>
<li><a href="{{ url_for('security.logout') }}">Benutzer <span>{{ current_user.username }}</span> ausloggen</a> <li><a href="{{ url_for('security.logout') }}">Benutzer <span>{{ current_user.username }}</span> ausloggen</a>

View File

@ -0,0 +1,27 @@
{% extends 'base.html' %}
{% block header %}
{% block title %}<h1>Admin Benutzer löschen</h1>{% endblock %}
<script src="../static/jquery-3.6.0.js"></script>
{% endblock %}
{% block content %}
<div>
Achtung, Admin '{{ username }}' wird gelöscht.
Bitte zur Bestätigung den Nutzernamen eingeben:
<form method="POST">
<table>
{{ form.csrf_token }}
<tr>
<td>{{ form.name.label }}</td>
<td>{{ form.name(size=20) }}</td>
</tr>
<tr>
<td></td>
<td>
<input type="submit" value="Bestätigen">
</td>
</tr>
</table>
</form>
</div>
{% endblock %}

View File

@ -39,4 +39,7 @@
</tr> </tr>
{% endfor %} {% endfor %}
</table> </table>
<form action="{{ url_for('backup_tokens') }}" method="get">
<input type="submit" value="Token Daten sichern">
</form>
{% endblock %} {% endblock %}

View File

@ -1,21 +1,27 @@
import os import os
from flask import Flask, render_template, request, flash, redirect, session, url_for from flask import Flask, render_template, request, flash, redirect, session, send_file
from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields.html5 import DateField, EmailField from wtforms.fields.html5 import DateField, EmailField
from wtforms.fields import StringField, BooleanField from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, current_user from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user, verify_password from flask_security.utils import find_user, verify_password
from flask_mail import Mail from flask_mail import Mail
from email_validator import validate_email from email_validator import validate_email, EmailNotValidError
import json
import secrets
import bleach import bleach
import ldap3 import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path from pathlib import Path
import logging import logging
import tempfile
from datetime import date from datetime import date
from .door_handle import DoorHandle from .door_handle import DoorHandle
@ -41,14 +47,22 @@ class TokenForm(FlaskForm):
active = BooleanField('Aktiv?') active = BooleanField('Aktiv?')
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()]) dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
class TokenDeleteForm(FlaskForm):
class ConfirmDeleteForm(FlaskForm):
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')]) name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
name_confirm = StringField('Name confirm') name_confirm = StringField('Name confirm')
class AdminCreationForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
email = EmailField('E-Mail', validators=[DataRequired()])
def uia_username_mapper(identity): def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it. # we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True) return bleach.clean(identity, strip=True)
def create_application(config): def create_application(config):
# set up logging for the web app # set up logging for the web app
logger = logging.getLogger('webapp') logger = logging.getLogger('webapp')
@ -76,22 +90,6 @@ def create_application(config):
if not Path(config.token_file).exists(): if not Path(config.token_file).exists():
logger.warning(f"Token file not found at {config.token_file}") logger.warning(f"Token file not found at {config.token_file}")
new_admin_data = []
if config.admin_file is not None:
if not Path(config.admin_file).exists():
logger.warning(f"Admin user creation file not found at {config.admin_file}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(config.admin_file) as f:
for i, d in enumerate(f.readlines()):
try:
user, email, pw = d.split()
validate_email(email)
new_admin_data.append({'username': user, 'email': email, 'password': pw})
except Exception as e:
print(f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
# create door objects which provides access to the token file and current door state via MQTT # create door objects which provides access to the token file and current door state via MQTT
door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket, door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
logger=logger) logger=logger)
@ -146,9 +144,9 @@ def create_application(config):
# LDAP # LDAP
ldap_server = ldap3.Server(config.ldap_url) ldap_server = ldap3.Server(config.ldap_url)
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password) local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
def validate_ldap(user, password): def validate_ldap(username, password):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized and the password is stored If the connection completes successfully the given user and password is authorized and the password is stored
@ -167,19 +165,22 @@ def create_application(config):
""" """
try: try:
con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,), con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True) password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError: except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail # server reachable but user unauthorized -> fail
return False return False
except LDAPSocketOpenError: except LDAPSocketOpenError:
# server not reachable -> try cached authorization data # server not reachable -> try cached authorization data
return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username]) return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username])
except Exception: except Exception as e:
# for other Exceptions we just fail # for other Exceptions we just fail
return False return False
# TODO check if user has permission to edit tokens # TODO check if user has permission to edit tokens
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={user.username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))')
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={user.username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
# if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if # if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if
# the server is not reachable # the server is not reachable
local_ldap_cache[user.username] = hash_password(password) local_ldap_cache[user.username] = hash_password(password)
@ -190,17 +191,32 @@ def create_application(config):
password = PasswordField('Passwort', [Required()]) password = PasswordField('Passwort', [Required()])
def validate(self): def validate(self):
# try authorizing locally using Flask security user datastore # try authorizing using LDAP
authorized = super(ExtendedLoginForm, self).validate() # authorization in LDAP uses username -> get username associated with email from the database
try:
# if an email (instead of a username) was entered for authentication we check if there already is a user
# with that email in the database
validate_email(self.email.data)
user = find_user(self.email.data)
if user is not None:
username = user.username
else:
# this means there is no user with that email in the database
username = None
except EmailNotValidError:
# else we use the entered credentials as username
username = self.email.data
authorized = validate_ldap(username, self.password.data)
if authorized:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
if not authorized: if not authorized:
# try authorizing using LDAP # try authorizing locally using Flask security user datastore
# authorization in LDAP uses username -> get username associated with email from the database authorized = super(ExtendedLoginForm, self).validate()
user = find_user(self.email.data)
authorized = validate_ldap(user, self.password.data) if authorized:
if authorized:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
else:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database") logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
# if any of the authorization methods is successful we authorize the user # if any of the authorization methods is successful we authorize the user
@ -210,28 +226,148 @@ def create_application(config):
user_datastore = SQLAlchemyUserDatastore(db, User, Role) user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore, login_form=ExtendedLoginForm) security = Security(app, user_datastore, login_form=ExtendedLoginForm)
# create admin users (only if they don't exists already) # admin user management
def create_admins(new_admin_data): @app.route('/manage_admins', methods=['GET', 'POST'])
for d in new_admin_data: @roles_required('super_admin')
if user_datastore.find_user(email=d['email'], username=d['username']) is None: def manage_admins():
logger.info(f"New admin user created with username '{d['username']}' and email '{d['email']}'") form = AdminCreationForm()
# create new admin (only if admin does not already exist) if request.method == 'GET':
user_datastore.create_user(email=d['email'], username=d['username'], password=hash_password(d['password'])) users = user_datastore.user_model.query.all()
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active} for u in users]
return render_template('admins.html', admin_data=admin_data, form=form)
elif form.validate():
if user_datastore.find_user(username=form.name.data) is not None or \
user_datastore.find_user(email=form.email.data) is not None:
flash("A user with the same name or email is already registered!")
return redirect('/manage_admins')
else:
pw = secrets.token_urlsafe(16)
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, roles=['admin'],
password=hash_password(pw))
logger.info(
f"Super admin {current_user.username} created new admin account for {new_user.username} <{new_user.email}>")
flash(
f"An account for the new admin user {new_user.username} has been created. Use the randomly generated password {pw} to log in.")
db.session.commit()
return redirect('/manage_admins')
# Create a user to test with @app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@app.before_first_request @roles_required('super_admin')
def create_user(): def delete_admins(username):
db.create_all() user = user_datastore.find_user(username=username)
if len(new_admin_data) > 0: if user is None:
# create admin accounts from given file flash(f"Invalid user {username}")
create_admins(new_admin_data) return redirect('/manage_admins')
if user.has_role('super_admin'):
flash('Cannot delete super admins!')
return redirect('/manage_admins')
if user.is_active:
flash('Cannot delete active users. Please deactivate user first!')
return redirect('/manage_admins')
# set up form for confirming deletion
form = ConfirmDeleteForm()
form.name_confirm.data = username
if request.method == 'GET':
# return page asking the user to confirm delete
return render_template('delete_admin.html', username=username, form=form)
elif form.validate():
user_datastore.delete_user(user)
flash(f"Username {username} was deleted.")
logger.info(f"Super admin {current_user.username} deleted admin user {username}")
db.session.commit()
return redirect('/manage_admins')
else:
flash("Username does not match. User was not deleted!")
return redirect('/manage_admins')
@app.route('/admin_toggle_active/<username>')
@roles_required('super_admin')
def admin_toggle_active(username):
user = user_datastore.find_user(username=username)
if user is None:
flash(f"Invalid user {username}")
return redirect('/manage_admins')
if user.has_role('super_admin'):
flash('Cannot deactivate super admins!')
return redirect('/manage_admins')
user_datastore.toggle_active(user)
if user.is_active:
logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
else:
logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
db.session.commit() db.session.commit()
return redirect('/manage_admins')
@app.route('/backup_user_datastore')
@roles_required('super_admin')
def backup_user_datastore():
# get list of defined admin users for backup
users = user_datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password}
for u in users if not u.has_role('super_admin')]
try:
with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'user_data.txt')
file.write_text(json.dumps(user_data))
return send_file(file, as_attachment=True, cache_timeout=-1)
except Exception as e:
return str(e)
@app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin')
def restore_user_datastore():
# check if the post request has the file part
if 'file' not in request.files:
flash('Keine Datei ausgewählt!')
return redirect(request.url)
file = request.files['file']
# if user does not select file, browser also
# submit an empty part without filename
if file.filename == '':
flash('Keine Datei ausgewählt!')
return redirect('/manage_admins')
filename = secure_filename(file.filename)
if file and filename.endswith('.txt'):
data = file.stream.read()
try:
# check validity of user data
user_data = json.loads(data)
valid = type(user_data) == list
valid &= all(type(d) == dict for d in user_data)
if valid:
for d in user_data:
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username'}
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username'])
entry_valid &= type(d['active']) == bool
validate_email(d['email'])
if entry_valid:
existing_user = user_datastore.find_user(username=d['username'], email=d['email'])
if existing_user is None:
user_datastore.create_user(username=d['username'], email=d['email'], password=d['password_hash'],
roles=['admin'], active=d['active'])
flash(f"Admin Account für Benutzer '{d['username']} wurde wiederhergestellt.")
else:
flash(f"Admin '{d['username']} existiert bereits. Eintrag wird übersprungen.")
else:
raise ValueError(f"Ungültige Daten für User Entry {d}")
else:
raise ValueError("Admin User Datei hat ungültiges Format.")
except Exception as e:
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
return redirect('/manage_admins')
flash("Admin Benutzer aus Datei gelesen.")
db.session.commit()
else:
flash("Ungültige Dateiendung")
return redirect('/manage_admins')
# main page
@app.route('/') @app.route('/')
def door_lock(): def door_lock():
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position) return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
# token overview
@app.route('/tokens') @app.route('/tokens')
@auth_required() @auth_required()
def list_tokens(): def list_tokens():
@ -240,7 +376,7 @@ def create_application(config):
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
# routes for registering, editing and deleting tokens
@app.route('/register-token', methods=['GET', 'POST']) @app.route('/register-token', methods=['GET', 'POST'])
@auth_required() @auth_required()
def register(): def register():
@ -269,11 +405,10 @@ def create_application(config):
else: else:
session['valid_thru'] = '' session['valid_thru'] = ''
session['inactive'] = not form.active.data session['inactive'] = not form.active.data
return redirect(f'/store-token') return redirect('/store-token')
else: else:
return render_template('register.html', token=door.get_most_recent_token(), form=form) return render_template('register.html', token=door.get_most_recent_token(), form=form)
@app.route('/edit-token/<token>', methods=['GET', 'POST']) @app.route('/edit-token/<token>', methods=['GET', 'POST'])
@auth_required() @auth_required()
def edit_token(token): def edit_token(token):
@ -290,7 +425,7 @@ def create_application(config):
""" """
form = TokenForm(request.form) form = TokenForm(request.form)
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
# to it before # to it before
if request.method == 'GET': if request.method == 'GET':
tokens = door.get_tokens() tokens = door.get_tokens()
if token in tokens: if token in tokens:
@ -329,7 +464,6 @@ def create_application(config):
else: else:
return render_template('edit.html', token=token, form=form) return render_template('edit.html', token=token, form=form)
@app.route('/store-token') @app.route('/store-token')
@auth_required() @auth_required()
def store_token(): def store_token():
@ -352,7 +486,6 @@ def create_application(config):
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect('/tokens')
@app.route('/delete-token/<token>', methods=['GET', 'POST']) @app.route('/delete-token/<token>', methods=['GET', 'POST'])
@auth_required() @auth_required()
def delete_token(token): def delete_token(token):
@ -369,7 +502,7 @@ def create_application(config):
token_to_delete = tokens[token] token_to_delete = tokens[token]
# set up form for confirming deletion # set up form for confirming deletion
form = TokenDeleteForm() form = ConfirmDeleteForm()
form.name_confirm.data = token_to_delete['name'] form.name_confirm.data = token_to_delete['name']
if request.method == 'GET': if request.method == 'GET':
@ -387,13 +520,13 @@ def create_application(config):
return redirect('/tokens') return redirect('/tokens')
else: else:
# form validation failed -> return to token overview and flash message # form validation failed -> return to token overview and flash message
flash(f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.") flash(
f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
return redirect('/tokens') return redirect('/tokens')
else: else:
flash(f'Ungültiger Token {token} für Löschung.') flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens') return redirect('/tokens')
@app.route('/deactivate-token/<token>') @app.route('/deactivate-token/<token>')
@auth_required() @auth_required()
def deactivate_token(token): def deactivate_token(token):
@ -414,6 +547,19 @@ def create_application(config):
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect('/tokens')
@app.route('/backup_tokens')
@auth_required()
def backup_tokens():
# get list of defined admin users for backup
tokens = door.get_tokens()
try:
with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'token_data.txt')
file.write_text(json.dumps(tokens))
return send_file(file, as_attachment=True, cache_timeout=-1)
except Exception as e:
return str(e)
@app.route('/open') @app.route('/open')
@auth_required() @auth_required()
def open_door(): def open_door():
@ -424,6 +570,7 @@ def create_application(config):
flash(f'Could not open door. Exception: {e}') flash(f'Could not open door. Exception: {e}')
return redirect('/') return redirect('/')
# routes for opening and closing the door via the web interface
@app.route('/close') @app.route('/close')
@auth_required() @auth_required()
def close_door(): def close_door():
@ -434,4 +581,40 @@ def create_application(config):
flash(f'Could not close door. Exception: {e}') flash(f'Could not close door. Exception: {e}')
return redirect('/') return redirect('/')
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if config.admin_file is not None:
if not Path(config.admin_file).exists():
logger.warning(f"Admin user creation file not found at {config.admin_file}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(config.admin_file) as f:
for i, d in enumerate(f.readlines()):
try:
user, email, pw = d.split()
validate_email(email)
new_admin_data.append({'username': user, 'email': email, 'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
# create admin users (only if they don't exists already)
def create_super_admins(new_admin_data):
db.create_all()
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
for d in new_admin_data:
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
logger.info(f"New admin user created with username '{d['username']}' and email '{d['email']}'")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
password=hash_password(d['password']),
roles=[super_admin_role, admin_role])
db.session.commit()
create_super_admins(new_admin_data)
return app return app