Compare commits
No commits in common. "2879a694457d77f7060413f805ee70d49ae18e32" and "master" have entirely different histories.
2879a69445
...
master
|
@ -1,9 +1 @@
|
||||||
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
||||||
|
|
||||||
|
|
||||||
# Development
|
|
||||||
```shell
|
|
||||||
cd tests
|
|
||||||
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
|
|
||||||
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
|
|
||||||
```
|
|
||||||
|
|
|
@ -1,170 +0,0 @@
|
||||||
import logging
|
|
||||||
from flask import Flask
|
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
|
||||||
from email_validator import validate_email
|
|
||||||
|
|
||||||
import ldap3
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
#from .webapp import door_app
|
|
||||||
from .door_handle import DoorHandle
|
|
||||||
from .auth import ExtendedLoginForm
|
|
||||||
|
|
||||||
security = Security()
|
|
||||||
db = SQLAlchemy()
|
|
||||||
|
|
||||||
|
|
||||||
# create admin users (only if they don't exists already)
|
|
||||||
def create_super_admins(app, db, user_datastore, logger):
|
|
||||||
# setup user database when starting the app
|
|
||||||
with app.app_context():
|
|
||||||
new_admin_data = []
|
|
||||||
if app.config['ADMIN_FILE'] is not None:
|
|
||||||
if not Path(app.config['ADMIN_FILE']).exists():
|
|
||||||
logger.warning(
|
|
||||||
f"Admin user creation file not found at {app.config['ADMIN_FILE']}")
|
|
||||||
else:
|
|
||||||
# store data for new admins in memory s.t. the file can be deleted afterwards
|
|
||||||
with open(app.config['ADMIN_FILE']) as f:
|
|
||||||
for i, line in enumerate(f.readlines()):
|
|
||||||
if not line.strip().startswith('#'):
|
|
||||||
try:
|
|
||||||
user, email, pw = line.split()
|
|
||||||
validate_email(email)
|
|
||||||
new_admin_data.append(
|
|
||||||
{'username': user, 'email': email,
|
|
||||||
'password': pw})
|
|
||||||
except Exception as e:
|
|
||||||
print(
|
|
||||||
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
|
||||||
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
|
||||||
|
|
||||||
db.create_all()
|
|
||||||
super_admin_role = user_datastore.find_or_create_role(
|
|
||||||
'super_admin') # root admin = can create other admins
|
|
||||||
admin_role = user_datastore.find_or_create_role(
|
|
||||||
'admin') # 'normal' admin
|
|
||||||
local_role = user_datastore.find_or_create_role(
|
|
||||||
'local') # LDAP user or local user
|
|
||||||
|
|
||||||
for d in new_admin_data:
|
|
||||||
if user_datastore.find_user(email=d['email'],
|
|
||||||
username=d['username']) is None:
|
|
||||||
roles = [super_admin_role, admin_role]
|
|
||||||
if not d['password'] == 'LDAP':
|
|
||||||
roles.append(local_role)
|
|
||||||
logger.info(
|
|
||||||
f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
|
|
||||||
|
|
||||||
# create new admin (only if admin does not already exist)
|
|
||||||
new_admin = user_datastore.create_user(email=d['email'],
|
|
||||||
username=d[
|
|
||||||
'username'],
|
|
||||||
password=hash_password(
|
|
||||||
d['password']),
|
|
||||||
roles=roles)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
def setup_logging(app):
|
|
||||||
# set up logging for the web app
|
|
||||||
logger = logging.getLogger('webapp')
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
if app.config['LOG_FILE'] is not None:
|
|
||||||
ch = logging.FileHandler(app.config['LOG_FILE'])
|
|
||||||
ch.setLevel(logging.INFO)
|
|
||||||
else:
|
|
||||||
# create console handler and set level to debug
|
|
||||||
ch = logging.StreamHandler()
|
|
||||||
ch.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
# create formatter
|
|
||||||
formatter = logging.Formatter(
|
|
||||||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
||||||
# add formatter to ch
|
|
||||||
ch.setFormatter(formatter)
|
|
||||||
# add ch to logger
|
|
||||||
logger.addHandler(ch)
|
|
||||||
|
|
||||||
return logger
|
|
||||||
|
|
||||||
def create_app():
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
|
|
||||||
app.config.from_envvar('APPLICATION_SETTINGS')
|
|
||||||
|
|
||||||
logger = setup_logging(app)
|
|
||||||
|
|
||||||
# do some checks for file existence etc.
|
|
||||||
try:
|
|
||||||
with open(app.config['KEY_FILE']) as f:
|
|
||||||
data = f.readlines()
|
|
||||||
if 'SECRET_KEY' in data[0]:
|
|
||||||
secret_key = data[0].split()[-1]
|
|
||||||
else:
|
|
||||||
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
|
||||||
if 'SECURITY_PASSWORD_SALT' in data[1]:
|
|
||||||
security_password_salt = data[1].split()[-1]
|
|
||||||
else:
|
|
||||||
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
|
|
||||||
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
|
|
||||||
security_password_salt = '10036796768252925167749545152988277953'
|
|
||||||
|
|
||||||
if Path(app.config['TEMPLATE_FOLDER']).is_absolute():
|
|
||||||
if not Path(app.config['TEMPLATE_FOLDER']).exists():
|
|
||||||
logger.error(
|
|
||||||
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}")
|
|
||||||
else:
|
|
||||||
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists():
|
|
||||||
logger.error(
|
|
||||||
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
|
|
||||||
if Path(app.config['STATIC_FOLDER']).is_absolute():
|
|
||||||
if not Path(app.config['STATIC_FOLDER']).exists():
|
|
||||||
logger.error(
|
|
||||||
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
|
|
||||||
else:
|
|
||||||
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
|
|
||||||
logger.error(
|
|
||||||
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
|
|
||||||
if not Path(app.config['TOKEN_FILE']).exists():
|
|
||||||
logger.warning(
|
|
||||||
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
|
|
||||||
|
|
||||||
# create door objects which provides access to the token file and current door state via MQTT
|
|
||||||
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
|
|
||||||
nfc_socket=app.config['NFC_SOCKET'],
|
|
||||||
logger=logger)
|
|
||||||
|
|
||||||
# Mail Config
|
|
||||||
#mail = Mail(app)
|
|
||||||
|
|
||||||
# Create database connection object
|
|
||||||
db.init_app(app)
|
|
||||||
|
|
||||||
# Define models
|
|
||||||
fsqla.FsModels.set_db_info(db)
|
|
||||||
|
|
||||||
class Role(db.Model, fsqla.FsRoleMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class User(db.Model, fsqla.FsUserMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
from . webapp import door_app
|
|
||||||
app.register_blueprint(door_app)
|
|
||||||
|
|
||||||
ldap_server = ldap3.Server(app.config['LDAP_URL'])
|
|
||||||
|
|
||||||
# Setup Flask-Security
|
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
|
||||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
|
||||||
|
|
||||||
create_super_admins(app, db, user_datastore, logger)
|
|
||||||
|
|
||||||
return app
|
|
|
@ -1,112 +0,0 @@
|
||||||
from wtforms.fields import StringField, BooleanField
|
|
||||||
from flask import current_app
|
|
||||||
from flask_security import hash_password
|
|
||||||
from flask_security.forms import LoginForm, Required, PasswordField
|
|
||||||
from flask_security.utils import find_user
|
|
||||||
import ldap3
|
|
||||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
|
||||||
|
|
||||||
|
|
||||||
class ExtendedLoginForm(LoginForm):
|
|
||||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
|
||||||
password = PasswordField('Passwort', [Required()])
|
|
||||||
remember = BooleanField('Login merken?')
|
|
||||||
|
|
||||||
def validate(self):
|
|
||||||
# search for user in the current database
|
|
||||||
user = find_user(self.email.data)
|
|
||||||
if user is not None:
|
|
||||||
# if a user is found we check if it is associated with LDAP or with the local database
|
|
||||||
if user.has_role('local'):
|
|
||||||
# try authorizing locally using Flask security user datastore
|
|
||||||
authorized = super(ExtendedLoginForm, self).validate()
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
|
||||||
else:
|
|
||||||
# run LDAP authorization
|
|
||||||
# if the authorization succeeds we also get the new_user_data dict which contains information about
|
|
||||||
# the user's permissions etc.
|
|
||||||
authorized, new_user_data = validate_ldap(user.username, self.password.data)
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
|
|
||||||
# update permissions and password/email to stay up to date for login with no network connection
|
|
||||||
user.email = new_user_data['email']
|
|
||||||
user.password = new_user_data['password']
|
|
||||||
for role in new_user_data['roles']:
|
|
||||||
user_datastore.add_role_to_user(user, role)
|
|
||||||
user_datastore.commit()
|
|
||||||
self.user = user
|
|
||||||
else:
|
|
||||||
self.password.errors = ['Invalid password']
|
|
||||||
else:
|
|
||||||
# this means there is no user with that email in the database
|
|
||||||
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
|
||||||
username = self.email.data
|
|
||||||
# try LDAP authorization and create a new user if it succeeds
|
|
||||||
authorized, new_user_data = validate_ldap(username, self.password.data)
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
# if there was no user in the database before we create a new user
|
|
||||||
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
|
||||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
|
||||||
user_datastore.commit()
|
|
||||||
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
|
||||||
" successful LDAP authorization")
|
|
||||||
|
|
||||||
# if any of the authorization methods is successful we authorize the user
|
|
||||||
return authorized
|
|
||||||
|
|
||||||
def validate_ldap(username, password):
|
|
||||||
"""Validate the user and password through an LDAP server.
|
|
||||||
|
|
||||||
If the connection completes successfully the given user and password is authorized.
|
|
||||||
Then the permissions and additional information of the user are obtained through an LDAP search.
|
|
||||||
The data is stored in a dict which will be used later to create/update the entry for the user in the local
|
|
||||||
database.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
username : username for the LDAP server
|
|
||||||
password : password for the LDAP server
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
bool : result of the authorization process (True = success, False = failure)
|
|
||||||
dict : dictionary with information about an authorized user (contains username, email, hashed password,
|
|
||||||
roles)
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
|
||||||
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
|
|
||||||
password=password, auto_bind=True)
|
|
||||||
except ldap3.core.exceptions.LDAPBindError:
|
|
||||||
# server reachable but user unauthorized -> fail
|
|
||||||
return False, None
|
|
||||||
except LDAPSocketOpenError:
|
|
||||||
# server not reachable -> fail (but will try authorization from local database later)
|
|
||||||
return False, None
|
|
||||||
except Exception as e:
|
|
||||||
# for other Exceptions we just fail
|
|
||||||
return False, None
|
|
||||||
|
|
||||||
# get user data and permissions from LDAP server
|
|
||||||
new_user_data = {}
|
|
||||||
new_user_data['username'] = username
|
|
||||||
new_user_data['password'] = hash_password(password)
|
|
||||||
new_user_data['roles'] = []
|
|
||||||
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
|
||||||
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
|
|
||||||
attributes=ldap3.ALL_ATTRIBUTES)
|
|
||||||
authorized = True
|
|
||||||
if lock_permission:
|
|
||||||
new_user_data['email'] = con.entries[0].mail.value
|
|
||||||
else:
|
|
||||||
authorized = False
|
|
||||||
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
|
||||||
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
|
||||||
if token_granting_permission:
|
|
||||||
new_user_data['roles'].append('admin')
|
|
||||||
|
|
||||||
return authorized, new_user_data
|
|
|
@ -1,55 +0,0 @@
|
||||||
print("loading default config")
|
|
||||||
import bleach
|
|
||||||
from flask_security import uia_email_mapper
|
|
||||||
|
|
||||||
def uia_username_mapper(identity):
|
|
||||||
# we allow pretty much anything - but we bleach it.
|
|
||||||
return bleach.clean(identity, strip=True)
|
|
||||||
|
|
||||||
class DefaultConfig(object):
|
|
||||||
DEBUG = False
|
|
||||||
|
|
||||||
SECRET_KEY = 'supersecret'
|
|
||||||
|
|
||||||
TEMPLATE_FOLDER = 'templates'
|
|
||||||
STATIC_FOLDER = 'static'
|
|
||||||
|
|
||||||
SECURITY_REGISTERABLE = False
|
|
||||||
SECURITY_CHANGEABLE = True
|
|
||||||
SECURITY_RECOVERABLE = True
|
|
||||||
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
|
|
||||||
|
|
||||||
SECURITY_POST_LOGIN_VIEW = '/'
|
|
||||||
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
|
|
||||||
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
|
|
||||||
SECURITY_PASSWORD_LENGTH_MIN = 10
|
|
||||||
|
|
||||||
SECURITY_USER_IDENTITY_ATTRIBUTES = [
|
|
||||||
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
|
|
||||||
{"username": {"mapper": uia_username_mapper}}
|
|
||||||
]
|
|
||||||
|
|
||||||
# mail configuration
|
|
||||||
MAIL_SERVER = ''
|
|
||||||
MAIL_PORT = 465
|
|
||||||
MAIL_USE_SSL = True
|
|
||||||
MAIL_USERNAME = ''
|
|
||||||
MAIL_PASSWORD = ''
|
|
||||||
MAIL_DEFAULT_SENDER = ''
|
|
||||||
|
|
||||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
|
|
||||||
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
|
||||||
# underlying engine. This option makes sure that DB connections from the
|
|
||||||
# pool are still valid. Important for entire application since
|
|
||||||
# many DBaaS options automatically close idle connections.
|
|
||||||
SQLALCHEMY_ENGINE_OPTIONS = {
|
|
||||||
"pool_pre_ping": True,
|
|
||||||
}
|
|
||||||
|
|
||||||
KEY_FILE = '/root/flask_keys'
|
|
||||||
TOKEN_FILE = "/etc/door_tokens"
|
|
||||||
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
|
||||||
LOG_FILE = "/var/log/webinterface.log"
|
|
||||||
NFC_LOG = "/var/log/nfc.log"
|
|
||||||
MQTT_HOST = '10.10.21.2'
|
|
|
@ -27,15 +27,15 @@
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
{% if not data['super_admin'] %}
|
{% if not data['super_admin'] %}
|
||||||
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
||||||
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% if data['admin'] %}
|
{% if data['admin'] %}
|
||||||
{% if not data['super_admin'] %}
|
{% if not data['super_admin'] %}
|
||||||
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% else %}
|
{% else %}
|
||||||
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
@ -68,14 +68,14 @@
|
||||||
</div>
|
</div>
|
||||||
<div class="p-2 bg-light border">
|
<div class="p-2 bg-light border">
|
||||||
<h3>Nutzerdaten sichern:</h3>
|
<h3>Nutzerdaten sichern:</h3>
|
||||||
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
|
<form action="{{ url_for('backup_user_datastore') }}" method="get">
|
||||||
<input type="submit" value="Download">
|
<input type="submit" value="Download">
|
||||||
</form>
|
</form>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="p-2 bg-light border">
|
<div class="p-2 bg-light border">
|
||||||
<h3>Nutzerdaten wiederherstellen:</h3>
|
<h3>Nutzerdaten wiederherstellen:</h3>
|
||||||
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
||||||
<input type=file name=file>
|
<input type=file name=file>
|
||||||
<input type=submit value="Abschicken">
|
<input type=submit value="Abschicken">
|
||||||
</form>
|
</form>
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
<body>
|
<body>
|
||||||
<nav class="navbar navbar-expand-lg navbar-light bg-light">
|
<nav class="navbar navbar-expand-lg navbar-light bg-light">
|
||||||
<div class="container-fluid">
|
<div class="container-fluid">
|
||||||
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
||||||
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
|
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
|
||||||
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
|
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
|
||||||
<span class="navbar-toggler-icon"></span>
|
<span class="navbar-toggler-icon"></span>
|
||||||
|
@ -29,10 +29,10 @@
|
||||||
Tokens
|
Tokens
|
||||||
</a>
|
</a>
|
||||||
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
|
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
|
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
|
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
|
||||||
{% if current_user.has_role('super_admin') %}
|
{% if current_user.has_role('super_admin') %}
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
|
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</li>
|
</li>
|
||||||
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
{% if current_user.has_role('super_admin') %}
|
{% if current_user.has_role('super_admin') %}
|
||||||
<li class="nav-item">
|
<li class="nav-item">
|
||||||
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
|
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
|
||||||
</li>
|
</li>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
|
|
|
@ -24,11 +24,11 @@
|
||||||
{% if current_user.is_authenticated %}
|
{% if current_user.is_authenticated %}
|
||||||
<div class="row">
|
<div class="row">
|
||||||
<div class="col-3">
|
<div class="col-3">
|
||||||
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
||||||
</div>
|
</div>
|
||||||
<div class="col-1"></div>
|
<div class="col-1"></div>
|
||||||
<div class="col-3">
|
<div class="col-3">
|
||||||
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
||||||
</div>
|
</div>
|
||||||
<div class="col-5"></div>
|
<div class="col-5"></div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
und diesen zustimmt.
|
und diesen zustimmt.
|
||||||
</li>
|
</li>
|
||||||
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
|
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
|
||||||
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
||||||
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
|
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
|
||||||
</li>
|
</li>
|
||||||
<li>Jetzt kann der Token verwendet werden.</li>
|
<li>Jetzt kann der Token verwendet werden.</li>
|
||||||
|
|
|
@ -29,9 +29,9 @@
|
||||||
<td>{{ data[field] if data[field] }}</td>
|
<td>{{ data[field] if data[field] }}</td>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||||
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
||||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
@ -47,14 +47,14 @@
|
||||||
<td>{{ data[field] if data[field] }}</td>
|
<td>{{ data[field] if data[field] }}</td>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
<form action="{{ url_for('door_app.backup_tokens') }}" method="get">
|
<form action="{{ url_for('backup_tokens') }}" method="get">
|
||||||
<input type="submit" value="Token Daten sichern">
|
<input type="submit" value="Token Daten sichern">
|
||||||
</form>
|
</form>
|
||||||
{% endblock %}
|
{% endblock %}
|
|
@ -1,22 +1,30 @@
|
||||||
from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app
|
from flask import Flask, render_template, request, flash, redirect, session, send_file
|
||||||
from werkzeug.utils import secure_filename
|
from werkzeug.utils import secure_filename
|
||||||
from flask_wtf import FlaskForm
|
from flask_wtf import FlaskForm
|
||||||
from wtforms.fields import DateField, EmailField
|
from wtforms.fields.html5 import DateField, EmailField
|
||||||
from wtforms.fields import StringField, BooleanField
|
from wtforms.fields import StringField, BooleanField
|
||||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||||
from flask_security import auth_required, hash_password, \
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
|
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
|
||||||
current_user, roles_required
|
current_user, roles_required
|
||||||
|
from flask_security.models import fsqla_v2 as fsqla
|
||||||
|
from flask_security.forms import LoginForm, Required, PasswordField
|
||||||
|
from flask_security.utils import find_user
|
||||||
from flask_security.views import change_password
|
from flask_security.views import change_password
|
||||||
|
from flask_mail import Mail
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import secrets
|
import secrets
|
||||||
|
import bleach
|
||||||
|
import ldap3
|
||||||
|
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import logging
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from datetime import date, datetime, timedelta
|
from datetime import date, datetime, timedelta
|
||||||
|
from .door_handle import DoorHandle
|
||||||
from imaginaerraum_door_admin import db, security
|
|
||||||
|
|
||||||
|
|
||||||
def validate_valid_thru_date(form, field):
|
def validate_valid_thru_date(form, field):
|
||||||
|
@ -49,16 +57,227 @@ class AdminCreationForm(FlaskForm):
|
||||||
name = StringField('Name', validators=[DataRequired()])
|
name = StringField('Name', validators=[DataRequired()])
|
||||||
email = EmailField('E-Mail', validators=[DataRequired()])
|
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||||
|
|
||||||
door_app = Blueprint('door_app', __name__,
|
|
||||||
template_folder='templates')
|
def uia_username_mapper(identity):
|
||||||
|
# we allow pretty much anything - but we bleach it.
|
||||||
|
return bleach.clean(identity, strip=True)
|
||||||
|
|
||||||
|
|
||||||
# we override the change_password view from flask security to only allow local users to change their passwords
|
def create_application(config):
|
||||||
# LDAP users should use the LDAP self service for changing passwords
|
# set up logging for the web app
|
||||||
# this route needs to be defined before the Flask Security setup
|
logger = logging.getLogger('webapp')
|
||||||
@door_app.route('/change', methods=['GET', 'POST'])
|
logger.setLevel(logging.INFO)
|
||||||
@auth_required()
|
|
||||||
def change_pw():
|
if config.log_file is not None:
|
||||||
|
ch = logging.FileHandler(config.log_file)
|
||||||
|
ch.setLevel(logging.INFO)
|
||||||
|
else:
|
||||||
|
# create console handler and set level to debug
|
||||||
|
ch = logging.StreamHandler()
|
||||||
|
ch.setLevel(logging.DEBUG)
|
||||||
|
# create formatter
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
# add formatter to ch
|
||||||
|
ch.setFormatter(formatter)
|
||||||
|
# add ch to logger
|
||||||
|
logger.addHandler(ch)
|
||||||
|
|
||||||
|
# do some checks for file existence etc.
|
||||||
|
try:
|
||||||
|
with open(config.key_file) as f:
|
||||||
|
data = f.readlines()
|
||||||
|
if 'SECRET_KEY' in data[0]:
|
||||||
|
secret_key = data[0].split()[-1]
|
||||||
|
else:
|
||||||
|
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
||||||
|
if 'SECURITY_PASSWORD_SALT' in data[1]:
|
||||||
|
security_password_salt = data[1].split()[-1]
|
||||||
|
else:
|
||||||
|
raise Exception("Could not read SECURITY_PASSWORD_SALT")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Flask keys could not be read from file at {Path(config.key_file).absolute()}. Exception: {e}. Using default values instead.")
|
||||||
|
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
|
||||||
|
security_password_salt = '10036796768252925167749545152988277953'
|
||||||
|
|
||||||
|
if Path(config.template_folder).is_absolute():
|
||||||
|
if not Path(config.template_folder).exists():
|
||||||
|
logger.error(f'Flask template folder not found at {Path(config.template_folder).absolute()}')
|
||||||
|
else:
|
||||||
|
if not (Path(__file__).parent / config.template_folder).exists():
|
||||||
|
logger.error(f'Flask template folder not found at {(Path(__file__).parent / config.template_folder).absolute()}')
|
||||||
|
if Path(config.static_folder).is_absolute():
|
||||||
|
if not Path(config.static_folder).exists():
|
||||||
|
logger.error(f'Flask static folder not found at {Path(config.static_folder).absolute()}')
|
||||||
|
else:
|
||||||
|
if not (Path(__file__).parent / config.static_folder).exists():
|
||||||
|
logger.error(f'Flask static folder not found at {(Path(__file__).parent / config.static_folder).absolute()}')
|
||||||
|
if not Path(config.token_file).exists():
|
||||||
|
logger.warning(f"Token file not found at {Path(config.token_file).absolute()}")
|
||||||
|
|
||||||
|
# create door objects which provides access to the token file and current door state via MQTT
|
||||||
|
door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
|
||||||
|
logger=logger)
|
||||||
|
|
||||||
|
app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)
|
||||||
|
|
||||||
|
# Generate a nice key using secrets.token_urlsafe()
|
||||||
|
app.config['SECRET_KEY'] = secret_key
|
||||||
|
# Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
|
||||||
|
# Generate a good salt using: secrets.SystemRandom().getrandbits(128)
|
||||||
|
app.config['SECURITY_PASSWORD_SALT'] = security_password_salt
|
||||||
|
|
||||||
|
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
|
||||||
|
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
|
||||||
|
{"username": {"mapper": uia_username_mapper}}
|
||||||
|
]
|
||||||
|
app.config['SECURITY_CHANGEABLE'] = True
|
||||||
|
app.config['SECURITY_RECOVERABLE'] = True
|
||||||
|
app.config['SECURITY_SEND_PASSWORD_CHANGE_EMAIL'] = False
|
||||||
|
|
||||||
|
# Mail Config
|
||||||
|
app.config['MAIL_SERVER'] = config.mail_server
|
||||||
|
app.config['MAIL_PORT'] = config.mail_port
|
||||||
|
app.config['MAIL_USE_TLS'] = config.mail_use_tls
|
||||||
|
app.config['MAIL_USE_SSL'] = config.mail_use_ssl
|
||||||
|
app.config['MAIL_USERNAME'] = config.mail_username
|
||||||
|
app.config['MAIL_PASSWORD'] = config.mail_password
|
||||||
|
app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
|
||||||
|
mail = Mail(app)
|
||||||
|
|
||||||
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
|
||||||
|
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
||||||
|
# underlying engine. This option makes sure that DB connections from the
|
||||||
|
# pool are still valid. Important for entire application since
|
||||||
|
# many DBaaS options automatically close idle connections.
|
||||||
|
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
|
||||||
|
"pool_pre_ping": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Create database connection object
|
||||||
|
db = SQLAlchemy(app)
|
||||||
|
|
||||||
|
# Define models
|
||||||
|
fsqla.FsModels.set_db_info(db)
|
||||||
|
|
||||||
|
class Role(db.Model, fsqla.FsRoleMixin):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class User(db.Model, fsqla.FsUserMixin):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# LDAP
|
||||||
|
ldap_server = ldap3.Server(config.ldap_url)
|
||||||
|
|
||||||
|
def validate_ldap(username, password):
|
||||||
|
"""Validate the user and password through an LDAP server.
|
||||||
|
|
||||||
|
If the connection completes successfully the given user and password is authorized.
|
||||||
|
Then the permissions and additional information of the user are obtained through an LDAP search.
|
||||||
|
The data is stored in a dict which will be used later to create/update the entry for the user in the local
|
||||||
|
database.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
username : username for the LDAP server
|
||||||
|
password : password for the LDAP server
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
bool : result of the authorization process (True = success, False = failure)
|
||||||
|
dict : dictionary with information about an authorized user (contains username, email, hashed password,
|
||||||
|
roles)
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
|
||||||
|
password=password, auto_bind=True)
|
||||||
|
except ldap3.core.exceptions.LDAPBindError:
|
||||||
|
# server reachable but user unauthorized -> fail
|
||||||
|
return False, None
|
||||||
|
except LDAPSocketOpenError:
|
||||||
|
# server not reachable -> fail (but will try authorization from local database later)
|
||||||
|
return False, None
|
||||||
|
except Exception as e:
|
||||||
|
# for other Exceptions we just fail
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
# get user data and permissions from LDAP server
|
||||||
|
new_user_data = {}
|
||||||
|
new_user_data['username'] = username
|
||||||
|
new_user_data['password'] = hash_password(password)
|
||||||
|
new_user_data['roles'] = []
|
||||||
|
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
||||||
|
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
|
||||||
|
attributes=ldap3.ALL_ATTRIBUTES)
|
||||||
|
authorized = True
|
||||||
|
if lock_permission:
|
||||||
|
new_user_data['email'] = con.entries[0].mail.value
|
||||||
|
else:
|
||||||
|
authorized = False
|
||||||
|
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
||||||
|
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||||
|
if token_granting_permission:
|
||||||
|
new_user_data['roles'].append('admin')
|
||||||
|
|
||||||
|
return authorized, new_user_data
|
||||||
|
|
||||||
|
class ExtendedLoginForm(LoginForm):
|
||||||
|
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||||
|
password = PasswordField('Passwort', [Required()])
|
||||||
|
remember = BooleanField('Login merken?')
|
||||||
|
|
||||||
|
def validate(self):
|
||||||
|
# search for user in the current database
|
||||||
|
user = find_user(self.email.data)
|
||||||
|
if user is not None:
|
||||||
|
# if a user is found we check if it is associated with LDAP or with the local database
|
||||||
|
if user.has_role('local'):
|
||||||
|
# try authorizing locally using Flask security user datastore
|
||||||
|
authorized = super(ExtendedLoginForm, self).validate()
|
||||||
|
|
||||||
|
if authorized:
|
||||||
|
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
||||||
|
else:
|
||||||
|
# run LDAP authorization
|
||||||
|
# if the authorization succeeds we also get the new_user_data dict which contains information about
|
||||||
|
# the user's permissions etc.
|
||||||
|
authorized, new_user_data = validate_ldap(user.username, self.password.data)
|
||||||
|
|
||||||
|
if authorized:
|
||||||
|
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
|
||||||
|
# update permissions and password/email to stay up to date for login with no network connection
|
||||||
|
user.email = new_user_data['email']
|
||||||
|
user.password = new_user_data['password']
|
||||||
|
for role in new_user_data['roles']:
|
||||||
|
user_datastore.add_role_to_user(user, role)
|
||||||
|
user_datastore.commit()
|
||||||
|
self.user = user
|
||||||
|
else:
|
||||||
|
self.password.errors = ['Invalid password']
|
||||||
|
else:
|
||||||
|
# this means there is no user with that email in the database
|
||||||
|
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
||||||
|
username = self.email.data
|
||||||
|
# try LDAP authorization and create a new user if it succeeds
|
||||||
|
authorized, new_user_data = validate_ldap(username, self.password.data)
|
||||||
|
|
||||||
|
if authorized:
|
||||||
|
# if there was no user in the database before we create a new user
|
||||||
|
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||||
|
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||||
|
user_datastore.commit()
|
||||||
|
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
||||||
|
" successful LDAP authorization")
|
||||||
|
|
||||||
|
# if any of the authorization methods is successful we authorize the user
|
||||||
|
return authorized
|
||||||
|
|
||||||
|
# we override the change_password view from flask security to only allow local users to change their passwords
|
||||||
|
# LDAP users should use the LDAP self service for changing passwords
|
||||||
|
# this route needs to be defined before the Flask Security setup
|
||||||
|
@app.route('/change', methods=['GET', 'POST'])
|
||||||
|
@auth_required()
|
||||||
|
def change_pw():
|
||||||
if current_user.has_role('local'):
|
if current_user.has_role('local'):
|
||||||
# local users can change their password
|
# local users can change their password
|
||||||
return change_password()
|
return change_password()
|
||||||
|
@ -66,41 +285,41 @@ def change_pw():
|
||||||
# LDAP users get redirected to the LDAP self service
|
# LDAP users get redirected to the LDAP self service
|
||||||
return redirect('https://ldap.imaginaerraum.de/')
|
return redirect('https://ldap.imaginaerraum.de/')
|
||||||
|
|
||||||
|
# Setup Flask-Security
|
||||||
|
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||||
|
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
|
||||||
|
|
||||||
# admin user management
|
# admin user management
|
||||||
@door_app.route('/manage_admins', methods=['GET', 'POST'])
|
@app.route('/manage_admins', methods=['GET', 'POST'])
|
||||||
@roles_required('super_admin')
|
@roles_required('super_admin')
|
||||||
def manage_admins():
|
def manage_admins():
|
||||||
form = AdminCreationForm()
|
form = AdminCreationForm()
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
users = security.datastore.user_model.query.all()
|
users = user_datastore.user_model.query.all()
|
||||||
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
|
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
|
||||||
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
|
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
|
||||||
} for u in users]
|
} for u in users]
|
||||||
return render_template('admins.html', admin_data=admin_data, form=form)
|
return render_template('admins.html', admin_data=admin_data, form=form)
|
||||||
elif form.validate():
|
elif form.validate():
|
||||||
if security.datastore.find_user(username=form.name.data) is not None or \
|
if user_datastore.find_user(username=form.name.data) is not None or \
|
||||||
security.datastore.find_user(email=form.email.data) is not None:
|
user_datastore.find_user(email=form.email.data) is not None:
|
||||||
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
|
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
|
||||||
|
return redirect('/manage_admins')
|
||||||
else:
|
else:
|
||||||
pw = secrets.token_urlsafe(16)
|
pw = secrets.token_urlsafe(16)
|
||||||
new_user = security.datastore.create_user(username=form.name.data, email=form.email.data,
|
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data,
|
||||||
password=hash_password(pw))
|
password=hash_password(pw))
|
||||||
security.datastore.add_role_to_user(new_user, 'local')
|
user_datastore.add_role_to_user(new_user, 'local')
|
||||||
current_app.logger.info(
|
logger.info(
|
||||||
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
|
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
|
||||||
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
|
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
else:
|
|
||||||
flash(f"Ungültige Eingabe: {form.errors}")
|
|
||||||
|
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
@app.route('/delete_admins/<username>', methods=['GET', 'POST'])
|
||||||
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def delete_admins(username):
|
||||||
def delete_admins(username):
|
user = user_datastore.find_user(username=username)
|
||||||
user = security.datastore.find_user(username=username)
|
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
@ -119,54 +338,52 @@ def delete_admins(username):
|
||||||
# return page asking the user to confirm delete
|
# return page asking the user to confirm delete
|
||||||
return render_template('delete_user.html', username=username, form=form)
|
return render_template('delete_user.html', username=username, form=form)
|
||||||
elif form.validate():
|
elif form.validate():
|
||||||
security.datastore.delete_user(user)
|
user_datastore.delete_user(user)
|
||||||
flash(f"Benutzer {username} wurde gelöscht.")
|
flash(f"Benutzer {username} wurde gelöscht.")
|
||||||
current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}")
|
logger.info(f"Super admin {current_user.username} deleted admin user {username}")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
else:
|
else:
|
||||||
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
|
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
@app.route('/admin_toggle_active/<username>')
|
||||||
@door_app.route('/admin_toggle_active/<username>')
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def admin_toggle_active(username):
|
||||||
def admin_toggle_active(username):
|
user = user_datastore.find_user(username=username)
|
||||||
user = security.datastore.find_user(username=username)
|
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
if user.has_role('super_admin'):
|
if user.has_role('super_admin'):
|
||||||
flash('Super-Admins können nicht deaktiviert werden!')
|
flash('Super-Admins können nicht deaktiviert werden!')
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
security.datastore.toggle_active(user)
|
user_datastore.toggle_active(user)
|
||||||
if user.is_active:
|
if user.is_active:
|
||||||
current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
|
logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
|
||||||
else:
|
else:
|
||||||
current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
|
logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
@app.route('/promote_admin/<username>')
|
||||||
@door_app.route('/promote_admin/<username>')
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def promote_admin(username):
|
||||||
def promote_admin(username):
|
user = user_datastore.find_user(username=username)
|
||||||
user = security.datastore.find_user(username=username)
|
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
if user.has_role('admin'):
|
if user.has_role('admin'):
|
||||||
flash(f'Benutzer {username} hat bereits Admin-Rechte!')
|
flash(f'Benutzer {username} hat bereits Admin-Rechte!')
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
security.datastore.add_role_to_user(user, 'admin')
|
user_datastore.add_role_to_user(user, 'admin')
|
||||||
current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
|
logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
@door_app.route('/demote_admin/<username>')
|
@app.route('/demote_admin/<username>')
|
||||||
@roles_required('super_admin')
|
@roles_required('super_admin')
|
||||||
def demote_admin(username):
|
def demote_admin(username):
|
||||||
user = security.datastore.find_user(username=username)
|
user = user_datastore.find_user(username=username)
|
||||||
if user is None:
|
if user is None:
|
||||||
flash(f"Ungültiger Nutzer {username}")
|
flash(f"Ungültiger Nutzer {username}")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
@ -174,19 +391,18 @@ def demote_admin(username):
|
||||||
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!')
|
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!')
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
if user.has_role('admin'):
|
if user.has_role('admin'):
|
||||||
security.datastore.remove_role_from_user(user, 'admin')
|
user_datastore.remove_role_from_user(user, 'admin')
|
||||||
current_app.logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}")
|
logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}")
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
else:
|
else:
|
||||||
flash(f'Benutzer {username} ist bereits kein Admin!')
|
flash(f'Benutzer {username} ist bereits kein Admin!')
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
@app.route('/backup_user_datastore')
|
||||||
@door_app.route('/backup_user_datastore')
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def backup_user_datastore():
|
||||||
def backup_user_datastore():
|
|
||||||
# get list of defined admin users for backup
|
# get list of defined admin users for backup
|
||||||
users = security.datastore.user_model.query.all()
|
users = user_datastore.user_model.query.all()
|
||||||
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
|
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
|
||||||
'roles': [r.name for r in u.roles]}
|
'roles': [r.name for r in u.roles]}
|
||||||
for u in users if not u.has_role('super_admin')]
|
for u in users if not u.has_role('super_admin')]
|
||||||
|
@ -198,10 +414,9 @@ def backup_user_datastore():
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return str(e)
|
return str(e)
|
||||||
|
|
||||||
|
@app.route('/restore_user_datastore', methods=['POST'])
|
||||||
@door_app.route('/restore_user_datastore', methods=['POST'])
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def restore_user_datastore():
|
||||||
def restore_user_datastore():
|
|
||||||
# check if the post request has the file part
|
# check if the post request has the file part
|
||||||
if 'file' not in request.files:
|
if 'file' not in request.files:
|
||||||
flash('Keine Datei ausgewählt!')
|
flash('Keine Datei ausgewählt!')
|
||||||
|
@ -228,9 +443,9 @@ def restore_user_datastore():
|
||||||
entry_valid &= type(d['roles']) == list
|
entry_valid &= type(d['roles']) == list
|
||||||
validate_email(d['email'])
|
validate_email(d['email'])
|
||||||
if entry_valid:
|
if entry_valid:
|
||||||
existing_user = security.datastore.find_user(username=d['username'], email=d['email'])
|
existing_user = user_datastore.find_user(username=d['username'], email=d['email'])
|
||||||
if existing_user is None:
|
if existing_user is None:
|
||||||
security.datastore.create_user(username=d['username'], email=d['email'],
|
user_datastore.create_user(username=d['username'], email=d['email'],
|
||||||
password=d['password_hash'], active=d['active'],
|
password=d['password_hash'], active=d['active'],
|
||||||
roles=d['roles'])
|
roles=d['roles'])
|
||||||
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
|
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
|
||||||
|
@ -248,43 +463,38 @@ def restore_user_datastore():
|
||||||
else:
|
else:
|
||||||
flash("Ungültige Dateiendung")
|
flash("Ungültige Dateiendung")
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
# main page
|
||||||
|
@app.route('/')
|
||||||
|
def door_lock():
|
||||||
|
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
|
||||||
|
|
||||||
|
# token overview
|
||||||
# main page
|
@app.route('/tokens')
|
||||||
@door_app.route('/')
|
@roles_required('admin')
|
||||||
def door_lock():
|
def list_tokens():
|
||||||
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
|
tokens = door.get_tokens()
|
||||||
|
|
||||||
|
|
||||||
# token overview
|
|
||||||
@door_app.route('/tokens')
|
|
||||||
@roles_required('admin')
|
|
||||||
def list_tokens():
|
|
||||||
tokens = current_app.door.get_tokens()
|
|
||||||
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
|
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
|
||||||
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
|
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
|
||||||
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
|
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
|
||||||
|
|
||||||
|
@app.route('/token-log')
|
||||||
@door_app.route('/token-log')
|
@roles_required('super_admin')
|
||||||
@roles_required('super_admin')
|
def token_log():
|
||||||
def token_log():
|
|
||||||
log = []
|
log = []
|
||||||
try:
|
try:
|
||||||
with open(current_app.config['NFC_LOG']) as f:
|
with open(config.nfc_log) as f:
|
||||||
log += f.readlines()
|
log += f.readlines()
|
||||||
log.reverse()
|
log.reverse()
|
||||||
log = [l.split(' - ') for l in log]
|
log = [l.split(' - ') for l in log]
|
||||||
return render_template('token_log.html', log=log)
|
return render_template('token_log.html', log=log)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}")
|
flash(f"NFC logfile {Path(config.nfc_log).absolute()} konnte nicht gelesen werden. Exception: {e}")
|
||||||
return redirect('/')
|
return redirect('/')
|
||||||
|
|
||||||
|
# routes for registering, editing and deleting tokens
|
||||||
# routes for registering, editing and deleting tokens
|
@app.route('/register-token', methods=['GET', 'POST'])
|
||||||
@door_app.route('/register-token', methods=['GET', 'POST'])
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def register():
|
||||||
def register():
|
|
||||||
"""Register new token for locking and unlocking the door.
|
"""Register new token for locking and unlocking the door.
|
||||||
|
|
||||||
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
|
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
|
||||||
|
@ -293,7 +503,7 @@ def register():
|
||||||
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route
|
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route
|
||||||
will be called which adds the new token to the database.
|
will be called which adds the new token to the database.
|
||||||
"""
|
"""
|
||||||
token = current_app.door.get_most_recent_token()
|
token = door.get_most_recent_token()
|
||||||
|
|
||||||
recent_token = {}
|
recent_token = {}
|
||||||
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
||||||
|
@ -311,7 +521,7 @@ def register():
|
||||||
return render_template('register.html', token=recent_token, form=form)
|
return render_template('register.html', token=recent_token, form=form)
|
||||||
elif request.method == 'POST' and form.validate():
|
elif request.method == 'POST' and form.validate():
|
||||||
# store data in session cookie
|
# store data in session cookie
|
||||||
session['token'] = current_app.door.get_most_recent_token()['token']
|
session['token'] = door.get_most_recent_token()['token']
|
||||||
session['name'] = form.name.data
|
session['name'] = form.name.data
|
||||||
session['email'] = form.email.data
|
session['email'] = form.email.data
|
||||||
session['organization'] = form.organization.data
|
session['organization'] = form.organization.data
|
||||||
|
@ -324,10 +534,9 @@ def register():
|
||||||
else:
|
else:
|
||||||
return render_template('register.html', token=recent_token, form=form)
|
return render_template('register.html', token=recent_token, form=form)
|
||||||
|
|
||||||
|
@app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||||
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def edit_token(token):
|
||||||
def edit_token(token):
|
|
||||||
"""Edit data in the token file (name, email, valid_thru date, active/inactive).
|
"""Edit data in the token file (name, email, valid_thru date, active/inactive).
|
||||||
|
|
||||||
If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
|
If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
|
||||||
|
@ -343,7 +552,7 @@ def edit_token(token):
|
||||||
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
|
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
|
||||||
# to it before
|
# to it before
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
if token in tokens:
|
if token in tokens:
|
||||||
# set default for form according to values from the token file
|
# set default for form according to values from the token file
|
||||||
et = tokens[token]
|
et = tokens[token]
|
||||||
|
@ -380,33 +589,31 @@ def edit_token(token):
|
||||||
else:
|
else:
|
||||||
return render_template('edit.html', token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
|
|
||||||
|
@app.route('/store-token')
|
||||||
@door_app.route('/store-token')
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def store_token():
|
||||||
def store_token():
|
|
||||||
"""Store token to the token file on disk.
|
"""Store token to the token file on disk.
|
||||||
|
|
||||||
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
|
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
|
||||||
edit_token()) and create/modify a token and store the new token file to disk.
|
edit_token()) and create/modify a token and store the new token file to disk.
|
||||||
"""
|
"""
|
||||||
token = session['token']
|
token = session['token']
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
tokens[token] = {'name': session['name'],
|
tokens[token] = {'name': session['name'],
|
||||||
'email': session['email'],
|
'email': session['email'],
|
||||||
'valid_thru': session['valid_thru'],
|
'valid_thru': session['valid_thru'],
|
||||||
'inactive': session['inactive'],
|
'inactive': session['inactive'],
|
||||||
'organization': session['organization']}
|
'organization': session['organization']}
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
door.store_tokens(tokens)
|
||||||
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
return redirect('/tokens')
|
return redirect('/tokens')
|
||||||
|
|
||||||
|
@app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||||
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def delete_token(token):
|
||||||
def delete_token(token):
|
|
||||||
"""Delete the given token from the token file and store the new token file to disk
|
"""Delete the given token from the token file and store the new token file to disk
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
|
@ -414,7 +621,7 @@ def delete_token(token):
|
||||||
token : str
|
token : str
|
||||||
The token to delete from the database.
|
The token to delete from the database.
|
||||||
"""
|
"""
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
|
|
||||||
if token in tokens:
|
if token in tokens:
|
||||||
token_to_delete = tokens[token]
|
token_to_delete = tokens[token]
|
||||||
|
@ -430,8 +637,8 @@ def delete_token(token):
|
||||||
# form validation successful -> can delete the token
|
# form validation successful -> can delete the token
|
||||||
tokens.pop(token)
|
tokens.pop(token)
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
door.store_tokens(tokens)
|
||||||
current_app.logger.info(f"Token {token} was deleted from database by admin user {current_user.username}")
|
logger.info(f"Token {token} was deleted from database by admin user {current_user.username}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
flash(f"Token {token} wurde gelöscht!")
|
flash(f"Token {token} wurde gelöscht!")
|
||||||
|
@ -445,10 +652,9 @@ def delete_token(token):
|
||||||
flash(f'Ungültiger Token {token} für Löschung.')
|
flash(f'Ungültiger Token {token} für Löschung.')
|
||||||
return redirect('/tokens')
|
return redirect('/tokens')
|
||||||
|
|
||||||
|
@app.route('/deactivate-token/<token>')
|
||||||
@door_app.route('/deactivate-token/<token>')
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def deactivate_token(token):
|
||||||
def deactivate_token(token):
|
|
||||||
"""Deactivate access for the given token. This updates the token file on disk.
|
"""Deactivate access for the given token. This updates the token file on disk.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
|
@ -456,22 +662,21 @@ def deactivate_token(token):
|
||||||
token : str
|
token : str
|
||||||
The token to deactivate.
|
The token to deactivate.
|
||||||
"""
|
"""
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
if token in tokens:
|
if token in tokens:
|
||||||
tokens[token]['inactive'] = True
|
tokens[token]['inactive'] = True
|
||||||
try:
|
try:
|
||||||
current_app.door.store_tokens(tokens)
|
door.store_tokens(tokens)
|
||||||
current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}")
|
logger.info(f"Token {token} deactivated by admin user {current_user.username}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
return redirect('/tokens')
|
return redirect('/tokens')
|
||||||
|
|
||||||
|
@app.route('/backup_tokens')
|
||||||
@door_app.route('/backup_tokens')
|
@roles_required('admin')
|
||||||
@roles_required('admin')
|
def backup_tokens():
|
||||||
def backup_tokens():
|
|
||||||
# get list of defined admin users for backup
|
# get list of defined admin users for backup
|
||||||
tokens = current_app.door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
try:
|
try:
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
file = Path(tmpdir, 'token_data.txt')
|
file = Path(tmpdir, 'token_data.txt')
|
||||||
|
@ -480,25 +685,68 @@ def backup_tokens():
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return str(e)
|
return str(e)
|
||||||
|
|
||||||
|
@app.route('/open')
|
||||||
|
@auth_required()
|
||||||
|
def open_door():
|
||||||
|
|
||||||
@door_app.route('/open')
|
|
||||||
@auth_required()
|
|
||||||
def open_door():
|
|
||||||
try:
|
try:
|
||||||
current_app.door.open_door(user=current_user.username)
|
door.open_door(user=current_user.username)
|
||||||
current_app.logger.info(f"Door opened by admin user {current_user.username}")
|
logger.info(f"Door opened by admin user {current_user.username}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f'Could not open door. Exception: {e}')
|
flash(f'Could not open door. Exception: {e}')
|
||||||
return redirect('/')
|
return redirect('/')
|
||||||
|
|
||||||
|
# routes for opening and closing the door via the web interface
|
||||||
# routes for opening and closing the door via the web interface
|
@app.route('/close')
|
||||||
@door_app.route('/close')
|
@auth_required()
|
||||||
@auth_required()
|
def close_door():
|
||||||
def close_door():
|
|
||||||
try:
|
try:
|
||||||
current_app.door.close_door(user=current_user.username)
|
door.close_door(user=current_user.username)
|
||||||
current_app.logger.info(f"Door closed by admin user {current_user.username}")
|
logger.info(f"Door closed by admin user {current_user.username}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f'Could not close door. Exception: {e}')
|
flash(f'Could not close door. Exception: {e}')
|
||||||
return redirect('/')
|
return redirect('/')
|
||||||
|
|
||||||
|
# setup user database when starting the app
|
||||||
|
with app.app_context():
|
||||||
|
new_admin_data = []
|
||||||
|
if config.admin_file is not None:
|
||||||
|
if not Path(config.admin_file).exists():
|
||||||
|
logger.warning(f"Admin user creation file not found at {config.admin_file}")
|
||||||
|
else:
|
||||||
|
# store data for new admins in memory s.t. the file can be deleted afterwards
|
||||||
|
with open(config.admin_file) as f:
|
||||||
|
for i, line in enumerate(f.readlines()):
|
||||||
|
if not line.strip().startswith('#'):
|
||||||
|
try:
|
||||||
|
user, email, pw = line.split()
|
||||||
|
validate_email(email)
|
||||||
|
new_admin_data.append({'username': user, 'email': email, 'password': pw})
|
||||||
|
except Exception as e:
|
||||||
|
print(
|
||||||
|
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
||||||
|
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
||||||
|
|
||||||
|
# create admin users (only if they don't exists already)
|
||||||
|
def create_super_admins(new_admin_data):
|
||||||
|
db.create_all()
|
||||||
|
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
|
||||||
|
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
|
||||||
|
local_role = user_datastore.find_or_create_role('local') # LDAP user or local user
|
||||||
|
|
||||||
|
for d in new_admin_data:
|
||||||
|
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
|
||||||
|
roles = [super_admin_role, admin_role]
|
||||||
|
if not d['password'] == 'LDAP':
|
||||||
|
roles.append(local_role)
|
||||||
|
logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
|
||||||
|
|
||||||
|
# create new admin (only if admin does not already exist)
|
||||||
|
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
|
||||||
|
password=hash_password(d['password']),
|
||||||
|
roles=roles)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
create_super_admins(new_admin_data)
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
5
requirements.txt
Normal file
5
requirements.txt
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
flask~=1.1.2
|
||||||
|
Flask-Security-Too~=4.0.0
|
||||||
|
WTForms~=2.3.3
|
||||||
|
paho-mqtt~=1.5.1
|
||||||
|
bleach~=3.3.0
|
32
setup.cfg
32
setup.cfg
|
@ -15,35 +15,3 @@ classifiers =
|
||||||
Programming Language :: Python :: 3
|
Programming Language :: Python :: 3
|
||||||
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
|
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
|
||||||
Operating System :: OS Independent
|
Operating System :: OS Independent
|
||||||
|
|
||||||
[options]
|
|
||||||
python_requires = >=3.8
|
|
||||||
install_requires =
|
|
||||||
bleach
|
|
||||||
Flask
|
|
||||||
Flask-Mail
|
|
||||||
Flask-Security-Too
|
|
||||||
Flask-SQLAlchemy
|
|
||||||
Flask-WTF
|
|
||||||
email_validator
|
|
||||||
paho-mqtt
|
|
||||||
ldap3
|
|
||||||
wtforms
|
|
||||||
|
|
||||||
include_package_data = True
|
|
||||||
packages = find:
|
|
||||||
setup_requires =
|
|
||||||
wheel
|
|
||||||
tests_require = pytest>=3
|
|
||||||
zip_safe = False
|
|
||||||
scripts= bin/launch_webadmin
|
|
||||||
|
|
||||||
[options.extras_require]
|
|
||||||
dev =
|
|
||||||
pytest
|
|
||||||
pytest-cov
|
|
||||||
pytest-flask
|
|
||||||
pytest-mock
|
|
||||||
flake8
|
|
||||||
selenium
|
|
||||||
beautifulsoup4
|
|
16
setup.py
16
setup.py
|
@ -1,3 +1,17 @@
|
||||||
from setuptools import setup
|
from setuptools import setup
|
||||||
|
|
||||||
setup()
|
setup(install_requires=[
|
||||||
|
'bleach',
|
||||||
|
'Flask',
|
||||||
|
'Flask-Mail',
|
||||||
|
'Flask-Security-Too',
|
||||||
|
'Flask-SQLAlchemy',
|
||||||
|
'Flask-WTF',
|
||||||
|
'email_validator',
|
||||||
|
'paho-mqtt',
|
||||||
|
'ldap3',
|
||||||
|
],
|
||||||
|
include_package_data=True,
|
||||||
|
scripts=['bin/launch_webadmin'],
|
||||||
|
packages=['imaginaerraum_door_admin'],
|
||||||
|
zip_safe=False)
|
||||||
|
|
|
@ -1,21 +0,0 @@
|
||||||
import pytest
|
|
||||||
|
|
||||||
from selenium.webdriver import Chrome
|
|
||||||
from imaginaerraum_door_admin import create_app
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
|
||||||
def app():
|
|
||||||
"""Fixture to launch the webapp"""
|
|
||||||
app = create_app()
|
|
||||||
|
|
||||||
return app
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def browser():
|
|
||||||
"""Fixture for a selenium browser to access the webapp."""
|
|
||||||
driver = Chrome()
|
|
||||||
driver.implicitly_wait(10)
|
|
||||||
yield driver
|
|
||||||
driver.quit()
|
|
|
@ -1,119 +0,0 @@
|
||||||
import pytest
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
|
||||||
import re
|
|
||||||
|
|
||||||
def test_login(browser, live_server):
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}')
|
|
||||||
|
|
||||||
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
|
|
||||||
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
|
||||||
|
|
||||||
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
|
|
||||||
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
|
|
||||||
submit_button = browser.find_element_by_id('submit').click()
|
|
||||||
|
|
||||||
assert 'Tür öffnen' in browser.page_source
|
|
||||||
|
|
||||||
|
|
||||||
def extract_csrf_token(response):
|
|
||||||
soup = BeautifulSoup(response.data)
|
|
||||||
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
|
||||||
return csrf_token
|
|
||||||
|
|
||||||
|
|
||||||
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
|
|
||||||
# extract csrf token from the login page source
|
|
||||||
response = client.get('/login')
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
# send login information
|
|
||||||
payload = {
|
|
||||||
'csrf_token': csrf_token,
|
|
||||||
'email': user,
|
|
||||||
'password': password
|
|
||||||
}
|
|
||||||
return client.post('/login', data=payload, follow_redirects=True)
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_headless(client):
|
|
||||||
response = headless_login(client)
|
|
||||||
soup = BeautifulSoup(response.data)
|
|
||||||
|
|
||||||
# make sure login succeeded -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def client_authenticated(client):
|
|
||||||
# log in using admin account for testing
|
|
||||||
headless_login(client)
|
|
||||||
|
|
||||||
yield client
|
|
||||||
|
|
||||||
|
|
||||||
def test_open_door_button(client_authenticated, mocker):
|
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
|
|
||||||
|
|
||||||
# visit route for open
|
|
||||||
client_authenticated.get('/open')
|
|
||||||
|
|
||||||
# make sure the open method was called
|
|
||||||
DoorHandle.open_door.assert_called_once_with(user='gandalf')
|
|
||||||
|
|
||||||
|
|
||||||
def test_close_door_button(client_authenticated, mocker):
|
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
|
|
||||||
|
|
||||||
# visit route for open
|
|
||||||
client_authenticated.get('/close')
|
|
||||||
|
|
||||||
# make sure the open method was called
|
|
||||||
DoorHandle.close_door.assert_called_once_with(user='gandalf')
|
|
||||||
|
|
||||||
|
|
||||||
def test_manage_admins(client_authenticated):
|
|
||||||
# visit admin management page
|
|
||||||
response = client_authenticated.get('/manage_admins')
|
|
||||||
|
|
||||||
assert "Nutzer Übersicht" in response.data.decode()
|
|
||||||
assert "gandalf" in response.data.decode()
|
|
||||||
assert "gandalf@shire.me" in response.data.decode()
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_admin(client_authenticated):
|
|
||||||
# visit admin management page
|
|
||||||
response = client_authenticated.get('/manage_admins')
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
# post data for creating a new admin
|
|
||||||
payload = {'name': 'bilbo',
|
|
||||||
'email': 'bilbo@shire.me',
|
|
||||||
'csrf_token': csrf_token}
|
|
||||||
response = client_authenticated.post('/manage_admins', data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# after the new admin user is created, we should have been redirected to the
|
|
||||||
# /manage_admin page. there, the password for login is displayed
|
|
||||||
# we test if the newly created user can log in with that password
|
|
||||||
# extract password displayed on the page
|
|
||||||
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
|
|
||||||
assert match is not None
|
|
||||||
extracted_password = match['password']
|
|
||||||
|
|
||||||
# log out current user
|
|
||||||
response = client_authenticated.get('/logout')
|
|
||||||
|
|
||||||
# try to log in new user using the extracted password
|
|
||||||
response = headless_login(client_authenticated, user='bilbo',
|
|
||||||
password=extracted_password)
|
|
||||||
# - see if it works
|
|
||||||
soup = BeautifulSoup(response.data)
|
|
||||||
|
|
||||||
# make sure login succeeded
|
|
||||||
# -> username should be displayed
|
|
||||||
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
|
||||||
# -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user