# DoorAdmin/imaginaerraum_door_admin/webapp.py
#
# 516 lines
# 23 KiB
# Python

import os
from flask import Flask, render_template, request, flash, redirect, session, url_for
from flask_wtf import FlaskForm
from wtforms.fields.html5 import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user, verify_password
from flask_mail import Mail
from email_validator import validate_email
import secrets
import bleach
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path
import logging
from datetime import date
from .door_handle import DoorHandle
def validate_valid_thru_date(form, field):
    """WTForms validator for the 'valid thru' date of a token.

    The date is only checked when the form's ``limit_validity`` box is ticked. In that case a date has to be
    present and must not lie in the past.

    Parameters
    ----------
    form : form object providing a ``limit_validity`` field
    field : the date field that is being validated

    Returns
    -------
    bool : True if the date is acceptable (or is not checked at all)

    Raises
    ------
    ValidationError : if the date is missing or lies before today
    """
    if not form.limit_validity.data:
        # unlimited validity -> the date is irrelevant
        return True
    valid_thru = field.data
    # field.data can be None (e.g. nothing parseable was submitted). Comparing None with a date raises a
    # TypeError, which used to escape the validator, so the missing value is rejected explicitly.
    if valid_thru is None or valid_thru < date.today():
        flash("Ungültiges Datum")
        raise ValidationError
    return True
class TokenForm(FlaskForm):
    """Form for entering or editing the owner data that belongs to a door token."""
    name = StringField(label='Name', validators=[DataRequired()])
    email = EmailField(label='E-Mail', validators=[DataRequired()])
    organization = StringField(label='Organization', validators=[DataRequired()])
    # the date below is only validated when this box is ticked
    limit_validity = BooleanField(label='Gültigkeit begrenzen?')
    valid_thru = DateField(label='Gültig bis', validators=[validate_valid_thru_date])
    active = BooleanField(label='Aktiv?')
    # confirmation that consent to the terms of use was obtained
    dsgvo = BooleanField(label='Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
class TokenDeleteForm(FlaskForm):
    """Confirmation form for deletions: the typed name has to equal ``name_confirm``."""
    name = StringField(
        label='Name',
        validators=[
            DataRequired(),
            EqualTo(fieldname='name_confirm', message='Name stimmt nicht überein'),
        ],
    )
    # reference value the typed name is compared against
    name_confirm = StringField(label='Name confirm')
class AdminCreationForm(FlaskForm):
    """Form asking for the name and email address of a new admin account."""
    name = StringField(label='Name', validators=[DataRequired()])
    email = EmailField(label='E-Mail', validators=[DataRequired()])
def uia_username_mapper(identity):
    """Return the given login identity after cleaning it with bleach (disallowed markup is stripped)."""
    # pretty much anything is allowed as a username, it is merely sanitized
    sanitized = bleach.clean(identity, strip=True)
    return sanitized
def create_application(config):
    """Build and configure the Flask application of the door admin interface.

    Parameters
    ----------
    config : configuration object; this first part reads log_file, template_folder, static_folder, token_file,
        mqtt_host and nfc_socket from it

    Returns
    -------
    the configured Flask application
    """
    # logging for the web app: into the configured log file if there is one, otherwise to the console
    logger = logging.getLogger('webapp')
    logger.setLevel(logging.INFO)
    if config.log_file is None:
        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
    else:
        handler = logging.FileHandler(config.log_file)
        handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)

    # existence checks: problems are only reported, start-up continues regardless
    template_folder_found = Path(config.template_folder).exists()
    if not template_folder_found:
        logger.error(f'Flask template folder not found at {config.template_folder}')
    static_folder_found = Path(config.static_folder).exists()
    if not static_folder_found:
        logger.error(f'Flask static folder not found at {config.static_folder}')
    token_file_found = Path(config.token_file).exists()
    if not token_file_found:
        logger.warning(f"Token file not found at {config.token_file}")

    # door object giving access to the token file and to the current door state (via MQTT)
    door = DoorHandle(
        token_file=config.token_file,
        mqtt_host=config.mqtt_host,
        nfc_socket=config.nfc_socket,
        logger=logger,
    )
app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)

# SECRET_KEY and SECURITY_PASSWORD_SALT should be provided through the environment.
# Generate a nice key using secrets.token_urlsafe() and a good salt using
# secrets.SystemRandom().getrandbits(128). (Bcrypt is the default SECURITY_PASSWORD_HASH and requires a salt.)
# The built-in fallbacks are kept so existing installations keep working, but they are part of the source
# code and therefore not secret, so falling back to them is reported in the log.
default_secrets = {
    "SECRET_KEY": 'Q7PJu2fg2jabYwP-Psop6c6f2G4',
    "SECURITY_PASSWORD_SALT": '10036796768252925167749545152988277953',
}
for secret_name, builtin_default in default_secrets.items():
    secret_value = os.environ.get(secret_name)
    if secret_value is None:
        logger.warning(f"{secret_name} is not set in the environment, falling back to the built-in default. "
                       f"Set {secret_name} for any real deployment.")
        secret_value = builtin_default
    app.config[secret_name] = secret_value

# users may log in with either their email address or their username
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
    {"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
    {"username": {"mapper": uia_username_mapper}}
]
app.config['SECURITY_CHANGEABLE'] = True
app.config['SECURITY_RECOVERABLE'] = True
app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False

# Mail Config
app.config['MAIL_SERVER'] = config.mail_server
app.config['MAIL_PORT'] = config.mail_port
app.config['MAIL_USE_TLS'] = config.mail_use_tls
app.config['MAIL_USE_SSL'] = config.mail_use_ssl
app.config['MAIL_USERNAME'] = config.mail_username
app.config['MAIL_PASSWORD'] = config.mail_password
# mails are sent in the name of the account that is used for logging in to the mail server
app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
mail = Mail(app)

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
    "pool_pre_ping": True,
}

# Create database connection object
db = SQLAlchemy(app)
# Database models, built on the Flask-Security (fsqla) mixins
fsqla.FsModels.set_db_info(db)


class Role(db.Model, fsqla.FsRoleMixin):
    """Role model; adds nothing to the Flask-Security role mixin."""


class User(db.Model, fsqla.FsUserMixin):
    """User model: the Flask-Security user mixin with a username column declared on it."""
    username = db.Column(db.String(255))
# LDAP
ldap_server = ldap3.Server(config.ldap_url)
local_ldap_cache = {}  # dict for caching LDAP authorization locally (stores username + hashed password)


def validate_ldap(user, password):
    """Validate the user and password through an LDAP server.

    If the bind completes successfully the given user and password are authorized and the hashed password is
    cached in memory for future authorization while the server is unreachable.
    If the server is not reachable the password is checked against that cache (which only has an entry if the
    user previously authorized through LDAP).

    Parameters
    ----------
    user : user object whose ``username`` is the uid on the LDAP server
    password : password for the LDAP server

    Returns
    -------
    bool : result of the authorization process (True = success, False = failure)
    """
    # imported locally so that this block does not depend on a change to the file's import section
    from ldap3.utils.dn import escape_rdn

    try:
        # the username becomes part of a distinguished name, so DN special characters are escaped
        bind_dn = "uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (escape_rdn(user.username),)
        con = ldap3.Connection(ldap_server, user=bind_dn, password=password, auto_bind=True)
    except LDAPBindError:
        # server reachable but user unauthorized -> fail
        return False
    except LDAPSocketOpenError:
        # server not reachable -> try cached authorization data
        return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username])
    except Exception:
        # for other Exceptions we just fail
        return False

    # The bind was only needed as a credential check. Release the connection again, otherwise every
    # successful login leaves a connection to the LDAP server open.
    try:
        con.unbind()
    except Exception as e:
        # a failing unbind must not turn a successful authorization into a failure
        logger.warning(f"Could not close LDAP connection cleanly: {e}")

    # TODO check if user has permission to edit tokens
    # if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if
    # the server is not reachable
    local_ldap_cache[user.username] = hash_password(password)
    return True
class ExtendedLoginForm(LoginForm):
    """Login form that falls back to LDAP when the local Flask-Security check does not succeed."""
    email = StringField('Benutzername oder E-Mail', [Required()])
    password = PasswordField('Passwort', [Required()])

    def validate(self):
        """Authorize against the local user database first and against LDAP second.

        Returns
        -------
        bool : True if one of the two authorization methods accepts the credentials
        """
        # try authorizing locally using Flask security user datastore
        if super(ExtendedLoginForm, self).validate():
            logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
            return True

        # try authorizing using LDAP
        # authorization in LDAP uses username -> get username associated with email from the database
        user = find_user(self.email.data)
        if user is None:
            # no local account for these credentials -> there is nothing LDAP could authorize
            return False
        if not user.is_active:
            # LDAP must not let a locally deactivated account back in
            return False

        authorized = validate_ldap(user, self.password.data)
        if authorized:
            logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
        # if any of the authorization methods is successful we authorize the user
        return authorized


# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
@app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin')
def manage_admins():
    """List the admin accounts (GET) or create a new admin account from the submitted form (POST).

    A new account gets the 'admin' role and a randomly generated password which is shown once via flash().
    Every non-GET request ends in a redirect back to the overview.
    """
    form = AdminCreationForm()
    if request.method == 'GET':
        users = user_datastore.user_model.query.all()
        admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active} for u in users]
        return render_template('admins.html', admin_data=admin_data, form=form)

    if not form.validate():
        # this case used to fall through without returning a response, which Flask answers with a server error
        flash("Invalid form data. No admin account was created.")
        return redirect('/manage_admins')

    if user_datastore.find_user(username=form.name.data) is not None or \
            user_datastore.find_user(email=form.email.data) is not None:
        flash("A user with the same name or email is already registered!")
        return redirect('/manage_admins')

    pw = secrets.token_urlsafe(16)
    new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, roles=['admin'],
                                          password=hash_password(pw))
    # commit first so that success is only logged and announced once the account is really stored
    db.session.commit()
    logger.info(f"Super admin {current_user.username} created new admin account for {new_user.username} <{new_user.email}>")
    flash(f"An account for the new admin user {new_user.username} has been created. Use the randomly generated password {pw} to log in.")
    return redirect('/manage_admins')
@app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@roles_required('super_admin')
def delete_admins(username):
    """Delete an admin account after the super admin confirmed the deletion by typing the user's name.

    Super admins and active users are never deleted. GET shows the confirmation page, any other request
    evaluates the confirmation form and redirects to the admin overview.

    Parameters
    ----------
    username : str
        Name of the admin account to delete.
    """
    user = user_datastore.find_user(username=username)
    if user is None:
        flash(f"Invalid user {username}")
        return redirect('/manage_admins')
    if user.has_role('super_admin'):
        flash('Cannot delete super admins!')
        return redirect('/manage_admins')
    if user.is_active:
        flash('Cannot delete active users. Please deactivate user first!')
        return redirect('/manage_admins')

    # set up form for confirming deletion
    form = TokenDeleteForm()
    form.name_confirm.data = username
    if request.method == 'GET':
        # return page asking the user to confirm delete
        return render_template('delete_admin.html', username=username, form=form)

    if form.validate():
        user_datastore.delete_user(user)
        db.session.commit()
        logger.info(f"Super admin {current_user.username} deleted admin user {username}")
    else:
        # a failed confirmation used to fall through without returning any response (server error)
        flash(f"The entered name does not match. Admin user {username} was not deleted.")
    return redirect('/manage_admins')
@app.route('/admin_toggle_active/<username>')
@roles_required('super_admin')
def admin_toggle_active(username):
    """Flip the active flag of the given admin account and return to the admin overview.

    Unknown users and super admins are left untouched; a flash message explains why.

    Parameters
    ----------
    username : str
        Name of the admin account to activate/deactivate.
    """
    overview = '/manage_admins'
    account = user_datastore.find_user(username=username)

    rejection = None
    if account is None:
        rejection = f"Invalid user {username}"
    elif account.has_role('super_admin'):
        rejection = 'Cannot deactivate super admins!'
    if rejection is not None:
        flash(rejection)
        return redirect(overview)

    user_datastore.toggle_active(account)
    # the state is read after toggling, so it names what the account has just become
    new_state = "activated" if account.is_active else "deactivated"
    logger.info(f"Super admin {current_user.username} {new_state} access for admin user {username}")
    db.session.commit()
    return redirect(overview)
@app.route('/')
def door_lock():
    """Render the start page with the current door state and encoder position."""
    page_data = {
        'door_state': door.state,
        'encoder_position': door.encoder_position,
    }
    return render_template('index.html', **page_data)
@app.route('/tokens')
@auth_required()
def list_tokens():
    """Render the token overview, split into assigned (active) and inactive tokens."""
    assigned_tokens = {}
    inactive_tokens = {}
    for token_id, data in door.get_tokens().items():
        # every token lands in exactly one of the two groups, decided by its 'inactive' flag
        group = inactive_tokens if data['inactive'] else assigned_tokens
        group[token_id] = data
    return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
@app.route('/register-token', methods=['GET', 'POST'])
@auth_required()
def register():
    """Register a new token for locking and unlocking the door.

    The page shows the most recent token reported by the door handle together with a form for the user info
    (name, email, organization, optional valid thru date).
    On a POST with valid form data the data is put into the session cookie and the request is redirected to the
    /store-token route, which adds the new token to the token file. In every other case the form is rendered.
    """
    form = TokenForm()
    if request.method == 'GET':
        # default the valid thru date to today so the form validity check passes
        # (the date is not used if limited validity is disabled)
        form.valid_thru.data = date.today()
    elif request.method == 'POST' and form.validate():
        # hand the data over to /store-token through the session cookie
        session['token'] = door.get_most_recent_token()['token']
        session['name'] = form.name.data
        session['email'] = form.email.data
        session['organization'] = form.organization.data
        session['valid_thru'] = form.valid_thru.data.isoformat() if form.limit_validity.data else ''
        session['inactive'] = not form.active.data
        return redirect('/store-token')
    # first visit or rejected input: show the form
    return render_template('register.html', token=door.get_most_recent_token(), form=form)
@app.route('/edit-token/<token>', methods=['GET', 'POST'])
@auth_required()
def edit_token(token):
    """Edit data in the token file (name, email, organization, valid_thru date, active/inactive).

    GET: show a form pre-filled with the data currently stored for the token.
    POST: check the form data and, if it is good, put it into the session cookie and redirect to the
    /store-token route which writes it to the token file.

    Parameters
    ----------
    token : str
        The token for which data should be edited.
    """
    form = TokenForm(request.form)
    # the DSGVO checkbox is not validated here because the user is assumed to have agreed to it before
    form.dsgvo.validators = []

    if request.method == 'GET':
        tokens = door.get_tokens()
        if token not in tokens:
            # the route was accessed with an invalid token
            flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.')
            return redirect('/tokens')

        # pre-fill the form with the values from the token file
        stored = tokens[token]
        form.active.data = not stored['inactive']
        form.name.data = stored['name'] or ''
        form.email.data = stored['email'] or ''
        form.organization.data = stored['organization'] or ''
        # today's date is used in case there is no valid date in the token file
        try:
            form.valid_thru.data = date.fromisoformat(stored['valid_thru'])
            form.limit_validity.data = True
        except Exception:
            form.valid_thru.data = date.today()
        return render_template('edit.html', token=token, form=form)
    elif request.method == 'POST':
        if not form.validate():
            return render_template('edit.html', token=token, form=form)

        # hand the data over to /store-token through the session cookie
        session['token'] = token
        session['name'] = form.name.data
        session['organization'] = form.organization.data
        session['email'] = form.email.data
        session['valid_thru'] = form.valid_thru.data.isoformat() if form.limit_validity.data else ''
        session['inactive'] = not form.active.data
        return redirect('/store-token')
@app.route('/store-token')
@auth_required()
def store_token():
    """Store token to the token file on disk.

    This uses the token id and the associated data stored in the session cookie (filled by register() or
    edit_token()) to create/modify a token and stores the new token file to disk.
    If the session holds no complete token data (e.g. the route was opened directly) nothing is stored.
    After a successful store the data is removed from the session so that opening the route again cannot
    write outdated data over the token file.
    """
    data_keys = ('name', 'email', 'valid_thru', 'inactive', 'organization')
    session_keys = ('token',) + data_keys
    if any(key not in session for key in session_keys):
        # indexing the session without this check raised a KeyError when no form was submitted before
        flash("No token data available to store. Please register or edit a token first.")
        return redirect('/tokens')

    token = session['token']
    tokens = door.get_tokens()
    tokens[token] = {key: session[key] for key in data_keys}
    try:
        door.store_tokens(tokens)
        logger.info(f"Token {token} stored in database by admin user {current_user.username}")
    except Exception as e:
        flash(f"Error during store_tokens. Exception: {e}")
    else:
        # the hand-over data has been consumed
        for key in session_keys:
            session.pop(key, None)
    return redirect('/tokens')
@app.route('/delete-token/<token>', methods=['GET', 'POST'])
@auth_required()
def delete_token(token):
    """Delete the given token from the token file and store the new token file to disk.

    GET shows a confirmation page. Otherwise the token is deleted if the name typed into the confirmation form
    equals the name stored for the token. The success message is only shown if the token file was written.

    Parameters
    ----------
    token : str
        The token to delete from the database.
    """
    tokens = door.get_tokens()
    if token not in tokens:
        flash(f'Ungültiger Token {token} für Löschung.')
        return redirect('/tokens')

    token_to_delete = tokens[token]
    # set up form for confirming deletion
    form = TokenDeleteForm()
    form.name_confirm.data = token_to_delete['name']
    if request.method == 'GET':
        # return page asking the user to confirm delete
        return render_template('delete.html', token=token_to_delete, form=form)

    if form.validate():
        # form validation successful -> can delete the token
        tokens.pop(token)
        try:
            door.store_tokens(tokens)
            logger.info(f"Token {token} was deleted from database by admin user {current_user.username}")
        except Exception as e:
            flash(f"Error during store_tokens. Exception: {e}")
        else:
            # only claim success if the token file was really written
            flash(f"Token {token} wurde gelöscht!")
    else:
        # form validation failed -> return to token overview and flash message
        flash(f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
    return redirect('/tokens')
@app.route('/deactivate-token/<token>')
@auth_required()
def deactivate_token(token):
    """Deactivate access for the given token. This updates the token file on disk.

    Unknown tokens are ignored. The request always ends in a redirect to the token overview.

    Parameters
    ----------
    token : str
        The token to deactivate.
    """
    overview = '/tokens'
    tokens = door.get_tokens()
    if token not in tokens:
        return redirect(overview)

    tokens[token]['inactive'] = True
    try:
        door.store_tokens(tokens)
        logger.info(f"Token {token} deactivated by admin user {current_user.username}")
    except Exception as err:
        flash(f"Error during store_tokens. Exception: {err}")
    return redirect(overview)
@app.route('/open')
@auth_required()
def open_door():
    """Ask the door handle to open the door, then return to the start page.

    A failure is shown to the user as a flash message.
    """
    start_page = '/'
    try:
        door.open_door()
        logger.info(f"Door opened by admin user {current_user.username}")
    except Exception as err:
        flash(f'Could not open door. Exception: {err}')
    return redirect(start_page)
@app.route('/close')
@auth_required()
def close_door():
    """Ask the door handle to close the door, then return to the start page.

    A failure is shown to the user as a flash message.
    """
    start_page = '/'
    try:
        door.close_door()
        logger.info(f"Door closed by admin user {current_user.username}")
    except Exception as err:
        flash(f'Could not close door. Exception: {err}')
    return redirect(start_page)
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if config.admin_file is not None:
if not Path(config.admin_file).exists():
logger.warning(f"Admin user creation file not found at {config.admin_file}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(config.admin_file) as f:
for i, d in enumerate(f.readlines()):
try:
user, email, pw = d.split()
validate_email(email)
new_admin_data.append({'username': user, 'email': email, 'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
# create admin users (only if they don't exists already)
def create_super_admins(new_admin_data):
db.create_all()
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
for d in new_admin_data:
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
logger.info(f"New admin user created with username '{d['username']}' and email '{d['email']}'")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
password=hash_password(d['password']),
roles=[super_admin_role, admin_role])
db.session.commit()
create_super_admins(new_admin_data)
return app