from flask import Flask, render_template, request, flash, redirect, session, send_file
from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm
from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
    current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
from flask_security.views import change_password
from flask_mail import Mail
from email_validator import validate_email
import json
import secrets
import bleach
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path
import logging
import tempfile
from datetime import date, datetime, timedelta
from .door_handle import DoorHandle


def validate_valid_thru_date(form, field):
    """WTForms validator for the 'valid thru' date of a token.

    The date is only checked when the form's ``limit_validity`` checkbox is
    set. In that case the date must be present and must not lie in the past.

    Parameters
    ----------
    form :
        The form being validated; its ``limit_validity.data`` flag is read.
    field :
        The date field being validated; ``field.data`` is compared to today.

    Returns
    -------
    bool
        True if the field passed validation.

    Raises
    ------
    ValidationError
        If limited validity is requested and the date is missing or in the
        past. A message is flashed before the error is raised.
    """
    if not form.limit_validity.data:
        # token validity is unlimited -> the date is not used, nothing to check
        return True

    # field.data is None when nothing (or something unparsable) was submitted.
    # Comparing None with a date would raise a TypeError, so test it explicitly.
    if field.data is None or field.data < date.today():
        flash("Ungültiges Datum")
        raise ValidationError
    return True


class TokenForm(FlaskForm):
    """Form for entering or editing the user data associated with a token."""
    name = StringField('Name', validators=[DataRequired()])
    email = EmailField('E-Mail', validators=[DataRequired()])
    organization = StringField('Organization', validators=[DataRequired()])
    # valid_thru is only validated if limit_validity is checked
    limit_validity = BooleanField('Gültigkeit begrenzen?')
    valid_thru = DateField('Gültig bis', validators=[validate_valid_thru_date])
    active = BooleanField('Aktiv?')
    dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])


class ConfirmDeleteForm(FlaskForm):
    """Form asking the user to re-type a name before something is deleted."""
    # name must be equal to name_confirm, which is pre-filled by the view
    name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
    name_confirm = StringField('Name confirm')

class AdminCreationForm(FlaskForm):
    """Form used on the admin management page to create a new local account."""
    name = StringField('Name', validators=[DataRequired()])
    email = EmailField('E-Mail', validators=[DataRequired()])


def uia_username_mapper(identity):
    """Map a login identity to a username by stripping markup with bleach."""
    # we allow pretty much anything - but we bleach it.
    return bleach.clean(identity, strip=True)


def create_application(config):
    """Create and configure the Flask application for the door lock web UI.

    Sets up logging, reads the Flask secrets, creates the door handle, the
    database, the Flask-Security setup (with a login form that authenticates
    either against the local database or against LDAP) and registers all
    routes. Finally the super admin accounts listed in the admin file are
    created.

    Parameters
    ----------
    config :
        Configuration object. The following attributes are read:
        ``log_file``, ``key_file``, ``template_folder``, ``static_folder``,
        ``token_file``, ``mqtt_host``, ``nfc_socket``, ``nfc_log``,
        ``mail_server``, ``mail_port``, ``mail_use_tls``, ``mail_use_ssl``,
        ``mail_username``, ``mail_password``, ``ldap_url`` and ``admin_file``.

    Returns
    -------
    Flask
        The configured application.
    """
    # set up logging for the web app
    logger = logging.getLogger('webapp')
    logger.setLevel(logging.INFO)
    if config.log_file is not None:
        ch = logging.FileHandler(config.log_file)
        ch.setLevel(logging.INFO)
    else:
        # create console handler and set level to debug
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
    # create formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # add formatter to ch
    ch.setFormatter(formatter)
    # add ch to logger
    logger.addHandler(ch)

    # do some checks for file existence etc.
    try:
        with open(config.key_file) as f:
            data = f.readlines()
        if 'SECRET_KEY' in data[0]:
            secret_key = data[0].split()[-1]
        else:
            raise Exception("Could not read SECRET_KEY")
        if 'SECURITY_PASSWORD_SALT' in data[1]:
            security_password_salt = data[1].split()[-1]
        else:
            raise Exception("Could not read SECURITY_PASSWORD_SALT")
    except Exception as e:
        logger.warning(f"Flask keys could not be read from file at {Path(config.key_file).absolute()}. "
                       f"Exception: {e}. Using default values instead.")
        # NOTE(review): these fallback values are public (they live in the source code).
        # They are kept because changing the salt would invalidate stored password hashes,
        # but a production deployment should always provide a key file.
        secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
        security_password_salt = '10036796768252925167749545152988277953'

    def check_folder(folder, description):
        """Log an error if a Flask folder (absolute or relative to this module) is missing."""
        # joining with an absolute path yields that absolute path, so this covers both cases
        resolved = Path(__file__).parent / folder
        if not resolved.exists():
            logger.error(f'Flask {description} folder not found at {resolved.absolute()}')

    check_folder(config.template_folder, 'template')
    check_folder(config.static_folder, 'static')

    if not Path(config.token_file).exists():
        logger.warning(f"Token file not found at {Path(config.token_file).absolute()}")

    # create door objects which provides access to the token file and current door state via MQTT
    door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
                      logger=logger)

    app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)

    # Generate a nice key using secrets.token_urlsafe()
    app.config['SECRET_KEY'] = secret_key
    # Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
    # Generate a good salt using: secrets.SystemRandom().getrandbits(128)
    app.config['SECURITY_PASSWORD_SALT'] = security_password_salt
    app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
        {"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
        {"username": {"mapper": uia_username_mapper}}
    ]
    app.config['SECURITY_CHANGEABLE'] = True
    app.config['SECURITY_RECOVERABLE'] = True
    app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False

    # Mail Config
    app.config['MAIL_SERVER'] = config.mail_server
    app.config['MAIL_PORT'] = config.mail_port
    app.config['MAIL_USE_TLS'] = config.mail_use_tls
    app.config['MAIL_USE_SSL'] = config.mail_use_ssl
    app.config['MAIL_USERNAME'] = config.mail_username
    app.config['MAIL_PASSWORD'] = config.mail_password
    app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
    mail = Mail(app)

    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
    # As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
    # underlying engine. This option makes sure that DB connections from the
    # pool are still valid. Important for entire application since
    # many DBaaS options automatically close idle connections.
    app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
        "pool_pre_ping": True,
    }

    # Create database connection object
    db = SQLAlchemy(app)

    # Define models
    fsqla.FsModels.set_db_info(db)

    class Role(db.Model, fsqla.FsRoleMixin):
        pass

    class User(db.Model, fsqla.FsUserMixin):
        pass

    # LDAP
    ldap_server = ldap3.Server(config.ldap_url)

    def validate_ldap(username, password):
        """Validate the user and password through an LDAP server.

        If the connection completes successfully the given user and password is authorized.
        Then the permissions and additional information of the user are obtained through an LDAP search.
        The data is stored in a dict which will be used later to create/update the entry for the user in the local
        database.

        Parameters
        ----------
        username : username for the LDAP server
        password : password for the LDAP server

        Returns
        -------
        bool : result of the authorization process (True = success, False = failure)
        dict : dictionary with information about an authorized user (contains username, email, hashed password,
            roles); None if binding to the LDAP server failed
        """
        # local imports: helpers of the already used ldap3 package
        from ldap3.utils.conv import escape_filter_chars
        from ldap3.utils.dn import escape_rdn

        # the username comes straight from the login form -> escape it before it is
        # embedded in a DN or in a search filter
        dn_username = escape_rdn(username)
        filter_username = escape_filter_chars(username)

        try:
            con = ldap3.Connection(ldap_server, user=f"uid={dn_username},ou=Users,dc=imaginaerraum,dc=de",
                                   password=password, auto_bind=True)
        except LDAPBindError:
            # server reachable but user unauthorized -> fail
            return False, None
        except LDAPSocketOpenError as e:
            # server not reachable -> fail
            logger.warning(f"LDAP server could not be reached. Exception: {e}")
            return False, None
        except Exception as e:
            # for other Exceptions we just fail
            logger.warning(f"LDAP authorization failed. Exception: {e}")
            return False, None

        try:
            # get user data and permissions from LDAP server
            new_user_data = {}
            new_user_data['username'] = username
            new_user_data['password'] = hash_password(password)
            new_user_data['roles'] = []
            lock_permission = con.search(
                'ou=Users,dc=imaginaerraum,dc=de',
                f'(&(uid={filter_username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
                attributes=ldap3.ALL_ATTRIBUTES)
            authorized = True
            if lock_permission:
                new_user_data['email'] = con.entries[0].mail.value
            else:
                authorized = False

            token_granting_permission = con.search(
                'ou=Users,dc=imaginaerraum,dc=de',
                f'(&(uid={filter_username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
            if token_granting_permission:
                new_user_data['roles'].append('admin')

            return authorized, new_user_data
        finally:
            # always release the connection to the LDAP server
            con.unbind()

    class ExtendedLoginForm(LoginForm):
        """Login form which authorizes users via the local database or via LDAP."""
        email = StringField('Benutzername oder E-Mail', [Required()])
        password = PasswordField('Passwort', [Required()])
        remember = BooleanField('Login merken?')

        def validate(self, **kwargs):
            """Authorize the user with the submitted credentials.

            Users with the role 'local' are checked by the Flask-Security login form, all other users are
            checked via LDAP. Unknown users which can be authorized via LDAP are added to the local database.

            Keyword arguments (e.g. ``extra_validators``) are accepted for compatibility with newer versions of
            Flask-WTF and passed on to the parent form.

            Returns
            -------
            bool : True if the user was authorized
            """
            # search for user in the current database
            user = find_user(self.email.data)
            if user is not None:
                # if a user is found we check if it is associated with LDAP or with the local database
                if user.has_role('local'):
                    # try authorizing locally using Flask security user datastore
                    authorized = super().validate(**kwargs)
                    if authorized:
                        logger.info(f"User with credentials '{self.email.data}' authorized through local database")
                else:
                    # run LDAP authorization
                    # if the authorization succeeds we also get the new_user_data dict which contains information
                    # about the user's permissions etc.
                    authorized, new_user_data = validate_ldap(user.username, self.password.data)
                    if authorized and not user.is_active:
                        # deactivated accounts must not be able to log in, even with valid LDAP credentials
                        logger.info(f"Login of deactivated user '{self.email.data}' rejected")
                        self.email.errors = ['Account is disabled']
                        authorized = False
                    elif authorized:
                        logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
                        # update permissions and password/email to stay up to date for login with no network
                        # connection
                        user.email = new_user_data['email']
                        user.password = new_user_data['password']
                        for role in new_user_data['roles']:
                            user_datastore.add_role_to_user(user, role)
                        user_datastore.commit()
                        self.user = user
                    else:
                        self.password.errors = ['Invalid password']
            else:
                # this means there is no user with that email in the database
                # we assume that the username was entered instead of an email and use that for authentication
                # with LDAP
                username = self.email.data
                # try LDAP authorization and create a new user if it succeeds
                authorized, new_user_data = validate_ldap(username, self.password.data)
                if authorized:
                    # if there was no user in the database before we create a new user
                    self.user = user_datastore.create_user(username=new_user_data['username'],
                                                           email=new_user_data['email'],
                                                           password=new_user_data['password'],
                                                           roles=new_user_data['roles'])
                    user_datastore.commit()
                    logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
                                " successful LDAP authorization")
                else:
                    # give feedback in the form, same as for known users
                    self.password.errors = ['Invalid password']

            # if any of the authorization methods is successful we authorize the user
            return authorized

    # we override the change_password view from flask security to only allow local users to change their passwords
    # LDAP users should use the LDAP self service for changing passwords
    # this route needs to be defined before the Flask Security setup
    @app.route('/change', methods=['GET', 'POST'])
    @auth_required()
    def change_pw():
        if current_user.has_role('local'):
            # local users can change their password
            return change_password()
        else:
            # LDAP users get redirected to the LDAP self service
            return redirect('https://ldap.imaginaerraum.de/')

    # Setup Flask-Security
    user_datastore = SQLAlchemyUserDatastore(db, User, Role)
    security = Security(app, user_datastore, login_form=ExtendedLoginForm)

    def account_exists(username, email):
        """Return True if an account with the given username OR the given email exists.

        The lookups are done separately because passing both attributes to a single find_user() call
        only matches accounts where both are equal.
        """
        return (user_datastore.find_user(username=username) is not None
                or user_datastore.find_user(email=email) is not None)

    # admin user management
    @app.route('/manage_admins', methods=['GET', 'POST'])
    @roles_required('super_admin')
    def manage_admins():
        """Show all accounts (GET) or create a new local account (POST)."""
        form = AdminCreationForm()
        if request.method == 'GET':
            users = user_datastore.user_model.query.all()
            admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
                           'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
                           } for u in users]
            return render_template('admins.html', admin_data=admin_data, form=form)

        if not form.validate():
            # a view must always return a response, also for invalid form data
            flash("Ungültige Eingabe. Der Benutzer wurde nicht erstellt.")
            return redirect('/manage_admins')

        if account_exists(form.name.data, form.email.data):
            flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
            return redirect('/manage_admins')

        # the new user gets a random password which is shown once to the super admin
        pw = secrets.token_urlsafe(16)
        new_user = user_datastore.create_user(username=form.name.data, email=form.email.data,
                                              password=hash_password(pw))
        user_datastore.add_role_to_user(new_user, 'local')
        logger.info(
            f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
        flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. "
              f"Verwende das Passwort {pw} um den Nutzer einzuloggen.")
        db.session.commit()
        return redirect('/manage_admins')

    @app.route('/delete_admins/<username>', methods=['GET', 'POST'])
    @roles_required('super_admin')
    def delete_admins(username):
        """Delete a deactivated non-super-admin account after the name was confirmed."""
        user = user_datastore.find_user(username=username)
        if user is None:
            flash(f"Ungültiger Nutzer {username}")
            return redirect('/manage_admins')
        if user.has_role('super_admin'):
            flash('Super-Admins können nicht gelöscht werden!')
            return redirect('/manage_admins')
        if user.is_active:
            flash('Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer zuerst deaktivieren.')
            return redirect('/manage_admins')

        # set up form for confirming deletion
        form = ConfirmDeleteForm()
        form.name_confirm.data = username

        if request.method == 'GET':
            # return page asking the user to confirm delete
            return render_template('delete_user.html', username=username, form=form)
        elif form.validate():
            user_datastore.delete_user(user)
            flash(f"Benutzer {username} wurde gelöscht.")
            logger.info(f"Super admin {current_user.username} deleted admin user {username}")
            db.session.commit()
            return redirect('/manage_admins')
        else:
            flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
            return redirect('/manage_admins')

    @app.route('/admin_toggle_active/<username>')
    @roles_required('super_admin')
    def admin_toggle_active(username):
        """Activate or deactivate the account of a non-super-admin user."""
        user = user_datastore.find_user(username=username)
        if user is None:
            flash(f"Ungültiger Nutzer {username}")
            return redirect('/manage_admins')
        if user.has_role('super_admin'):
            flash('Super-Admins können nicht deaktiviert werden!')
            return redirect('/manage_admins')
        user_datastore.toggle_active(user)
        if user.is_active:
            logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
        else:
            logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
        db.session.commit()
        return redirect('/manage_admins')

    @app.route('/promote_admin/<username>')
    @roles_required('super_admin')
    def promote_admin(username):
        """Grant the 'admin' role to a user."""
        user = user_datastore.find_user(username=username)
        if user is None:
            flash(f"Ungültiger Nutzer {username}")
            return redirect('/manage_admins')
        if user.has_role('admin'):
            flash(f'Benutzer {username} hat bereits Admin-Rechte!')
            return redirect('/manage_admins')
        user_datastore.add_role_to_user(user, 'admin')
        logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
        db.session.commit()
        return redirect('/manage_admins')

    @app.route('/demote_admin/<username>')
    @roles_required('super_admin')
    def demote_admin(username):
        """Revoke the 'admin' role from a non-super-admin user."""
        user = user_datastore.find_user(username=username)
        if user is None:
            flash(f"Ungültiger Nutzer {username}")
            return redirect('/manage_admins')
        if user.has_role('super_admin'):
            flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!')
            return redirect('/manage_admins')
        if user.has_role('admin'):
            user_datastore.remove_role_from_user(user, 'admin')
            logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}")
            db.session.commit()
        else:
            flash(f'Benutzer {username} ist bereits kein Admin!')
        return redirect('/manage_admins')

    @app.route('/backup_user_datastore')
    @roles_required('super_admin')
    def backup_user_datastore():
        """Send a JSON file with the data of all users which are not super admins."""
        # get list of defined admin users for backup
        users = user_datastore.user_model.query.all()
        user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
                      'roles': [r.name for r in u.roles]}
                     for u in users if not u.has_role('super_admin')]
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                file = Path(tmpdir, 'user_data.txt')
                file.write_text(json.dumps(user_data))
                return send_file(file, as_attachment=True, cache_timeout=-1)
        except Exception as e:
            return str(e)

    @app.route('/restore_user_datastore', methods=['POST'])
    @roles_required('super_admin')
    def restore_user_datastore():
        """Create user accounts from an uploaded backup file (see backup_user_datastore).

        Entries whose username or email is already in use are skipped. If the file contains invalid data no
        account is created at all.
        """
        # check if the post request has the file part
        if 'file' not in request.files:
            flash('Keine Datei ausgewählt!')
            # this route only accepts POST, so we must not redirect to request.url
            return redirect('/manage_admins')
        file = request.files['file']
        # if user does not select file, browser also
        # submit an empty part without filename
        if file.filename == '':
            flash('Keine Datei ausgewählt!')
            return redirect('/manage_admins')

        filename = secure_filename(file.filename)
        if not (file and filename.endswith('.txt')):
            flash("Ungültige Dateiendung")
            return redirect('/manage_admins')

        data = file.stream.read()
        expected_keys = {'active', 'email', 'password_hash', 'username', 'roles'}
        try:
            # check validity of user data
            user_data = json.loads(data)
            if not (isinstance(user_data, list) and all(isinstance(d, dict) for d in user_data)):
                raise ValueError("Admin User Datei hat ungültiges Format.")

            for d in user_data:
                # the checks short-circuit, so missing keys never cause a KeyError
                entry_valid = (set(d) == expected_keys
                               and all(isinstance(d[key], str) and d[key]
                                       for key in ('email', 'password_hash', 'username'))
                               and isinstance(d['active'], bool)
                               and isinstance(d['roles'], list))
                if not entry_valid:
                    raise ValueError(f"Ungültige Daten für User Entry {d}")
                validate_email(d['email'])

                if account_exists(d['username'], d['email']):
                    flash(f"Benutzer '{d['username']} existiert bereits. Eintrag wird übersprungen.")
                else:
                    user_datastore.create_user(username=d['username'], email=d['email'],
                                               password=d['password_hash'], active=d['active'],
                                               roles=d['roles'])
                    flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
            db.session.commit()
        except Exception as e:
            # discard accounts which were added to the session before the error occurred
            db.session.rollback()
            flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
            return redirect('/manage_admins')

        flash("Benutzer aus Datei gelesen.")
        return redirect('/manage_admins')

    # main page
    @app.route('/')
    def door_lock():
        return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)

    # token overview
    @app.route('/tokens')
    @roles_required('admin')
    def list_tokens():
        """Show all tokens, split into assigned (active) and inactive tokens."""
        tokens = door.get_tokens()
        assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
        inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
        return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)

    @app.route('/token-log')
    @roles_required('super_admin')
    def token_log():
        """Show the NFC log file, newest entry first."""
        try:
            with open(config.nfc_log) as f:
                lines = f.readlines()
            lines.reverse()
            log = [line.split(' - ') for line in lines]
            return render_template('token_log.html', log=log)
        except Exception as e:
            flash(f"NFC logfile {Path(config.nfc_log).absolute()} konnte nicht gelesen werden. Exception: {e}")
            return redirect('/')

    # routes for registering, editing and deleting tokens
    @app.route('/register-token', methods=['GET', 'POST'])
    @roles_required('admin')
    def register():
        """Register new token for locking and unlocking the door.

        This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
        entering user info (name, email, valid thru date (optional)) for the new token.

        If the route is called via POST the provided form data is checked and if the check succeeds the /store-token
        route will be called which adds the new token to the database.
        """
        token = door.get_most_recent_token()

        # only tokens scanned within the last 10 minutes are shown on the page
        recent_token = {}
        if {'token', 'timestamp'}.issubset(set(token.keys())):
            dt = datetime.utcnow() - token['timestamp']
            if dt < timedelta(minutes=10):
                recent_token = token
                recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)

        form = TokenForm()
        if request.method == 'GET':
            # set default valid thru date to today to make sure form validity check passes
            # (will not be used if limited validity is disabled)
            form.valid_thru.data = date.today()
            return render_template('register.html', token=recent_token, form=form)
        elif request.method == 'POST' and form.validate():
            if 'token' not in token:
                # nothing was scanned so far -> there is no token id we could register
                flash('Kein Token zum Registrieren gefunden.')
                return render_template('register.html', token=recent_token, form=form)
            # store data in session cookie
            session['token'] = token['token']
            session['name'] = form.name.data
            session['email'] = form.email.data
            session['organization'] = form.organization.data
            if form.limit_validity.data:
                session['valid_thru'] = form.valid_thru.data.isoformat()
            else:
                session['valid_thru'] = ''
            session['inactive'] = not form.active.data
            return redirect('/store-token')
        else:
            return render_template('register.html', token=recent_token, form=form)

    @app.route('/edit-token/<token>', methods=['GET', 'POST'])
    @roles_required('admin')
    def edit_token(token):
        """Edit data in the token file (name, email, valid_thru date, active/inactive).

        If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
        If the route is accessed via POST it will check if the form data is good and then store the modified user data
        in the database (by redirecting to the /store-token route)

        Parameters
        ----------
        token : str
            The token for which data should be edited.
        """
        tokens = door.get_tokens()
        if token not in tokens:
            # flash an error message if the route is accessed with an invalid token
            # (checked for GET and POST, editing must never create a new token)
            flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.')
            return redirect('/tokens')

        form = TokenForm(request.form)
        # we skip the validation of the DSGVO checkbox here because we assume the user agreed to it before
        form.dsgvo.validators = []

        if request.method == 'GET':
            # set default for form according to values from the token file
            et = tokens[token]
            form.active.data = not et['inactive']
            form.name.data = et.get('name') or ''
            form.email.data = et.get('email') or ''
            form.organization.data = et.get('organization') or ''
            # for the valid thru date we use today's date in case there is not valid date in the database
            try:
                form.valid_thru.data = date.fromisoformat(et['valid_thru'])
                form.limit_validity.data = True
            except (KeyError, TypeError, ValueError):
                form.valid_thru.data = date.today()
            return render_template('edit.html', token=token, form=form)
        elif request.method == 'POST':
            if form.validate():
                # store data in session cookie
                session['token'] = token
                session['name'] = form.name.data
                session['organization'] = form.organization.data
                session['email'] = form.email.data
                if form.limit_validity.data:
                    session['valid_thru'] = form.valid_thru.data.isoformat()
                else:
                    session['valid_thru'] = ''
                session['inactive'] = not form.active.data
                return redirect('/store-token')
            else:
                return render_template('edit.html', token=token, form=form)

    @app.route('/store-token')
    @roles_required('admin')
    def store_token():
        """Store token to the token file on disk.

        This will use the token id and the associated data stored in the session cookie (filled by register_token() or
        edit_token()) and create/modify a token and store the new token file to disk.
        """
        required_keys = ('token', 'name', 'email', 'valid_thru', 'inactive', 'organization')
        if any(key not in session for key in required_keys):
            # the route was opened directly, i.e. without registering or editing a token first
            flash('Keine Token-Daten zum Speichern vorhanden.')
            return redirect('/tokens')

        token = session['token']
        tokens = door.get_tokens()
        tokens[token] = {'name': session['name'],
                         'email': session['email'],
                         'valid_thru': session['valid_thru'],
                         'inactive': session['inactive'],
                         'organization': session['organization']}
        try:
            door.store_tokens(tokens)
            logger.info(f"Token {token} stored in database by admin user {current_user.username}")
        except Exception as e:
            flash(f"Error during store_tokens. Exception: {e}")
        return redirect('/tokens')

    @app.route('/delete-token/<token>', methods=['GET', 'POST'])
    @roles_required('admin')
    def delete_token(token):
        """Delete the given token from the token file and store the new token file to disk

        Parameters
        ----------
        token : str
            The token to delete from the database.
        """
        tokens = door.get_tokens()

        if token not in tokens:
            flash(f'Ungültiger Token {token} für Löschung.')
            return redirect('/tokens')

        token_to_delete = tokens[token]

        # set up form for confirming deletion
        form = ConfirmDeleteForm()
        form.name_confirm.data = token_to_delete['name']

        if request.method == 'GET':
            # return page asking the user to confirm delete
            return render_template('delete.html', token=token_to_delete, form=form)
        elif form.validate():
            # form validation successful -> can delete the token
            tokens.pop(token)
            try:
                door.store_tokens(tokens)
            except Exception as e:
                flash(f"Error during store_tokens. Exception: {e}")
            else:
                # only report success if the new token file was actually stored
                logger.info(f"Token {token} was deleted from database by admin user {current_user.username}")
                flash(f"Token {token} wurde gelöscht!")
            return redirect('/tokens')
        else:
            # form validation failed -> return to token overview and flash message
            flash(
                f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
            return redirect('/tokens')

    @app.route('/deactivate-token/<token>')
    @roles_required('admin')
    def deactivate_token(token):
        """Deactivate access for the given token. This updates the token file on disk.

        Parameters
        ----------
        token : str
            The token to deactivate.
        """
        tokens = door.get_tokens()
        if token in tokens:
            tokens[token]['inactive'] = True
            try:
                door.store_tokens(tokens)
                logger.info(f"Token {token} deactivated by admin user {current_user.username}")
            except Exception as e:
                flash(f"Error during store_tokens. Exception: {e}")
        return redirect('/tokens')

    @app.route('/backup_tokens')
    @roles_required('admin')
    def backup_tokens():
        """Send a JSON file with the data of all tokens."""
        # get all tokens for backup
        tokens = door.get_tokens()
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                file = Path(tmpdir, 'token_data.txt')
                file.write_text(json.dumps(tokens))
                return send_file(file, as_attachment=True, cache_timeout=-1)
        except Exception as e:
            return str(e)

    # routes for opening and closing the door via the web interface
    @app.route('/open')
    @auth_required()
    def open_door():
        try:
            door.open_door(user=current_user.username)
            logger.info(f"Door opened by admin user {current_user.username}")
        except Exception as e:
            flash(f'Could not open door. Exception: {e}')
        return redirect('/')

    @app.route('/close')
    @auth_required()
    def close_door():
        try:
            door.close_door(user=current_user.username)
            logger.info(f"Door closed by admin user {current_user.username}")
        except Exception as e:
            flash(f'Could not close door. Exception: {e}')
        return redirect('/')

    # setup user database when starting the app
    with app.app_context():
        new_admin_data = []
        if config.admin_file is not None:
            if not Path(config.admin_file).exists():
                logger.warning(f"Admin user creation file not found at {config.admin_file}")
            else:
                # store data for new admins in memory s.t. the file can be deleted afterwards
                with open(config.admin_file) as f:
                    for i, line in enumerate(f, start=1):
                        stripped = line.strip()
                        if not stripped or stripped.startswith('#'):
                            # empty lines and comments are not account definitions
                            continue
                        try:
                            user, email, pw = stripped.split()
                            validate_email(email)
                            new_admin_data.append({'username': user, 'email': email, 'password': pw})
                        except Exception as e:
                            logger.error(
                                f"Error while parsing line {i} in admin config file. Config file should contain lines of "
                                f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")

        # create admin users (only if they don't exists already)
        def create_super_admins(new_admin_data):
            """Create the database tables, the roles and the given super admin accounts."""
            db.create_all()
            super_admin_role = user_datastore.find_or_create_role('super_admin')  # root admin = can create other admins
            admin_role = user_datastore.find_or_create_role('admin')  # 'normal' admin
            local_role = user_datastore.find_or_create_role('local')  # LDAP user or local user

            for d in new_admin_data:
                # create new admin (only if neither the username nor the email is already in use)
                if not account_exists(d['username'], d['email']):
                    roles = [super_admin_role, admin_role]
                    # the password 'LDAP' marks accounts which are authorized via LDAP instead of locally
                    if not d['password'] == 'LDAP':
                        roles.append(local_role)
                    logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
                    user_datastore.create_user(email=d['email'], username=d['username'],
                                               password=hash_password(d['password']), roles=roles)
            db.session.commit()

        create_super_admins(new_admin_data)

    return app