import os from flask import Flask, render_template, request, flash, redirect, session, url_for from flask_wtf import FlaskForm from wtforms.fields.html5 import DateField, EmailField from wtforms.fields import StringField, BooleanField from wtforms.validators import DataRequired, ValidationError, EqualTo from flask_sqlalchemy import SQLAlchemy from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper from flask_security.models import fsqla_v2 as fsqla from flask_security.forms import LoginForm, Required, PasswordField from flask_security.utils import find_user from flask_mail import Mail from email_validator import validate_email import bleach import ldap3 from datetime import date from .door_handle import DoorHandle def validate_valid_thru_date(form, field): if form.limit_validity.data: # only check date format if limited validity of token is set try: if not field.data >= date.today(): raise ValueError except ValueError as e: flash("Ungültiges Datum") raise ValidationError return True class TokenForm(FlaskForm): name = StringField('Name', validators=[DataRequired()]) email = EmailField('E-Mail', validators=[DataRequired()]) organization = StringField('Organization', validators=[DataRequired()]) limit_validity = BooleanField('Gültigkeit begrenzen?') valid_thru = DateField('Gültig bis', validators=[validate_valid_thru_date]) active = BooleanField('Aktiv?') dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()]) class TokenDeleteForm(FlaskForm): name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')]) name_confirm = StringField('Name confirm') def uia_username_mapper(identity): # we allow pretty much anything - but we bleach it. return bleach.clean(identity, strip=True) def create_application(config): # create door objects which provides access to the token file and current door state via MQTT door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket) app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder) # Generate a nice key using secrets.token_urlsafe() app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", 'Q7PJu2fg2jabYwP-Psop6c6f2G4') # Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt # Generate a good salt using: secrets.SystemRandom().getrandbits(128) app.config['SECURITY_PASSWORD_SALT'] = os.environ.get("SECURITY_PASSWORD_SALT", '10036796768252925167749545152988277953') app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [ {"email": {"mapper": uia_email_mapper, "case_insensitive": True}}, {"username": {"mapper": uia_username_mapper}} ] app.config['SECURITY_CHANGEABLE'] = True app.config['SECURITY_RECOVERABLE'] = True app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False # Mail Config app.config['MAIL_SERVER'] = config.mail_server app.config['MAIL_PORT'] = config.mail_port app.config['MAIL_USE_TLS'] = config.mail_use_tls app.config['MAIL_USE_SSL'] = config.mail_use_ssl app.config['MAIL_USERNAME'] = config.mail_username app.config['MAIL_PASSWORD'] = config.mail_password app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME'] mail = Mail(app) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db' # As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the # underlying engine. This option makes sure that DB connections from the # pool are still valid. Important for entire application since # many DBaaS options automatically close idle connections. app.config["SQLALCHEMY_ENGINE_OPTIONS"] = { "pool_pre_ping": True, } # Create database connection object db = SQLAlchemy(app) # Define models fsqla.FsModels.set_db_info(db) class Role(db.Model, fsqla.FsRoleMixin): pass class User(db.Model, fsqla.FsUserMixin): username = db.Column(db.String(255)) # LDAP ldap_server = ldap3.Server(config.ldap_url) def validate_ldap(user, password): # try to connect to the LDAP server # if the connection completes successfully the given user and password is authorized try: con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,), password=password, auto_bind=True) except Exception: return False return con is not None class ExtendedLoginForm(LoginForm): email = StringField('Benutzername oder E-Mail', [Required()]) password = PasswordField('Passwort', [Required()]) def validate(self): # authorization in LDAP uses username -> get username associated with email from the database user = find_user(self.email.data) # try authorizing using LDAP response_ldap = validate_ldap(user, self.password.data) if response_ldap: # if LDAP authorization succeeds we update the currently stored password in the Flask user datastore # with the one used for LDAP authorization. This way we can authorize with the LDAP password later # even if the server is not reachable user.password = hash_password(self.password.data) # try authorizing using Flask security response_orig = super(ExtendedLoginForm, self).validate() # if any of the authorization methods is successful we authorize the user return response_ldap or response_orig app.config['SECURITY_MSG_USERID_NOT_PROVIDED'] = ('User ID not provided', 'error') # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) security = Security(app, user_datastore, login_form=ExtendedLoginForm) # create admin users (only if they don't exists already) def create_admins(admin_user_file): with open(admin_user_file) as f: admin_data = f.readlines() for i, d in enumerate(admin_data): try: user, email, pw = d.split() if user_datastore.find_user(email=email, username=user) is None: validate_email(email) # create new admin (only if admin does not already exist) user_datastore.create_user(email=email, username=user, password=hash_password(pw)) except Exception as e: print(f"Error while parsing line {i} in admin config file. Config file should contain lines of " f"' \\n'\n Exception: {e}\nAdmin account could not be created.") db.session.commit() # Create a user to test with @app.before_first_request def create_user(): db.create_all() if config.admin_file is not None: # create admin accounts from given file create_admins(config.admin_file) db.session.commit() @app.route('/') def door_lock(): return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position) @app.route('/tokens') @auth_required() def list_tokens(): tokens = door.get_tokens() assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']} inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) @app.route('/register-token', methods=['GET', 'POST']) @auth_required() def register(): """Register new token for locking and unlocking the door. This route displays the most recently scanned invalid token as reported in the logfile and provides a form for entering user info (name, email, valid thru date (optional)) for the new token. If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route will be called which adds the new token to the database. """ form = TokenForm() if request.method == 'GET': # set default valid thru date to today to make sure form validity check passes # (will not be used if limited validity is disabled) form.valid_thru.data = date.today() return render_template('register.html', token=door.get_most_recent_token(), form=form) elif request.method == 'POST' and form.validate(): # store data in session cookie session['token'] = door.get_most_recent_token()['token'] session['name'] = form.name.data session['email'] = form.email.data session['organization'] = form.organization.data if form.limit_validity.data: session['valid_thru'] = form.valid_thru.data.isoformat() else: session['valid_thru'] = '' session['inactive'] = not form.active.data return redirect(f'/store-token') else: return render_template('register.html', token=door.get_most_recent_token(), form=form) @app.route('/edit-token/', methods=['GET', 'POST']) @auth_required() def edit_token(token): """Edit data in the token file (name, email, valid_thru date, active/inactive). If the route is accessed via GET it will provide a form for editing the currently stored data for the user. If the route is accessed via POST it will check if the form data is good and then store the modified user data in the database (by redirecting to the /store-token route) Parameters ---------- token : str The token for which data should be edited. """ form = TokenForm(request.form) form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed # to it before if request.method == 'GET': tokens = door.get_tokens() if token in tokens: # set default for form according to values from the token file et = tokens[token] form.active.data = not et['inactive'] form.name.data = et['name'] if et['name'] else '' form.email.data = et['email'] if et['email'] else '' form.organization.data = et['organization'] if et['organization'] else '' # for the valid thru date we use today's date in case there is not valid date in the database try: form.valid_thru.data = date.fromisoformat(et['valid_thru']) form.limit_validity.data = True except Exception: form.valid_thru.data = date.today() return render_template('edit.html', token=token, form=form) else: # flash an error message if the route is accessed with an invalid token flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.') return redirect('/tokens') elif request.method == 'POST': if form.validate(): # store data in session cookie session['token'] = token session['name'] = form.name.data session['organization'] = form.organization.data session['email'] = form.email.data if form.limit_validity.data: session['valid_thru'] = form.valid_thru.data.isoformat() else: session['valid_thru'] = '' session['inactive'] = not form.active.data return redirect(f'/store-token') else: return render_template('edit.html', token=token, form=form) @app.route('/store-token') @auth_required() def store_token(): """Store token to the token file on disk. This will use the token id and the associated data stored in the session cookie (filled by register_token() or edit_token()) and create/modify a token and store the new token file to disk. """ token = session['token'] tokens = door.get_tokens() tokens[token] = {'name': session['name'], 'email': session['email'], 'valid_thru': session['valid_thru'], 'inactive': session['inactive'], 'organization': session['organization']} try: door.store_tokens(tokens) except Exception as e: flash(f"Error during store_tokens. Exception: {e}") return redirect('/tokens') @app.route('/delete-token/', methods=['GET', 'POST']) @auth_required() def delete_token(token): """Delete the given token from the token file and store the new token file to disk Parameters ---------- token : str The token to delete from the database. """ tokens = door.get_tokens() if token in tokens: token_to_delete = tokens[token] # set up form for confirming deletion form = TokenDeleteForm() form.name_confirm.data = token_to_delete['name'] if request.method == 'GET': # return page asking the user to confirm delete return render_template('delete.html', token=token_to_delete, form=form) elif form.validate(): # form validation successful -> can delete the token tokens.pop(token) try: door.store_tokens(tokens) except Exception as e: flash(f"Error during store_tokens. Exception: {e}") flash(f"Token {token} wurde gelöscht!") return redirect('/tokens') else: # form validation failed -> return to token overview and flash message flash(f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.") return redirect('/tokens') else: flash(f'Ungültiger Token {token} für Löschung.') return redirect('/tokens') @app.route('/deactivate-token/') @auth_required() def deactivate_token(token): """Deactivate access for the given token. This updates the token file on disk. Parameters ---------- token : str The token to deactivate. """ tokens = door.get_tokens() if token in tokens: tokens[token]['inactive'] = True try: door.store_tokens(tokens) except Exception as e: flash(f"Error during store_tokens. Exception: {e}") return redirect('/tokens') @app.route('/open') @auth_required() def open_door(): try: door.open_door() except Exception as e: flash(f'Could not open door. Exception: {e}') return redirect('/') @app.route('/close') @auth_required() def close_door(): try: door.close_door() except Exception as e: flash(f'Could not close door. Exception: {e}') return redirect('/') return app