added ability to backup and restore admin users for super admins

This commit is contained in:
Simon Pirkelmann 2021-03-28 21:50:44 +02:00
parent e707f4bd87
commit 8cdf549c4c
2 changed files with 103 additions and 22 deletions

View File

@ -47,4 +47,11 @@
</table> </table>
</form> </form>
</div> </div>
<form action="{{ url_for('backup_user_datastore') }}" method="get">
<input type="submit" value="Admin Daten sichern">
</form>
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value="Admin Daten wiederherstellen">
</form>
{% endblock %} {% endblock %}

View File

@ -1,23 +1,27 @@
import os import os
from flask import Flask, render_template, request, flash, redirect, session, url_for from flask import Flask, render_template, request, flash, redirect, session, send_file
from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields.html5 import DateField, EmailField from wtforms.fields.html5 import DateField, EmailField
from wtforms.fields import StringField, BooleanField from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, current_user, roles_required from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user, verify_password from flask_security.utils import find_user, verify_password
from flask_mail import Mail from flask_mail import Mail
from email_validator import validate_email from email_validator import validate_email
import json
import secrets import secrets
import bleach import bleach
import ldap3 import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path from pathlib import Path
import logging import logging
import tempfile
from datetime import date from datetime import date
from .door_handle import DoorHandle from .door_handle import DoorHandle
@ -43,18 +47,22 @@ class TokenForm(FlaskForm):
active = BooleanField('Aktiv?') active = BooleanField('Aktiv?')
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()]) dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
class TokenDeleteForm(FlaskForm):
class ConfirmDeleteForm(FlaskForm):
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')]) name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
name_confirm = StringField('Name confirm') name_confirm = StringField('Name confirm')
class AdminCreationForm(FlaskForm): class AdminCreationForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()]) name = StringField('Name', validators=[DataRequired()])
email = EmailField('E-Mail', validators=[DataRequired()]) email = EmailField('E-Mail', validators=[DataRequired()])
def uia_username_mapper(identity): def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it. # we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True) return bleach.clean(identity, strip=True)
def create_application(config): def create_application(config):
# set up logging for the web app # set up logging for the web app
logger = logging.getLogger('webapp') logger = logging.getLogger('webapp')
@ -136,7 +144,7 @@ def create_application(config):
# LDAP # LDAP
ldap_server = ldap3.Server(config.ldap_url) ldap_server = ldap3.Server(config.ldap_url)
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password) local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
def validate_ldap(user, password): def validate_ldap(user, password):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.
@ -158,7 +166,7 @@ def create_application(config):
try: try:
con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,), con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,),
password=password, auto_bind=True) password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError: except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail # server reachable but user unauthorized -> fail
return False return False
@ -200,8 +208,7 @@ def create_application(config):
user_datastore = SQLAlchemyUserDatastore(db, User, Role) user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore, login_form=ExtendedLoginForm) security = Security(app, user_datastore, login_form=ExtendedLoginForm)
# admin user management
@app.route('/manage_admins', methods=['GET', 'POST']) @app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin') @roles_required('super_admin')
def manage_admins(): def manage_admins():
@ -212,15 +219,17 @@ def create_application(config):
return render_template('admins.html', admin_data=admin_data, form=form) return render_template('admins.html', admin_data=admin_data, form=form)
elif form.validate(): elif form.validate():
if user_datastore.find_user(username=form.name.data) is not None or \ if user_datastore.find_user(username=form.name.data) is not None or \
user_datastore.find_user(email=form.email.data) is not None: user_datastore.find_user(email=form.email.data) is not None:
flash("A user with the same name or email is already registered!") flash("A user with the same name or email is already registered!")
return redirect('/manage_admins') return redirect('/manage_admins')
else: else:
pw = secrets.token_urlsafe(16) pw = secrets.token_urlsafe(16)
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, roles=['admin'], new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, roles=['admin'],
password=hash_password(pw)) password=hash_password(pw))
logger.info(f"Super admin {current_user.username} created new admin account for {new_user.username} <{new_user.email}>") logger.info(
flash(f"An account for the new admin user {new_user.username} has been created. Use the randomly generated password {pw} to log in.") f"Super admin {current_user.username} created new admin account for {new_user.username} <{new_user.email}>")
flash(
f"An account for the new admin user {new_user.username} has been created. Use the randomly generated password {pw} to log in.")
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
@ -239,7 +248,7 @@ def create_application(config):
return redirect('/manage_admins') return redirect('/manage_admins')
# set up form for confirming deletion # set up form for confirming deletion
form = TokenDeleteForm() form = ConfirmDeleteForm()
form.name_confirm.data = username form.name_confirm.data = username
if request.method == 'GET': if request.method == 'GET':
@ -247,9 +256,13 @@ def create_application(config):
return render_template('delete_admin.html', username=username, form=form) return render_template('delete_admin.html', username=username, form=form)
elif form.validate(): elif form.validate():
user_datastore.delete_user(user) user_datastore.delete_user(user)
flash(f"Username {username} was deleted.")
logger.info(f"Super admin {current_user.username} deleted admin user {username}") logger.info(f"Super admin {current_user.username} deleted admin user {username}")
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
else:
flash("Username does not match. User was not deleted!")
return redirect('/manage_admins')
@app.route('/admin_toggle_active/<username>') @app.route('/admin_toggle_active/<username>')
@roles_required('super_admin') @roles_required('super_admin')
@ -269,11 +282,74 @@ def create_application(config):
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
@app.route('/backup_user_datastore')
@roles_required('super_admin')
def backup_user_datastore():
# get list of defined admin users for backup
users = user_datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password}
for u in users if not u.has_role('super_admin')]
try:
with tempfile.TemporaryDirectory() as tmpdir:
file = Path(tmpdir, 'user_data.txt')
file.write_text(json.dumps(user_data))
return send_file(file, as_attachment=True, cache_timeout=-1)
except Exception as e:
return str(e)
@app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin')
def restore_user_datastore():
# check if the post request has the file part
if 'file' not in request.files:
flash('Keine Datei ausgewählt!')
return redirect(request.url)
file = request.files['file']
# if user does not select file, browser also
# submit an empty part without filename
if file.filename == '':
flash('Keine Datei ausgewählt!')
return redirect('/manage_admins')
filename = secure_filename(file.filename)
if file and filename.endswith('.txt'):
data = file.stream.read()
try:
# check validity of user data
user_data = json.loads(data)
valid = type(user_data) == list
valid &= all(type(d) == dict for d in user_data)
if valid:
for d in user_data:
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username'}
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username'])
entry_valid &= type(d['active']) == bool
validate_email(d['email'])
if entry_valid:
existing_user = user_datastore.find_user(username=d['username'], email=d['email'])
if existing_user is None:
user_datastore.create_user(username=d['username'], email=d['email'], password=d['password_hash'],
roles=['admin'], active=d['active'])
flash(f"Admin Account für Benutzer '{d['username']} wurde wiederhergestellt.")
else:
flash(f"Admin '{d['username']} existiert bereits. Eintrag wird übersprungen.")
else:
raise ValueError(f"Ungültige Daten für User Entry {d}")
else:
raise ValueError("Admin User Datei hat ungültiges Format.")
except Exception as e:
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
return redirect('/manage_admins')
flash("Admin Benutzer aus Datei gelesen.")
db.session.commit()
else:
flash("Ungültige Dateiendung")
return redirect('/manage_admins')
# main page
@app.route('/') @app.route('/')
def door_lock(): def door_lock():
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position) return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
# token overview
@app.route('/tokens') @app.route('/tokens')
@auth_required() @auth_required()
def list_tokens(): def list_tokens():
@ -282,7 +358,7 @@ def create_application(config):
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
# routes for registering, editing and deleting tokens
@app.route('/register-token', methods=['GET', 'POST']) @app.route('/register-token', methods=['GET', 'POST'])
@auth_required() @auth_required()
def register(): def register():
@ -315,7 +391,6 @@ def create_application(config):
else: else:
return render_template('register.html', token=door.get_most_recent_token(), form=form) return render_template('register.html', token=door.get_most_recent_token(), form=form)
@app.route('/edit-token/<token>', methods=['GET', 'POST']) @app.route('/edit-token/<token>', methods=['GET', 'POST'])
@auth_required() @auth_required()
def edit_token(token): def edit_token(token):
@ -332,7 +407,7 @@ def create_application(config):
""" """
form = TokenForm(request.form) form = TokenForm(request.form)
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
# to it before # to it before
if request.method == 'GET': if request.method == 'GET':
tokens = door.get_tokens() tokens = door.get_tokens()
if token in tokens: if token in tokens:
@ -371,7 +446,6 @@ def create_application(config):
else: else:
return render_template('edit.html', token=token, form=form) return render_template('edit.html', token=token, form=form)
@app.route('/store-token') @app.route('/store-token')
@auth_required() @auth_required()
def store_token(): def store_token():
@ -394,7 +468,6 @@ def create_application(config):
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens') return redirect('/tokens')
@app.route('/delete-token/<token>', methods=['GET', 'POST']) @app.route('/delete-token/<token>', methods=['GET', 'POST'])
@auth_required() @auth_required()
def delete_token(token): def delete_token(token):
@ -411,7 +484,7 @@ def create_application(config):
token_to_delete = tokens[token] token_to_delete = tokens[token]
# set up form for confirming deletion # set up form for confirming deletion
form = TokenDeleteForm() form = ConfirmDeleteForm()
form.name_confirm.data = token_to_delete['name'] form.name_confirm.data = token_to_delete['name']
if request.method == 'GET': if request.method == 'GET':
@ -429,13 +502,13 @@ def create_application(config):
return redirect('/tokens') return redirect('/tokens')
else: else:
# form validation failed -> return to token overview and flash message # form validation failed -> return to token overview and flash message
flash(f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.") flash(
f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
return redirect('/tokens') return redirect('/tokens')
else: else:
flash(f'Ungültiger Token {token} für Löschung.') flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens') return redirect('/tokens')
@app.route('/deactivate-token/<token>') @app.route('/deactivate-token/<token>')
@auth_required() @auth_required()
def deactivate_token(token): def deactivate_token(token):
@ -466,6 +539,7 @@ def create_application(config):
flash(f'Could not open door. Exception: {e}') flash(f'Could not open door. Exception: {e}')
return redirect('/') return redirect('/')
# routes for opening and closing the door via the web interface
@app.route('/close') @app.route('/close')
@auth_required() @auth_required()
def close_door(): def close_door():