started refactoring:

- use blueprint
- read configuration from file (default_app_config.py) and additional file specified by APPLICATION_SETTINGS environment variable
This commit is contained in:
Simon Pirkelmann 2022-01-25 21:42:35 +01:00
parent ba9379449a
commit 38164aca4b
6 changed files with 737 additions and 669 deletions

View File

@ -0,0 +1,170 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore
import ldap3
from pathlib import Path
from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security()
def setup_logging(app):
# set up logging for the web app
logger = logging.getLogger('webapp')
logger.setLevel(logging.INFO)
if app.config['LOG_FILE'] is not None:
ch = logging.FileHandler(app.config['LOG_FILE'])
ch.setLevel(logging.INFO)
else:
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
return logger
def create_app():
app = Flask(__name__)
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
app.config.from_envvar('APPLICATION_SETTINGS')
logger = setup_logging(app)
# do some checks for file existence etc.
try:
with open(app.config['KEY_FILE']) as f:
data = f.readlines()
if 'SECRET_KEY' in data[0]:
secret_key = data[0].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(app.config['TEMPLATE_FOLDER']).is_absolute():
if not Path(app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
if Path(app.config['STATIC_FOLDER']).is_absolute():
if not Path(app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
if not Path(app.config['TOKEN_FILE']).exists():
logger.warning(
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'],
logger=logger)
# Mail Config
#mail = Mail(app)
from . import webapp
#app.register_blueprint
# Create database connection object
db = SQLAlchemy(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
app.register_blueprint(door_app)
# setup user database when starting the app
# with app.app_context():
# new_admin_data = []
# if config.admin_file is not None:
# if not Path(config.admin_file).exists():
# logger.warning(
# f"Admin user creation file not found at {config.admin_file}")
# else:
# # store data for new admins in memory s.t. the file can be deleted afterwards
# with open(config.admin_file) as f:
# for i, line in enumerate(f.readlines()):
# if not line.strip().startswith('#'):
# try:
# user, email, pw = line.split()
# validate_email(email)
# new_admin_data.append(
# {'username': user, 'email': email,
# 'password': pw})
# except Exception as e:
# print(
# f"Error while parsing line {i} in admin config file. Config file should contain lines of "
# f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
#
# # create admin users (only if they don't exists already)
# def create_super_admins(new_admin_data):
# db.create_all()
# super_admin_role = user_datastore.find_or_create_role(
# 'super_admin') # root admin = can create other admins
# admin_role = user_datastore.find_or_create_role(
# 'admin') # 'normal' admin
# local_role = user_datastore.find_or_create_role(
# 'local') # LDAP user or local user
#
# for d in new_admin_data:
# if user_datastore.find_user(email=d['email'],
# username=d['username']) is None:
# roles = [super_admin_role, admin_role]
# if not d['password'] == 'LDAP':
# roles.append(local_role)
# logger.info(
# f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
#
# # create new admin (only if admin does not already exist)
# new_admin = user_datastore.create_user(email=d['email'],
# username=d[
# 'username'],
# password=hash_password(
# d['password']),
# roles=roles)
# db.session.commit()
#
# create_super_admins(new_admin_data)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
return app

View File

@ -0,0 +1,111 @@
from wtforms.fields import StringField, BooleanField
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized:
# if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit()
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data

View File

@ -0,0 +1,55 @@
print("loading default config")
import bleach
from flask_security import uia_email_mapper
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
SECRET_KEY = 'supersecret'
TEMPLATE_FOLDER = 'templates'
STATIC_FOLDER = 'static'
SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True
SECURITY_RECOVERABLE = True
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
SECURITY_POST_LOGIN_VIEW = '/'
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
SECURITY_PASSWORD_LENGTH_MIN = 10
SECURITY_USER_IDENTITY_ATTRIBUTES = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
# mail configuration
MAIL_SERVER = ''
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ''
MAIL_PASSWORD = ''
MAIL_DEFAULT_SENDER = ''
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_pre_ping": True,
}
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log"
MQTT_HOST = '10.10.21.2'

View File

@ -13,7 +13,7 @@
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container-fluid">
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
@ -29,10 +29,10 @@
Tokens
</a>
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
{% if current_user.has_role('super_admin') %}
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
{% endif %}
</div>
</li>
@ -40,7 +40,7 @@
{% if current_user.has_role('super_admin') %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
</li>
{% endif %}

View File

@ -24,11 +24,11 @@
{% if current_user.is_authenticated %}
<div class="row">
<div class="col-3">
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
</div>
<div class="col-1"></div>
<div class="col-3">
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
</div>
<div class="col-5"></div>
</div>

View File

@ -1,31 +1,20 @@
from flask import Flask, render_template, request, flash, redirect, session, send_file
from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app
from werkzeug.utils import secure_filename
from flask_wtf import FlaskForm
from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
from flask_security import auth_required, hash_password, \
current_user, roles_required
from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
from flask_security.views import change_password
from flask_mail import Mail
from email_validator import validate_email
import json
import secrets
import bleach
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from pathlib import Path
import logging
import tempfile
from datetime import date, datetime, timedelta
from .door_handle import DoorHandle
def validate_valid_thru_date(form, field):
if form.limit_validity.data: # only check date format if limited validity of token is set
@ -57,227 +46,17 @@ class AdminCreationForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
email = EmailField('E-Mail', validators=[DataRequired()])
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
door_app = Blueprint('door_app', __name__,
template_folder='templates')
def create_application(config):
# set up logging for the web app
logger = logging.getLogger('webapp')
logger.setLevel(logging.INFO)
if config.log_file is not None:
ch = logging.FileHandler(config.log_file)
ch.setLevel(logging.INFO)
else:
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
# do some checks for file existence etc.
try:
with open(config.key_file) as f:
data = f.readlines()
if 'SECRET_KEY' in data[0]:
secret_key = data[0].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(f"Flask keys could not be read from file at {Path(config.key_file).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(config.template_folder).is_absolute():
if not Path(config.template_folder).exists():
logger.error(f'Flask template folder not found at {Path(config.template_folder).absolute()}')
else:
if not (Path(__file__).parent / config.template_folder).exists():
logger.error(f'Flask template folder not found at {(Path(__file__).parent / config.template_folder).absolute()}')
if Path(config.static_folder).is_absolute():
if not Path(config.static_folder).exists():
logger.error(f'Flask static folder not found at {Path(config.static_folder).absolute()}')
else:
if not (Path(__file__).parent / config.static_folder).exists():
logger.error(f'Flask static folder not found at {(Path(__file__).parent / config.static_folder).absolute()}')
if not Path(config.token_file).exists():
logger.warning(f"Token file not found at {Path(config.token_file).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
logger=logger)
app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)
# Generate a nice key using secrets.token_urlsafe()
app.config['SECRET_KEY'] = secret_key
# Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
# Generate a good salt using: secrets.SystemRandom().getrandbits(128)
app.config['SECURITY_PASSWORD_SALT'] = security_password_salt
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
app.config['SECURITY_CHANGEABLE'] = True
app.config['SECURITY_RECOVERABLE'] = True
app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False
# Mail Config
app.config['MAIL_SERVER'] = config.mail_server
app.config['MAIL_PORT'] = config.mail_port
app.config['MAIL_USE_TLS'] = config.mail_use_tls
app.config['MAIL_USE_SSL'] = config.mail_use_ssl
app.config['MAIL_USERNAME'] = config.mail_username
app.config['MAIL_PASSWORD'] = config.mail_password
app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
mail = Mail(app)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_pre_ping": True,
}
# Create database connection object
db = SQLAlchemy(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
# LDAP
ldap_server = ldap3.Server(config.ldap_url)
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized:
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized:
# if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit()
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
# we override the change_password view from flask security to only allow local users to change their passwords
# LDAP users should use the LDAP self service for changing passwords
# this route needs to be defined before the Flask Security setup
@app.route('/change', methods=['GET', 'POST'])
@auth_required()
def change_pw():
# we override the change_password view from flask security to only allow local users to change their passwords
# LDAP users should use the LDAP self service for changing passwords
# this route needs to be defined before the Flask Security setup
@door_app.route('/change', methods=['GET', 'POST'])
@auth_required()
def change_pw():
if current_user.has_role('local'):
# local users can change their password
return change_password()
@ -285,14 +64,10 @@ def create_application(config):
# LDAP users get redirected to the LDAP self service
return redirect('https://ldap.imaginaerraum.de/')
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
# admin user management
@app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin')
def manage_admins():
# admin user management
@door_app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin')
def manage_admins():
form = AdminCreationForm()
if request.method == 'GET':
users = user_datastore.user_model.query.all()
@ -316,9 +91,9 @@ def create_application(config):
db.session.commit()
return redirect('/manage_admins')
@app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@roles_required('super_admin')
def delete_admins(username):
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@roles_required('super_admin')
def delete_admins(username):
user = user_datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
@ -347,9 +122,9 @@ def create_application(config):
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
return redirect('/manage_admins')
@app.route('/admin_toggle_active/<username>')
@roles_required('super_admin')
def admin_toggle_active(username):
@door_app.route('/admin_toggle_active/<username>')
@roles_required('super_admin')
def admin_toggle_active(username):
user = user_datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
@ -365,9 +140,9 @@ def create_application(config):
db.session.commit()
return redirect('/manage_admins')
@app.route('/promote_admin/<username>')
@roles_required('super_admin')
def promote_admin(username):
@door_app.route('/promote_admin/<username>')
@roles_required('super_admin')
def promote_admin(username):
user = user_datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
@ -380,9 +155,9 @@ def create_application(config):
db.session.commit()
return redirect('/manage_admins')
@app.route('/demote_admin/<username>')
@roles_required('super_admin')
def demote_admin(username):
@door_app.route('/demote_admin/<username>')
@roles_required('super_admin')
def demote_admin(username):
user = user_datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
@ -398,9 +173,9 @@ def create_application(config):
flash(f'Benutzer {username} ist bereits kein Admin!')
return redirect('/manage_admins')
@app.route('/backup_user_datastore')
@roles_required('super_admin')
def backup_user_datastore():
@door_app.route('/backup_user_datastore')
@roles_required('super_admin')
def backup_user_datastore():
# get list of defined admin users for backup
users = user_datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
@ -414,9 +189,9 @@ def create_application(config):
except Exception as e:
return str(e)
@app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin')
def restore_user_datastore():
@door_app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin')
def restore_user_datastore():
# check if the post request has the file part
if 'file' not in request.files:
flash('Keine Datei ausgewählt!')
@ -463,23 +238,24 @@ def create_application(config):
else:
flash("Ungültige Dateiendung")
return redirect('/manage_admins')
# main page
@app.route('/')
def door_lock():
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
# main page
@door_app.route('/')
def door_lock():
with current_app.app_context():
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
# token overview
@app.route('/tokens')
@roles_required('admin')
def list_tokens():
# token overview
@door_app.route('/tokens')
@roles_required('admin')
def list_tokens():
tokens = door.get_tokens()
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
@app.route('/token-log')
@roles_required('super_admin')
def token_log():
@door_app.route('/token-log')
@roles_required('super_admin')
def token_log():
log = []
try:
with open(config.nfc_log) as f:
@ -491,10 +267,10 @@ def create_application(config):
flash(f"NFC logfile {Path(config.nfc_log).absolute()} konnte nicht gelesen werden. Exception: {e}")
return redirect('/')
# routes for registering, editing and deleting tokens
@app.route('/register-token', methods=['GET', 'POST'])
@roles_required('admin')
def register():
# routes for registering, editing and deleting tokens
@door_app.route('/register-token', methods=['GET', 'POST'])
@roles_required('admin')
def register():
"""Register new token for locking and unlocking the door.
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
@ -534,9 +310,9 @@ def create_application(config):
else:
return render_template('register.html', token=recent_token, form=form)
@app.route('/edit-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def edit_token(token):
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def edit_token(token):
"""Edit data in the token file (name, email, valid_thru date, active/inactive).
If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
@ -589,9 +365,9 @@ def create_application(config):
else:
return render_template('edit.html', token=token, form=form)
@app.route('/store-token')
@roles_required('admin')
def store_token():
@door_app.route('/store-token')
@roles_required('admin')
def store_token():
"""Store token to the token file on disk.
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
@ -611,9 +387,9 @@ def create_application(config):
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def delete_token(token):
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def delete_token(token):
"""Delete the given token from the token file and store the new token file to disk
Parameters
@ -652,9 +428,9 @@ def create_application(config):
flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens')
@app.route('/deactivate-token/<token>')
@roles_required('admin')
def deactivate_token(token):
@door_app.route('/deactivate-token/<token>')
@roles_required('admin')
def deactivate_token(token):
"""Deactivate access for the given token. This updates the token file on disk.
Parameters
@ -672,9 +448,9 @@ def create_application(config):
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@app.route('/backup_tokens')
@roles_required('admin')
def backup_tokens():
@door_app.route('/backup_tokens')
@roles_required('admin')
def backup_tokens():
# get list of defined admin users for backup
tokens = door.get_tokens()
try:
@ -685,9 +461,9 @@ def create_application(config):
except Exception as e:
return str(e)
@app.route('/open')
@auth_required()
def open_door():
@door_app.route('/open')
@auth_required()
def open_door():
try:
door.open_door(user=current_user.username)
@ -696,57 +472,13 @@ def create_application(config):
flash(f'Could not open door. Exception: {e}')
return redirect('/')
# routes for opening and closing the door via the web interface
@app.route('/close')
@auth_required()
def close_door():
# routes for opening and closing the door via the web interface
@door_app.route('/close')
@auth_required()
def close_door():
try:
door.close_door(user=current_user.username)
logger.info(f"Door closed by admin user {current_user.username}")
except Exception as e:
flash(f'Could not close door. Exception: {e}')
return redirect('/')
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if config.admin_file is not None:
if not Path(config.admin_file).exists():
logger.warning(f"Admin user creation file not found at {config.admin_file}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(config.admin_file) as f:
for i, line in enumerate(f.readlines()):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append({'username': user, 'email': email, 'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
# create admin users (only if they don't exists already)
def create_super_admins(new_admin_data):
db.create_all()
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
local_role = user_datastore.find_or_create_role('local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
password=hash_password(d['password']),
roles=roles)
db.session.commit()
create_super_admins(new_admin_data)
return app