reformatting
This commit is contained in:
parent
2f40732d0b
commit
fa7c878cab
46
imaginaerraum_door_admin/forms.py
Normal file
46
imaginaerraum_door_admin/forms.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
from flask import flash
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms.fields import DateField, EmailField
|
||||
from wtforms.fields import StringField, BooleanField
|
||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||
from datetime import date
|
||||
|
||||
|
||||
def validate_valid_thru_date(form, field):
|
||||
if form.limit_validity.data:
|
||||
# only check date format if limited validity of token is set
|
||||
try:
|
||||
if not field.data >= date.today():
|
||||
raise ValueError
|
||||
except ValueError as e:
|
||||
flash("Ungültiges Datum")
|
||||
raise ValidationError
|
||||
return True
|
||||
|
||||
|
||||
class TokenForm(FlaskForm):
|
||||
name = StringField("Name", validators=[DataRequired()])
|
||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
||||
organization = StringField("Organization", validators=[DataRequired()])
|
||||
limit_validity = BooleanField("Gültigkeit begrenzen?")
|
||||
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
|
||||
active = BooleanField("Aktiv?")
|
||||
dsgvo = BooleanField(
|
||||
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
|
||||
)
|
||||
|
||||
|
||||
class ConfirmDeleteForm(FlaskForm):
|
||||
name = StringField(
|
||||
"Name",
|
||||
validators=[
|
||||
DataRequired(),
|
||||
EqualTo("name_confirm", "Name stimmt nicht überein"),
|
||||
],
|
||||
)
|
||||
name_confirm = StringField("Name confirm")
|
||||
|
||||
|
||||
class AdminCreationForm(FlaskForm):
|
||||
name = StringField("Name", validators=[DataRequired()])
|
||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
@ -1,11 +1,19 @@
|
|||
from flask import render_template, request, flash, redirect, session, send_file, Blueprint, current_app
|
||||
from flask import (
|
||||
render_template,
|
||||
request,
|
||||
flash,
|
||||
redirect,
|
||||
send_file,
|
||||
Blueprint,
|
||||
current_app,
|
||||
)
|
||||
from werkzeug.utils import secure_filename
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms.fields import DateField, EmailField
|
||||
from wtforms.fields import StringField, BooleanField
|
||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||
from flask_security import auth_required, hash_password, \
|
||||
current_user, roles_required
|
||||
from flask_security import (
|
||||
auth_required,
|
||||
hash_password,
|
||||
current_user,
|
||||
roles_required
|
||||
)
|
||||
from flask_security.views import change_password
|
||||
from email_validator import validate_email
|
||||
|
||||
|
@ -17,207 +25,231 @@ import tempfile
|
|||
from datetime import date, datetime, timedelta
|
||||
|
||||
from imaginaerraum_door_admin import db, security
|
||||
from imaginaerraum_door_admin.forms import (
|
||||
AdminCreationForm,
|
||||
ConfirmDeleteForm,
|
||||
TokenForm,
|
||||
)
|
||||
|
||||
|
||||
def validate_valid_thru_date(form, field):
|
||||
if form.limit_validity.data: # only check date format if limited validity of token is set
|
||||
try:
|
||||
if not field.data >= date.today():
|
||||
raise ValueError
|
||||
except ValueError as e:
|
||||
flash("Ungültiges Datum")
|
||||
raise ValidationError
|
||||
return True
|
||||
door_app = Blueprint("door_app", __name__, template_folder="templates")
|
||||
|
||||
|
||||
class TokenForm(FlaskForm):
|
||||
name = StringField('Name', validators=[DataRequired()])
|
||||
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||
organization = StringField('Organization', validators=[DataRequired()])
|
||||
limit_validity = BooleanField('Gültigkeit begrenzen?')
|
||||
valid_thru = DateField('Gültig bis', validators=[validate_valid_thru_date])
|
||||
active = BooleanField('Aktiv?')
|
||||
dsgvo = BooleanField('Einwilligung Nutzungsbedingungen erfragt?', validators=[DataRequired()])
|
||||
|
||||
|
||||
class ConfirmDeleteForm(FlaskForm):
|
||||
name = StringField('Name', validators=[DataRequired(), EqualTo('name_confirm', 'Name stimmt nicht überein')])
|
||||
name_confirm = StringField('Name confirm')
|
||||
|
||||
|
||||
class AdminCreationForm(FlaskForm):
|
||||
name = StringField('Name', validators=[DataRequired()])
|
||||
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||
|
||||
door_app = Blueprint('door_app', __name__,
|
||||
template_folder='templates')
|
||||
|
||||
|
||||
# we override the change_password view from flask security to only allow local users to change their passwords
|
||||
# we override the change_password view from flask security to only allow local
|
||||
# users to change their passwords
|
||||
# LDAP users should use the LDAP self service for changing passwords
|
||||
# this route needs to be defined before the Flask Security setup
|
||||
@door_app.route('/change', methods=['GET', 'POST'])
|
||||
@door_app.route("/change", methods=["GET", "POST"])
|
||||
@auth_required()
|
||||
def change_pw():
|
||||
if current_user.has_role('local'):
|
||||
if current_user.has_role("local"):
|
||||
# local users can change their password
|
||||
return change_password()
|
||||
else:
|
||||
# LDAP users get redirected to the LDAP self service
|
||||
return redirect('https://ldap.imaginaerraum.de/')
|
||||
return redirect("https://ldap.imaginaerraum.de/")
|
||||
|
||||
|
||||
# admin user management
|
||||
@door_app.route('/manage_admins', methods=['GET', 'POST'])
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/manage_admins", methods=["GET", "POST"])
|
||||
@roles_required("super_admin")
|
||||
def manage_admins():
|
||||
form = AdminCreationForm()
|
||||
if request.method == 'GET':
|
||||
if request.method == "GET":
|
||||
users = security.datastore.user_model.query.all()
|
||||
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
|
||||
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
|
||||
} for u in users]
|
||||
return render_template('admins.html', admin_data=admin_data, form=form)
|
||||
admin_data = [
|
||||
{
|
||||
"username": u.username,
|
||||
"email": u.email,
|
||||
"active": u.is_active,
|
||||
"admin": u.has_role("admin"),
|
||||
"super_admin": u.has_role("super_admin"),
|
||||
}
|
||||
for u in users
|
||||
]
|
||||
return render_template("admins.html", admin_data=admin_data, form=form)
|
||||
elif form.validate():
|
||||
if security.datastore.find_user(username=form.name.data) is not None or \
|
||||
security.datastore.find_user(email=form.email.data) is not None:
|
||||
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
|
||||
if (
|
||||
security.datastore.find_user(username=form.name.data) is not None
|
||||
or security.datastore.find_user(email=form.email.data) is not None
|
||||
):
|
||||
flash(
|
||||
"Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse"
|
||||
" existiert bereits!"
|
||||
)
|
||||
else:
|
||||
pw = secrets.token_urlsafe(16)
|
||||
new_user = security.datastore.create_user(username=form.name.data, email=form.email.data,
|
||||
password=hash_password(pw))
|
||||
security.datastore.add_role_to_user(new_user, 'local')
|
||||
new_user = security.datastore.create_user(
|
||||
username=form.name.data,
|
||||
email=form.email.data,
|
||||
password=hash_password(pw),
|
||||
)
|
||||
security.datastore.add_role_to_user(new_user, "local")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
|
||||
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
|
||||
f"Super admin {current_user.username} created new user account "
|
||||
f"for {new_user.username} <{new_user.email}>"
|
||||
)
|
||||
flash(
|
||||
f"Ein Account für den Nutzer {new_user.username} wurde "
|
||||
f"erstellt. Verwende das Passwort {pw} um den Nutzer "
|
||||
f"einzuloggen."
|
||||
)
|
||||
db.session.commit()
|
||||
else:
|
||||
flash(f"Ungültige Eingabe: {form.errors}")
|
||||
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/delete_admins/<username>", methods=["GET", "POST"])
|
||||
@roles_required("super_admin")
|
||||
def delete_admins(username):
|
||||
user = security.datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Ungültiger Nutzer {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('super_admin'):
|
||||
flash('Super-Admins können nicht gelöscht werden!')
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
if user.has_role("super_admin"):
|
||||
flash("Super-Admins können nicht gelöscht werden!")
|
||||
return redirect("/manage_admins")
|
||||
if user.is_active:
|
||||
flash('Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer zuerst deaktivieren.')
|
||||
return redirect('/manage_admins')
|
||||
flash(
|
||||
"Aktive Nutzer können nicht gelöscht werden! Bitte den Benutzer "
|
||||
"zuerst deaktivieren."
|
||||
)
|
||||
return redirect("/manage_admins")
|
||||
|
||||
# set up form for confirming deletion
|
||||
form = ConfirmDeleteForm()
|
||||
form.name_confirm.data = username
|
||||
|
||||
if request.method == 'GET':
|
||||
if request.method == "GET":
|
||||
# return page asking the user to confirm delete
|
||||
return render_template('delete_user.html', username=username, form=form)
|
||||
return render_template("delete_user.html", username=username, form=form)
|
||||
elif form.validate():
|
||||
security.datastore.delete_user(user)
|
||||
flash(f"Benutzer {username} wurde gelöscht.")
|
||||
current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} deleted admin user {username}"
|
||||
)
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
else:
|
||||
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
|
||||
return redirect('/manage_admins')
|
||||
flash(
|
||||
"Der eingegebene Nutzername stimmt nicht überein. Der Benutzer "
|
||||
"wurde nicht gelöscht!"
|
||||
)
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
@door_app.route('/admin_toggle_active/<username>')
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/admin_toggle_active/<username>")
|
||||
@roles_required("super_admin")
|
||||
def admin_toggle_active(username):
|
||||
user = security.datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Ungültiger Nutzer {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('super_admin'):
|
||||
flash('Super-Admins können nicht deaktiviert werden!')
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
if user.has_role("super_admin"):
|
||||
flash("Super-Admins können nicht deaktiviert werden!")
|
||||
return redirect("/manage_admins")
|
||||
security.datastore.toggle_active(user)
|
||||
if user.is_active:
|
||||
current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} activated access for admin "
|
||||
f"user {username}"
|
||||
)
|
||||
else:
|
||||
current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} deactivated access for admin "
|
||||
f"user {username}"
|
||||
)
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
@door_app.route('/promote_admin/<username>')
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/promote_admin/<username>")
|
||||
@roles_required("super_admin")
|
||||
def promote_admin(username):
|
||||
user = security.datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Ungültiger Nutzer {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('admin'):
|
||||
flash(f'Benutzer {username} hat bereits Admin-Rechte!')
|
||||
return redirect('/manage_admins')
|
||||
security.datastore.add_role_to_user(user, 'admin')
|
||||
current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
|
||||
return redirect("/manage_admins")
|
||||
if user.has_role("admin"):
|
||||
flash(f"Benutzer {username} hat bereits Admin-Rechte!")
|
||||
return redirect("/manage_admins")
|
||||
security.datastore.add_role_to_user(user, "admin")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} granted admin privileges to "
|
||||
f"user {username}"
|
||||
)
|
||||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
@door_app.route('/demote_admin/<username>')
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/demote_admin/<username>")
|
||||
@roles_required("super_admin")
|
||||
def demote_admin(username):
|
||||
user = security.datastore.find_user(username=username)
|
||||
if user is None:
|
||||
flash(f"Ungültiger Nutzer {username}")
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('super_admin'):
|
||||
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht '
|
||||
'verändert werden!')
|
||||
return redirect('/manage_admins')
|
||||
if user.has_role('admin'):
|
||||
security.datastore.remove_role_from_user(user, 'admin')
|
||||
current_app.logger.info(f"Super admin {current_user.username} revoked "
|
||||
f"admin privileges of user {username}")
|
||||
return redirect("/manage_admins")
|
||||
if user.has_role("super_admin"):
|
||||
flash(
|
||||
f"Benutzer {username} hat Super-Admin-Rechte und kann nicht "
|
||||
"verändert werden!"
|
||||
)
|
||||
return redirect("/manage_admins")
|
||||
if user.has_role("admin"):
|
||||
security.datastore.remove_role_from_user(user, "admin")
|
||||
current_app.logger.info(
|
||||
f"Super admin {current_user.username} revoked "
|
||||
f"admin privileges of user {username}"
|
||||
)
|
||||
db.session.commit()
|
||||
else:
|
||||
flash(f'Benutzer {username} ist bereits kein Admin!')
|
||||
return redirect('/manage_admins')
|
||||
flash(f"Benutzer {username} ist bereits kein Admin!")
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
@door_app.route('/backup_user_datastore')
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/backup_user_datastore")
|
||||
@roles_required("super_admin")
|
||||
def backup_user_datastore():
|
||||
# get list of defined admin users for backup
|
||||
users = security.datastore.user_model.query.all()
|
||||
user_data = [{'username': u.username, 'email': u.email,
|
||||
'active': u.is_active, 'password_hash': u.password,
|
||||
'roles': [r.name for r in u.roles]}
|
||||
for u in users if not u.has_role('super_admin')]
|
||||
user_data = [
|
||||
{
|
||||
"username": u.username,
|
||||
"email": u.email,
|
||||
"active": u.is_active,
|
||||
"password_hash": u.password,
|
||||
"roles": [r.name for r in u.roles],
|
||||
}
|
||||
for u in users
|
||||
if not u.has_role("super_admin")
|
||||
]
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
file = Path(tmpdir, 'user_data.txt')
|
||||
file = Path(tmpdir, "user_data.txt")
|
||||
file.write_text(json.dumps(user_data))
|
||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
|
||||
@door_app.route('/restore_user_datastore', methods=['POST'])
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/restore_user_datastore", methods=["POST"])
|
||||
@roles_required("super_admin")
|
||||
def restore_user_datastore():
|
||||
# check if the post request has the file part
|
||||
if 'file' not in request.files:
|
||||
flash('Keine Datei ausgewählt!')
|
||||
if "file" not in request.files:
|
||||
flash("Keine Datei ausgewählt!")
|
||||
return redirect(request.url)
|
||||
file = request.files['file']
|
||||
file = request.files["file"]
|
||||
# if user does not select file, browser also
|
||||
# submit an empty part without filename
|
||||
if file.filename == '':
|
||||
flash('Keine Datei ausgewählt!')
|
||||
return redirect('/manage_admins')
|
||||
if file.filename == "":
|
||||
flash("Keine Datei ausgewählt!")
|
||||
return redirect("/manage_admins")
|
||||
filename = secure_filename(file.filename)
|
||||
if file and filename.endswith('.txt'):
|
||||
if file and filename.endswith(".txt"):
|
||||
data = file.stream.read()
|
||||
try:
|
||||
# check validity of user data
|
||||
|
@ -226,63 +258,97 @@ def restore_user_datastore():
|
|||
valid &= all(type(d) == dict for d in user_data)
|
||||
if valid:
|
||||
for d in user_data:
|
||||
entry_valid = set(d.keys()) == { 'active', 'email', 'password_hash', 'username', 'roles'}
|
||||
entry_valid &= all(len(d[key]) > 0 for key in ['email', 'password_hash', 'username'])
|
||||
entry_valid &= type(d['active']) == bool
|
||||
entry_valid &= type(d['roles']) == list
|
||||
validate_email(d['email'])
|
||||
entry_valid = set(d.keys()) == {
|
||||
"active",
|
||||
"email",
|
||||
"password_hash",
|
||||
"username",
|
||||
"roles",
|
||||
}
|
||||
entry_valid &= all(
|
||||
len(d[key]) > 0
|
||||
for key in ["email", "password_hash", "username"]
|
||||
)
|
||||
entry_valid &= type(d["active"]) == bool
|
||||
entry_valid &= type(d["roles"]) == list
|
||||
validate_email(d["email"])
|
||||
if entry_valid:
|
||||
existing_user = security.datastore.find_user(username=d['username'], email=d['email'])
|
||||
existing_user = security.datastore.find_user(
|
||||
username=d["username"], email=d["email"]
|
||||
)
|
||||
if existing_user is None:
|
||||
security.datastore.create_user(username=d['username'], email=d['email'],
|
||||
password=d['password_hash'], active=d['active'],
|
||||
roles=d['roles'])
|
||||
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
|
||||
security.datastore.create_user(
|
||||
username=d["username"],
|
||||
email=d["email"],
|
||||
password=d["password_hash"],
|
||||
active=d["active"],
|
||||
roles=d["roles"],
|
||||
)
|
||||
flash(
|
||||
f"Account für Benutzer '{d['username']} wurde "
|
||||
f"wiederhergestellt."
|
||||
)
|
||||
else:
|
||||
flash(f"Benutzer '{d['username']} existiert bereits. Eintrag wird übersprungen.")
|
||||
flash(
|
||||
f"Benutzer '{d['username']} existiert bereits."
|
||||
f" Eintrag wird übersprungen."
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Ungültige Daten für User Entry {d}")
|
||||
else:
|
||||
raise ValueError("Admin User Datei hat ungültiges Format.")
|
||||
except Exception as e:
|
||||
flash(f"Die Datei konnte nicht gelesen werden. Exception: {e}")
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
flash("Benutzer aus Datei gelesen.")
|
||||
db.session.commit()
|
||||
else:
|
||||
flash("Ungültige Dateiendung")
|
||||
return redirect('/manage_admins')
|
||||
return redirect("/manage_admins")
|
||||
|
||||
|
||||
# main page
|
||||
@door_app.route('/')
|
||||
@door_app.route("/")
|
||||
def door_lock():
|
||||
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
|
||||
return render_template(
|
||||
"index.html",
|
||||
door_state=current_app.door.state,
|
||||
encoder_position=current_app.door.encoder_position,
|
||||
)
|
||||
|
||||
|
||||
# token overview
|
||||
@door_app.route('/tokens')
|
||||
@roles_required('admin')
|
||||
@door_app.route("/tokens")
|
||||
@roles_required("admin")
|
||||
def list_tokens():
|
||||
tokens = current_app.door.get_tokens()
|
||||
assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']}
|
||||
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
|
||||
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
|
||||
assigned_tokens = {t: data for t, data in tokens.items()
|
||||
if not data["inactive"]}
|
||||
inactive_tokens = {t: data for t, data in tokens.items()
|
||||
if data["inactive"]}
|
||||
return render_template(
|
||||
"tokens.html",
|
||||
assigned_tokens=assigned_tokens,
|
||||
inactive_tokens=inactive_tokens
|
||||
)
|
||||
|
||||
|
||||
@door_app.route('/token-log')
|
||||
@roles_required('super_admin')
|
||||
@door_app.route("/token-log")
|
||||
@roles_required("super_admin")
|
||||
def token_log():
|
||||
log = []
|
||||
try:
|
||||
with open(current_app.config['NFC_LOG']) as f:
|
||||
with open(current_app.config["NFC_LOG"]) as f:
|
||||
log += f.readlines()
|
||||
log.reverse()
|
||||
log = [l.split(' - ') for l in log]
|
||||
return render_template('token_log.html', log=log)
|
||||
log = [l.split(" - ") for l in log]
|
||||
return render_template("token_log.html", log=log)
|
||||
except Exception as e:
|
||||
flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}")
|
||||
return redirect('/')
|
||||
flash(
|
||||
f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} "
|
||||
f"konnte nicht gelesen werden. Exception: {e}"
|
||||
)
|
||||
return redirect("/")
|
||||
|
||||
|
||||
def store_token(token_data):
|
||||
|
@ -291,69 +357,82 @@ def store_token(token_data):
|
|||
This will use the token id and the associated data and create/modify a
|
||||
token and store the new token file to disk.
|
||||
"""
|
||||
token = token_data['token']
|
||||
token = token_data["token"]
|
||||
tokens = current_app.door.get_tokens()
|
||||
tokens[token] = {'name': token_data['name'],
|
||||
'email': token_data['email'],
|
||||
'valid_thru': token_data['valid_thru'],
|
||||
'inactive': token_data['inactive'],
|
||||
'organization': token_data['organization']}
|
||||
tokens[token] = {
|
||||
"name": token_data["name"],
|
||||
"email": token_data["email"],
|
||||
"valid_thru": token_data["valid_thru"],
|
||||
"inactive": token_data["inactive"],
|
||||
"organization": token_data["organization"],
|
||||
}
|
||||
try:
|
||||
current_app.door.store_tokens(tokens)
|
||||
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||
current_app.logger.info(
|
||||
f"Token {token} stored in database by admin user "
|
||||
f"{current_user.username}"
|
||||
)
|
||||
except Exception as e:
|
||||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
|
||||
|
||||
# routes for registering, editing and deleting tokens
|
||||
@door_app.route('/register-token', methods=['GET', 'POST'])
|
||||
@roles_required('admin')
|
||||
@door_app.route("/register-token", methods=["GET", "POST"])
|
||||
@roles_required("admin")
|
||||
def register():
|
||||
"""Register new token for locking and unlocking the door.
|
||||
|
||||
This route displays the most recently scanned invalid token as reported in the logfile and provides a form for
|
||||
entering user info (name, email, valid thru date (optional)) for the new token.
|
||||
This route displays the most recently scanned invalid token as reported in
|
||||
the logfile and provides a form for entering user info (name, email, valid
|
||||
thru date (optional)) for the new token.
|
||||
|
||||
If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route
|
||||
will be called which adds the new token to the database.
|
||||
If the route is called via POST the provided form data is checked and if
|
||||
the check succeeds the /store-token route will be called which adds the new
|
||||
token to the database.
|
||||
"""
|
||||
token = current_app.door.get_most_recent_token()
|
||||
|
||||
recent_token = {}
|
||||
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
||||
dt = datetime.utcnow() - token['timestamp']
|
||||
if {"token", "timestamp"}.issubset(set(token.keys())):
|
||||
dt = datetime.utcnow() - token["timestamp"]
|
||||
if dt < timedelta(minutes=10):
|
||||
recent_token = token
|
||||
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
|
||||
recent_token["timedelta_minutes"] = int(dt.total_seconds() / 60.0)
|
||||
|
||||
form = TokenForm()
|
||||
if request.method == 'GET':
|
||||
# set default valid thru date to today to make sure form validity check passes
|
||||
if request.method == "GET":
|
||||
# set default valid thru date to today to make sure form validity check
|
||||
# passes
|
||||
# (will not be used if limited validity is disabled)
|
||||
form.valid_thru.data = date.today()
|
||||
elif request.method == 'POST' and form.validate():
|
||||
elif request.method == "POST" and form.validate():
|
||||
token_data = {
|
||||
'token': current_app.door.get_most_recent_token()['token'],
|
||||
'name': form.name.data, 'email': form.email.data,
|
||||
'organization': form.organization.data,
|
||||
'inactive': not form.active.data,
|
||||
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||
"token": current_app.door.get_most_recent_token()["token"],
|
||||
"name": form.name.data,
|
||||
"email": form.email.data,
|
||||
"organization": form.organization.data,
|
||||
"inactive": not form.active.data,
|
||||
"valid_thru": form.valid_thru.data.isoformat()
|
||||
if form.limit_validity.data
|
||||
else "",
|
||||
}
|
||||
store_token(token_data)
|
||||
return redirect('/tokens')
|
||||
return redirect("/tokens")
|
||||
else:
|
||||
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
|
||||
return render_template('register.html', token=recent_token, form=form)
|
||||
flash(f"Token konnte nicht registiert werden. Fehler: {form.errors}")
|
||||
return render_template("register.html", token=recent_token, form=form)
|
||||
|
||||
|
||||
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||
@roles_required('admin')
|
||||
@door_app.route("/edit-token/<token>", methods=["GET", "POST"])
|
||||
@roles_required("admin")
|
||||
def edit_token(token):
|
||||
"""Edit data in the token file (name, email, valid_thru date, active/inactive).
|
||||
"""Edit data in the token file (name, email, valid_thru date, active).
|
||||
|
||||
If the route is accessed via GET it will provide a form for editing the currently stored data for the user.
|
||||
If the route is accessed via POST it will check if the form data is good and then store the modified user data in
|
||||
the database (by redirecting to the /store-token route)
|
||||
If the route is accessed via GET it will provide a form for editing the
|
||||
currently stored data for the user.
|
||||
If the route is accessed via POST it will check if the form data is good
|
||||
and then store the modified user data in the database (by redirecting to
|
||||
the /store-token route)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
@ -361,51 +440,60 @@ def edit_token(token):
|
|||
The token for which data should be edited.
|
||||
"""
|
||||
form = TokenForm(request.form)
|
||||
form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed
|
||||
# to it before
|
||||
if request.method == 'GET':
|
||||
form.dsgvo.validators = (
|
||||
[]
|
||||
)
|
||||
# we skip the validation of the DSGVO checkbox here because we assume
|
||||
# the user agreed to it before
|
||||
if request.method == "GET":
|
||||
tokens = current_app.door.get_tokens()
|
||||
if token in tokens:
|
||||
# set default for form according to values from the token file
|
||||
et = tokens[token]
|
||||
form.active.data = not et['inactive']
|
||||
form.name.data = et['name'] if et['name'] else ''
|
||||
form.email.data = et['email'] if et['email'] else ''
|
||||
form.organization.data = et['organization'] if et['organization'] else ''
|
||||
form.active.data = not et["inactive"]
|
||||
form.name.data = et["name"] if et["name"] else ""
|
||||
form.email.data = et["email"] if et["email"] else ""
|
||||
form.organization.data = et["organization"] \
|
||||
if et["organization"] else ""
|
||||
|
||||
# for the valid thru date we use today's date in case there is not valid date in the database
|
||||
# for the valid thru date we use today's date in case there is not
|
||||
# valid date in the database
|
||||
try:
|
||||
form.valid_thru.data = date.fromisoformat(et['valid_thru'])
|
||||
form.valid_thru.data = date.fromisoformat(et["valid_thru"])
|
||||
form.limit_validity.data = True
|
||||
except Exception:
|
||||
form.valid_thru.data = date.today()
|
||||
|
||||
return render_template('edit.html', token=token, form=form)
|
||||
return render_template("edit.html", token=token, form=form)
|
||||
else:
|
||||
# flash an error message if the route is accessed with an invalid token
|
||||
flash(f'Ungültiger Token {token}!')
|
||||
return redirect('/tokens')
|
||||
elif request.method == 'POST':
|
||||
# flash an error message if the route is accessed with an invalid
|
||||
# token
|
||||
flash(f"Ungültiger Token {token}!")
|
||||
return redirect("/tokens")
|
||||
elif request.method == "POST":
|
||||
if form.validate():
|
||||
# store data in token_data cookie
|
||||
token_data = {'token': token,
|
||||
'name': form.name.data,
|
||||
'organization': form.organization.data,
|
||||
'email': form.email.data,
|
||||
'inactive': not form.active.data,
|
||||
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||
token_data = {
|
||||
"token": token,
|
||||
"name": form.name.data,
|
||||
"organization": form.organization.data,
|
||||
"email": form.email.data,
|
||||
"inactive": not form.active.data,
|
||||
"valid_thru": form.valid_thru.data.isoformat()
|
||||
if form.limit_validity.data
|
||||
else "",
|
||||
}
|
||||
store_token(token_data)
|
||||
return redirect('/tokens')
|
||||
return redirect("/tokens")
|
||||
else:
|
||||
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
|
||||
return render_template('edit.html', token=token, form=form)
|
||||
flash(f"Token konnte nicht editiert werden. Fehler: {form.errors}")
|
||||
return render_template("edit.html", token=token, form=form)
|
||||
|
||||
|
||||
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||
@roles_required('admin')
|
||||
@door_app.route("/delete-token/<token>", methods=["GET", "POST"])
|
||||
@roles_required("admin")
|
||||
def delete_token(token):
|
||||
"""Delete the given token from the token file and store the new token file to disk
|
||||
"""Delete the given token from the token file and store the new token file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
@ -415,29 +503,31 @@ def delete_token(token):
|
|||
tokens = current_app.door.get_tokens()
|
||||
|
||||
if token not in tokens:
|
||||
flash(f'Ungültiger Token {token} für Löschung.')
|
||||
return redirect('/tokens')
|
||||
flash(f"Ungültiger Token {token} für Löschung.")
|
||||
return redirect("/tokens")
|
||||
|
||||
token_to_delete = tokens[token]
|
||||
|
||||
# set up form for confirming deletion
|
||||
form = ConfirmDeleteForm()
|
||||
form.name_confirm.data = token_to_delete['name']
|
||||
form.name_confirm.data = token_to_delete["name"]
|
||||
|
||||
if request.method == 'GET':
|
||||
if request.method == "GET":
|
||||
# return page asking the user to confirm delete
|
||||
return render_template('delete.html', token=token_to_delete, form=form)
|
||||
return render_template("delete.html", token=token_to_delete, form=form)
|
||||
elif form.validate():
|
||||
# form validation successful -> can delete the token
|
||||
tokens.pop(token)
|
||||
try:
|
||||
current_app.door.store_tokens(tokens)
|
||||
current_app.logger.info(f"Token {token} was deleted from database "
|
||||
f"by admin user {current_user.username}")
|
||||
current_app.logger.info(
|
||||
f"Token {token} was deleted from database "
|
||||
f"by admin user {current_user.username}"
|
||||
)
|
||||
except Exception as e:
|
||||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
flash(f"Token {token} wurde gelöscht!")
|
||||
return redirect('/tokens')
|
||||
return redirect("/tokens")
|
||||
else:
|
||||
# form validation failed -> return to token overview and flash message
|
||||
flash(
|
||||
|
@ -445,13 +535,15 @@ def delete_token(token):
|
|||
f"Der Token {token} von {token_to_delete['name']} wurde nicht "
|
||||
"gelöscht."
|
||||
)
|
||||
return redirect('/tokens')
|
||||
return redirect("/tokens")
|
||||
|
||||
|
||||
@door_app.route('/deactivate-token/<token>')
|
||||
@roles_required('admin')
|
||||
@door_app.route("/deactivate-token/<token>")
|
||||
@roles_required("admin")
|
||||
def deactivate_token(token):
|
||||
"""Deactivate access for the given token. This updates the token file on disk.
|
||||
"""Deactivate access for the given token.
|
||||
|
||||
This updates the token file on disk.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
@ -461,50 +553,54 @@ def deactivate_token(token):
|
|||
tokens = current_app.door.get_tokens()
|
||||
|
||||
if token not in tokens:
|
||||
flash(f'Ungültiger Token {token} für Deaktivierung.')
|
||||
return redirect('/tokens')
|
||||
flash(f"Ungültiger Token {token} für Deaktivierung.")
|
||||
return redirect("/tokens")
|
||||
|
||||
tokens[token]['inactive'] = True
|
||||
tokens[token]["inactive"] = True
|
||||
try:
|
||||
current_app.door.store_tokens(tokens)
|
||||
current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}")
|
||||
current_app.logger.info(
|
||||
f"Token {token} deactivated by admin user {current_user.username}"
|
||||
)
|
||||
except Exception as e:
|
||||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
return redirect('/tokens')
|
||||
return redirect("/tokens")
|
||||
|
||||
|
||||
@door_app.route('/backup_tokens')
|
||||
@roles_required('admin')
|
||||
@door_app.route("/backup_tokens")
|
||||
@roles_required("admin")
|
||||
def backup_tokens():
|
||||
# get list of defined admin users for backup
|
||||
tokens = current_app.door.get_tokens()
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
file = Path(tmpdir, 'token_data.txt')
|
||||
file = Path(tmpdir, "token_data.txt")
|
||||
file.write_text(json.dumps(tokens))
|
||||
return send_file(file, as_attachment=True, cache_timeout=-1)
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
|
||||
@door_app.route('/open')
|
||||
@door_app.route("/open")
|
||||
@auth_required()
|
||||
def open_door():
|
||||
try:
|
||||
current_app.door.open_door(user=current_user.username)
|
||||
current_app.logger.info(f"Door opened by admin user {current_user.username}")
|
||||
current_app.logger.info(f"Door opened by admin user "
|
||||
f"{current_user.username}")
|
||||
except Exception as e:
|
||||
flash(f'Could not open door. Exception: {e}')
|
||||
return redirect('/')
|
||||
flash(f"Could not open door. Exception: {e}")
|
||||
return redirect("/")
|
||||
|
||||
|
||||
# routes for opening and closing the door via the web interface
|
||||
@door_app.route('/close')
|
||||
@door_app.route("/close")
|
||||
@auth_required()
|
||||
def close_door():
|
||||
try:
|
||||
current_app.door.close_door(user=current_user.username)
|
||||
current_app.logger.info(f"Door closed by admin user {current_user.username}")
|
||||
current_app.logger.info(f"Door closed by admin user "
|
||||
f"{current_user.username}")
|
||||
except Exception as e:
|
||||
flash(f'Could not close door. Exception: {e}')
|
||||
return redirect('/')
|
||||
flash(f"Could not close door. Exception: {e}")
|
||||
return redirect("/")
|
||||
|
|
Loading…
Reference in New Issue
Block a user