Compare commits
5 Commits
4197446a00
...
a71f68ade3
Author | SHA1 | Date | |
---|---|---|---|
a71f68ade3 | |||
312549ac15 | |||
f021f7494f | |||
8cdf549c4c | |||
e707f4bd87 |
imaginaerraum_door_admin
57
imaginaerraum_door_admin/templates/admins.html
Normal file
57
imaginaerraum_door_admin/templates/admins.html
Normal file
|
@ -0,0 +1,57 @@
|
|||
{% extends 'base.html' %}
|
||||
{% block header %}
|
||||
{% block title %}<h1>Admin Übersicht</h1>{% endblock %}
|
||||
|
||||
<script src="../static/jquery-3.6.0.js"></script>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<table border="1">
|
||||
<td>Username</td>
|
||||
<td>E-Mail</td>
|
||||
<td>Aktiv</td>
|
||||
<td>Aktionen</td>
|
||||
{% for data in admin_data %}
|
||||
<tr>
|
||||
{% for field in ['username', 'email', 'active'] %}
|
||||
<td>{{ data[field] if data[field] }}</td>
|
||||
{% endfor %}
|
||||
<td>
|
||||
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
||||
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
<div>
|
||||
<p>
|
||||
Neuen Admin erstellen:
|
||||
</p>
|
||||
<form method="POST">
|
||||
<table>
|
||||
{{ form.csrf_token }}
|
||||
<tr>
|
||||
<td>{{ form.name.label }}</td>
|
||||
<td>{{ form.name(size=20) }}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>{{ form.email.label }}</td>
|
||||
<td>{{ form.email(size=20) }}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td></td>
|
||||
<td>
|
||||
<input type="submit" value="Abschicken">
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</form>
|
||||
</div>
|
||||
<form action="{{ url_for('backup_user_datastore') }}" method="get">
|
||||
<input type="submit" value="Admin Daten sichern">
|
||||
</form>
|
||||
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
||||
<input type=file name=file>
|
||||
<input type=submit value="Admin Daten wiederherstellen">
|
||||
</form>
|
||||
{% endblock %}
|
|
@ -8,6 +8,9 @@
|
|||
<li><a href="{{ url_for('list_tokens') }}">Token Übersicht</a>
|
||||
<li><a href="{{ url_for('open_door') }}">Tür öffnen</a>
|
||||
<li><a href="{{ url_for('close_door') }}">Tür schließen</a>
|
||||
{% if current_user.has_role('super_admin') %}
|
||||
<li><a href="{{ url_for('manage_admins') }}">Admins verwalten</a>
|
||||
{% endif %}
|
||||
{% if current_user.is_authenticated %}
|
||||
<li><a href="{{ url_for('security.change_password') }}">Passwort ändern</a>
|
||||
<li><a href="{{ url_for('security.logout') }}">Benutzer <span>{{ current_user.username }}</span> ausloggen</a>
|
||||
|
|
27
imaginaerraum_door_admin/templates/delete_admin.html
Normal file
27
imaginaerraum_door_admin/templates/delete_admin.html
Normal file
|
@ -0,0 +1,27 @@
|
|||
{% extends 'base.html' %}
|
||||
{% block header %}
|
||||
{% block title %}<h1>Admin Benutzer löschen</h1>{% endblock %}
|
||||
<script src="../static/jquery-3.6.0.js"></script>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div>
|
||||
Achtung, Admin '{{ username }}' wird gelöscht.
|
||||
Bitte zur Bestätigung den Nutzernamen eingeben:
|
||||
<form method="POST">
|
||||
<table>
|
||||
{{ form.csrf_token }}
|
||||
<tr>
|
||||
<td>{{ form.name.label }}</td>
|
||||
<td>{{ form.name(size=20) }}</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td></td>
|
||||
<td>
|
||||
<input type="submit" value="Bestätigen">
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</form>
|
||||
</div>
|
||||
{% endblock %}
|
|
@ -39,4 +39,7 @@
|
|||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
<form action="{{ url_for('backup_tokens') }}" method="get">
|
||||
<input type="submit" value="Token Daten sichern">
|
||||
</form>
|
||||
{% endblock %}
|
|
@ -1,21 +1,27 @@
|
|||
import os
|
||||
from flask import Flask, render_template, request, flash, redirect, session, url_for
|
||||
from flask import Flask, render_template, request, flash, redirect, session, send_file
|
||||
from werkzeug.utils import secure_filename
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms.fields.html5 import DateField, EmailField
|
||||
from wtforms.fields import StringField, BooleanField
|
||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, current_user
|
||||
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper, \
|
||||
current_user, roles_required
|
||||
from flask_security.models import fsqla_v2 as fsqla
|
||||
from flask_security.forms import LoginForm, Required, PasswordField
|
||||
from flask_security.utils import find_user, verify_password
|
||||
from flask_mail import Mail
|
||||
from email_validator import validate_email
|
||||
from email_validator import validate_email, EmailNotValidError
|
||||
|
||||
import json
|
||||
import secrets
|
||||
import bleach
|
||||
import ldap3
|
||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import tempfile
|
||||
|
||||
from datetime import date
|
||||
from .door_handle import DoorHandle
|
||||
|
@ -41,14 +47,22 @@ class TokenForm(FlaskForm):
|
|||
active = BooleanField('Aktiv?')
|
||||
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
|
||||
|
||||
class TokenDeleteForm(FlaskForm):
|
||||
|
||||
class ConfirmDeleteForm(FlaskForm):
|
||||
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
|
||||
name_confirm = StringField('Name confirm')
|
||||
|
||||
|
||||
class AdminCreationForm(FlaskForm):
|
||||
name = StringField('Name', validators=[DataRequired()])
|
||||
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||
|
||||
|
||||
def uia_username_mapper(identity):
|
||||
# we allow pretty much anything - but we bleach it.
|
||||
return bleach.clean(identity, strip=True)
|
||||
|
||||
|
||||
def create_application(config):
|
||||
# set up logging for the web app
|
||||
logger = logging.getLogger('webapp')
|
||||
|
@ -76,22 +90,6 @@ def create_application(config):
|
|||
if not Path(config.token_file).exists():
|
||||
logger.warning(f"Token file not found at {config.token_file}")
|
||||
|
||||
new_admin_data = []
|
||||
if config.admin_file is not None:
|
||||
if not Path(config.admin_file).exists():
|
||||
logger.warning(f"Admin user creation file not found at {config.admin_file}")
|
||||
else:
|
||||
# store data for new admins in memory s.t. the file can be deleted afterwards
|
||||
with open(config.admin_file) as f:
|
||||
for i, d in enumerate(f.readlines()):
|
||||
try:
|
||||
user, email, pw = d.split()
|
||||
validate_email(email)
|
||||
new_admin_data.append({'username': user, 'email': email, 'password': pw})
|
||||
except Exception as e:
|
||||
print(f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
||||
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
||||
|
||||
# create door objects which provides access to the token file and current door state via MQTT
|
||||
door = DoorHandle(token_file=config.token_file, mqtt_host=config.mqtt_host, nfc_socket=config.nfc_socket,
|
||||
logger=logger)
|
||||
|
@ -146,9 +144,9 @@ def create_application(config):
|
|||
|
||||
# LDAP
|
||||
ldap_server = ldap3.Server(config.ldap_url)
|
||||
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
|
||||
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
|
||||
|
||||
def validate_ldap(user, password):
|
||||
def validate_ldap(username, password):
|
||||
"""Validate the user and password through an LDAP server.
|
||||
|
||||
If the connection completes successfully the given user and password is authorized and the password is stored
|
||||
|
@ -167,19 +165,22 @@ def create_application(config):
|
|||
"""
|
||||
|
||||
try:
|
||||
con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,),
|
||||
password=password, auto_bind=True)
|
||||
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
|
||||
password=password, auto_bind=True)
|
||||
except ldap3.core.exceptions.LDAPBindError:
|
||||
# server reachable but user unauthorized -> fail
|
||||
return False
|
||||
except LDAPSocketOpenError:
|
||||
# server not reachable -> try cached authorization data
|
||||
return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username])
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
# for other Exceptions we just fail
|
||||
return False
|
||||
|
||||
# TODO check if user has permission to edit tokens
|
||||
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={user.username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
||||
f'(&(uid={user.username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||
# if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if
|
||||
# the server is not reachable
|
||||
local_ldap_cache[user.username] = hash_password(password)
|
||||
|
@ -190,17 +191,32 @@ def create_application(config):
|
|||
password = PasswordField('Passwort', [Required()])
|
||||
|
||||
def validate(self):
|
||||
# try authorizing locally using Flask security user datastore
|
||||
authorized = super(ExtendedLoginForm, self).validate()
|
||||
# try authorizing using LDAP
|
||||
# authorization in LDAP uses username -> get username associated with email from the database
|
||||
try:
|
||||
# if an email (instead of a username) was entered for authentication we check if there already is a user
|
||||
# with that email in the database
|
||||
validate_email(self.email.data)
|
||||
user = find_user(self.email.data)
|
||||
if user is not None:
|
||||
username = user.username
|
||||
else:
|
||||
# this means there is no user with that email in the database
|
||||
username = None
|
||||
except EmailNotValidError:
|
||||
# else we use the entered credentials as username
|
||||
username = self.email.data
|
||||
|
||||
authorized = validate_ldap(username, self.password.data)
|
||||
|
||||
if authorized:
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
|
||||
|
||||
if not authorized:
|
||||
# try authorizing using LDAP
|
||||
# authorization in LDAP uses username -> get username associated with email from the database
|
||||
user = find_user(self.email.data)
|
||||
authorized = validate_ldap(user, self.password.data)
|
||||
if authorized:
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
|
||||
else:
|
||||
# try authorizing locally using Flask security user datastore
|
||||
authorized = super(ExtendedLoginForm, self).validate()
|
||||
|
||||
if authorized:
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
|
||||
|
||||
# if any of the authorization methods is successful we authorize the user
|
||||
|
@ -210,28 +226,148 @@ def create_application(config):
|
|||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
|
||||
|
||||
# create admin users (only if they don't exists already)
|
||||
def create_admins(new_admin_data):
|
||||
for d in new_admin_data:
|
||||
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
|
||||
logger.info(f"New admin user created with username '{d['username']}' and email '{d['email']}'")
|
||||
# create new admin (only if admin does not already exist)
|
||||
user_datastore.create_user(email=d['email'], username=d['username'], password=hash_password(d['password']))
|
||||
# admin user management
|
||||
@app.route('/manage_admins', methods=['GET', 'POST'])
|
||||
@roles_required('super_admin')
|
||||
def manage_admins():
|
||||
form = AdminCreationForm()
|
||||
if request.method == 'GET':
|
||||
users = user_datastore.user_model.query.all()
|
||||
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active} for u in users]
|
||||
return render_template('admins.html', admin_data=admin_data, form=form)
|
||||
elif form.validate():
|
||||
if user_datastore.find_user(username=form.name.data) is not None or \
|
||||
user_datastore.find_user(email=form.email.data) is not None:
|
||||
flash("A user with the same name or email is already registered!")
|
||||
return redirect('/manage_admins')
|
||||
else:
|
||||
pw = secrets.token_urlsafe(16)
|
||||
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, roles=['admin'],
|
||||
password=hash_password(pw))
|
||||
logger.info(
|
||||
f"Super admin {current_user.username} created new admin account for {new_user.username} <{new_user.email}>")
|
||||
flash(
|
||||
f"An account for the new admin user {new_user.username} has been created. Use the randomly generated password {pw} to log in.")
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
|
||||
# Create a user to test with
|
||||
@app.before_first_request
|
||||
def create_user():
|
||||
db.create_all()
|
||||
if len(new_admin_data) > 0:
|
||||
# create admin accounts from given file
|
||||
create_admins(new_admin_data)
|
||||
@app.route('/delete_admins/<username>', methods=['GET', 'POST'])
|
||||
@roles_required('super_admin')
|
||||
def delete_admins(username):
|
||||
user = user_datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Invalid user {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('super_admin'):
|
||||
flash('Cannot delete super admins!')
|
||||
return redirect('/manage_admins')
|
||||
if user.is_active:
|
||||
flash('Cannot delete active users. Please deactivate user first!')
|
||||
return redirect('/manage_admins')
|
||||
|
||||
# set up form for confirming deletion
|
||||
form = ConfirmDeleteForm()
|
||||
form.name_confirm.data = username
|
||||
|
||||
if request.method == 'GET':
|
||||
# return page asking the user to confirm delete
|
||||
return render_template('delete_admin.html', username=username, form=form)
|
||||
elif form.validate():
|
||||
user_datastore.delete_user(user)
|
||||
flash(f"Username {username} was deleted.")
|
||||
logger.info(f"Super admin {current_user.username} deleted admin user {username}")
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
else:
|
||||
flash("Username does not match. User was not deleted!")
|
||||
return redirect('/manage_admins')
|
||||
|
||||
@app.route('/admin_toggle_active/<username>')
|
||||
@roles_required('super_admin')
|
||||
def admin_toggle_active(username):
|
||||
user = user_datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Invalid user {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('super_admin'):
|
||||
flash('Cannot deactivate super admins!')
|
||||
return redirect('/manage_admins')
|
||||
user_datastore.toggle_active(user)
|
||||
if user.is_active:
|
||||
logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
|
||||
else:
|
||||
logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
|
||||
@app.route('/backup_user_datastore')
|
||||
@roles_required('super_admin')
|
||||
def backup_user_datastore():
|
||||
# get list of defined admin users for backup
|
||||
users = user_datastore.user_model.query.all()
|
||||
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password}
|
||||
for u in users if not u.has_role('super_admin')]
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
file = Path(tmpdir, 'user_data.txt')
|
||||
file.write_text(json.dumps(user_data))
|
||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
@app.route('/restore_user_datastore', methods=['POST'])
|
||||
@roles_required('super_admin')
|
||||
def restore_user_datastore():
|
||||
# check if the post request has the file part
|
||||
if 'file' not in request.files:
|
||||
flash('Keine Datei ausgewählt!')
|
||||
return redirect(request.url)
|
||||
file = request.files['file']
|
||||
# if user does not select file, browser also
|
||||
# submit an empty part without filename
|
||||
if file.filename == '':
|
||||
flash('Keine Datei ausgewählt!')
|
||||
return redirect('/manage_admins')
|
||||
filename = secure_filename(file.filename)
|
||||
if file and filename.endswith('.txt'):
|
||||
data = file.stream.read()
|
||||
try:
|
||||
# check validity of user data
|
||||
user_data = json.loads(data)
|
||||
valid = type(user_data) == list
|
||||
valid &= all(type(d) == dict for d in user_data)
|
||||
if valid:
|
||||
for d in user_data:
|
||||
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username'}
|
||||
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username'])
|
||||
entry_valid &= type(d['active']) == bool
|
||||
validate_email(d['email'])
|
||||
if entry_valid:
|
||||
existing_user = user_datastore.find_user(username=d['username'], email=d['email'])
|
||||
if existing_user is None:
|
||||
user_datastore.create_user(username=d['username'], email=d['email'], password=d['password_hash'],
|
||||
roles=['admin'], active=d['active'])
|
||||
flash(f"Admin Account für Benutzer '{d['username']} wurde wiederhergestellt.")
|
||||
else:
|
||||
flash(f"Admin '{d['username']} existiert bereits. Eintrag wird übersprungen.")
|
||||
else:
|
||||
raise ValueError(f"Ungültige Daten für User Entry {d}")
|
||||
else:
|
||||
raise ValueError("Admin User Datei hat ungültiges Format.")
|
||||
except Exception as e:
|
||||
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
|
||||
return redirect('/manage_admins')
|
||||
flash("Admin Benutzer aus Datei gelesen.")
|
||||
db.session.commit()
|
||||
else:
|
||||
flash("Ungültige Dateiendung")
|
||||
return redirect('/manage_admins')
|
||||
# main page
|
||||
@app.route('/')
|
||||
def door_lock():
|
||||
return render_template('index.html', door_state=door.state, encoder_position=door.encoder_position)
|
||||
|
||||
|
||||
# token overview
|
||||
@app.route('/tokens')
|
||||
@auth_required()
|
||||
def list_tokens():
|
||||
|
@ -240,7 +376,7 @@ def create_application(config):
|
|||
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
|
||||
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
|
||||
|
||||
|
||||
# routes for registering, editing and deleting tokens
|
||||
@app.route('/register-token', methods=['GET', 'POST'])
|
||||
@auth_required()
|
||||
def register():
|
||||
|
@ -269,11 +405,10 @@ def create_application(config):
|
|||
else:
|
||||
session['valid_thru'] = ''
|
||||
session['inactive'] = not form.active.data
|
||||
return redirect(f'/store-token')
|
||||
return redirect('/store-token')
|
||||
else:
|
||||
return render_template('register.html', token=door.get_most_recent_token(), form=form)
|
||||
|
||||
|
||||
@app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||
@auth_required()
|
||||
def edit_token(token):
|
||||
|
@ -290,7 +425,7 @@ def create_application(config):
|
|||
"""
|
||||
form = TokenForm(request.form)
|
||||
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
|
||||
# to it before
|
||||
# to it before
|
||||
if request.method == 'GET':
|
||||
tokens = door.get_tokens()
|
||||
if token in tokens:
|
||||
|
@ -329,7 +464,6 @@ def create_application(config):
|
|||
else:
|
||||
return render_template('edit.html', token=token, form=form)
|
||||
|
||||
|
||||
@app.route('/store-token')
|
||||
@auth_required()
|
||||
def store_token():
|
||||
|
@ -352,7 +486,6 @@ def create_application(config):
|
|||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
return redirect('/tokens')
|
||||
|
||||
|
||||
@app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||
@auth_required()
|
||||
def delete_token(token):
|
||||
|
@ -369,7 +502,7 @@ def create_application(config):
|
|||
token_to_delete = tokens[token]
|
||||
|
||||
# set up form for confirming deletion
|
||||
form = TokenDeleteForm()
|
||||
form = ConfirmDeleteForm()
|
||||
form.name_confirm.data = token_to_delete['name']
|
||||
|
||||
if request.method == 'GET':
|
||||
|
@ -387,13 +520,13 @@ def create_application(config):
|
|||
return redirect('/tokens')
|
||||
else:
|
||||
# form validation failed -> return to token overview and flash message
|
||||
flash(f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
|
||||
flash(
|
||||
f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.")
|
||||
return redirect('/tokens')
|
||||
else:
|
||||
flash(f'Ungültiger Token {token} für Löschung.')
|
||||
return redirect('/tokens')
|
||||
|
||||
|
||||
@app.route('/deactivate-token/<token>')
|
||||
@auth_required()
|
||||
def deactivate_token(token):
|
||||
|
@ -414,6 +547,19 @@ def create_application(config):
|
|||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
return redirect('/tokens')
|
||||
|
||||
@app.route('/backup_tokens')
|
||||
@auth_required()
|
||||
def backup_tokens():
|
||||
# get list of defined admin users for backup
|
||||
tokens = door.get_tokens()
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
file = Path(tmpdir, 'token_data.txt')
|
||||
file.write_text(json.dumps(tokens))
|
||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
@app.route('/open')
|
||||
@auth_required()
|
||||
def open_door():
|
||||
|
@ -424,6 +570,7 @@ def create_application(config):
|
|||
flash(f'Could not open door. Exception: {e}')
|
||||
return redirect('/')
|
||||
|
||||
# routes for opening and closing the door via the web interface
|
||||
@app.route('/close')
|
||||
@auth_required()
|
||||
def close_door():
|
||||
|
@ -434,4 +581,40 @@ def create_application(config):
|
|||
flash(f'Could not close door. Exception: {e}')
|
||||
return redirect('/')
|
||||
|
||||
# setup user database when starting the app
|
||||
with app.app_context():
|
||||
new_admin_data = []
|
||||
if config.admin_file is not None:
|
||||
if not Path(config.admin_file).exists():
|
||||
logger.warning(f"Admin user creation file not found at {config.admin_file}")
|
||||
else:
|
||||
# store data for new admins in memory s.t. the file can be deleted afterwards
|
||||
with open(config.admin_file) as f:
|
||||
for i, d in enumerate(f.readlines()):
|
||||
try:
|
||||
user, email, pw = d.split()
|
||||
validate_email(email)
|
||||
new_admin_data.append({'username': user, 'email': email, 'password': pw})
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
||||
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
||||
|
||||
# create admin users (only if they don't exists already)
|
||||
def create_super_admins(new_admin_data):
|
||||
db.create_all()
|
||||
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
|
||||
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
|
||||
|
||||
for d in new_admin_data:
|
||||
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
|
||||
logger.info(f"New admin user created with username '{d['username']}' and email '{d['email']}'")
|
||||
# create new admin (only if admin does not already exist)
|
||||
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
|
||||
password=hash_password(d['password']),
|
||||
roles=[super_admin_role, admin_role])
|
||||
db.session.commit()
|
||||
|
||||
create_super_admins(new_admin_data)
|
||||
|
||||
return app
|
||||
|
|
Loading…
Reference in New Issue
Block a user