make database and security objects global so we can access them in routes

This commit is contained in:
Simon Pirkelmann 2022-01-29 23:48:58 +01:00
parent 97957e389c
commit 2879a69445
2 changed files with 46 additions and 26 deletions

View File

@ -9,11 +9,12 @@ import ldap3
from pathlib import Path
from .webapp import door_app
#from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exists already)
@ -143,11 +144,8 @@ def create_app():
# Mail Config
#mail = Mail(app)
from . import webapp
#app.register_blueprint
# Create database connection object
db = SQLAlchemy(app)
db.init_app(app)
# Define models
fsqla.FsModels.set_db_info(db)
@ -158,6 +156,7 @@ def create_app():
class User(db.Model, fsqla.FsUserMixin):
pass
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])

View File

@ -16,6 +16,9 @@ import tempfile
from datetime import date, datetime, timedelta
from imaginaerraum_door_admin import db, security
def validate_valid_thru_date(form, field):
if form.limit_validity.data: # only check date format if limited validity of token is set
try:
@ -50,7 +53,6 @@ door_app = Blueprint('door_app', __name__,
template_folder='templates')
# we override the change_password view from flask security to only allow local users to change their passwords
# LDAP users should use the LDAP self service for changing passwords
# this route needs to be defined before the Flask Security setup
@ -64,37 +66,41 @@ def change_pw():
# LDAP users get redirected to the LDAP self service
return redirect('https://ldap.imaginaerraum.de/')
# admin user management
@door_app.route('/manage_admins', methods=['GET', 'POST'])
@roles_required('super_admin')
def manage_admins():
form = AdminCreationForm()
if request.method == 'GET':
users = user_datastore.user_model.query.all()
users = security.datastore.user_model.query.all()
admin_data = [{'username': u.username, 'email': u.email, 'active': u.is_active,
'admin': u.has_role('admin'), 'super_admin': u.has_role('super_admin'),
} for u in users]
return render_template('admins.html', admin_data=admin_data, form=form)
elif form.validate():
if user_datastore.find_user(username=form.name.data) is not None or \
user_datastore.find_user(email=form.email.data) is not None:
if security.datastore.find_user(username=form.name.data) is not None or \
security.datastore.find_user(email=form.email.data) is not None:
flash("Ein Benutzer mit diesem Nutzernamen oder dieser E-Mail-Adresse existiert bereits!")
return redirect('/manage_admins')
else:
pw = secrets.token_urlsafe(16)
new_user = user_datastore.create_user(username=form.name.data, email=form.email.data,
new_user = security.datastore.create_user(username=form.name.data, email=form.email.data,
password=hash_password(pw))
user_datastore.add_role_to_user(new_user, 'local')
security.datastore.add_role_to_user(new_user, 'local')
current_app.logger.info(
f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>")
flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.")
db.session.commit()
else:
flash(f"Ungültige Eingabe: {form.errors}")
return redirect('/manage_admins')
@door_app.route('/delete_admins/<username>', methods=['GET', 'POST'])
@roles_required('super_admin')
def delete_admins(username):
user = user_datastore.find_user(username=username)
user = security.datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins')
@ -113,7 +119,7 @@ def delete_admins(username):
# return page asking the user to confirm delete
return render_template('delete_user.html', username=username, form=form)
elif form.validate():
user_datastore.delete_user(user)
security.datastore.delete_user(user)
flash(f"Benutzer {username} wurde gelöscht.")
current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}")
db.session.commit()
@ -122,17 +128,18 @@ def delete_admins(username):
flash("Der eingegebene Nutzername stimmt nicht überein. Der Benutzer wurde nicht gelöscht!")
return redirect('/manage_admins')
@door_app.route('/admin_toggle_active/<username>')
@roles_required('super_admin')
def admin_toggle_active(username):
user = user_datastore.find_user(username=username)
user = security.datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins')
if user.has_role('super_admin'):
flash('Super-Admins können nicht deaktiviert werden!')
return redirect('/manage_admins')
user_datastore.toggle_active(user)
security.datastore.toggle_active(user)
if user.is_active:
current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}")
else:
@ -140,17 +147,18 @@ def admin_toggle_active(username):
db.session.commit()
return redirect('/manage_admins')
@door_app.route('/promote_admin/<username>')
@roles_required('super_admin')
def promote_admin(username):
user = user_datastore.find_user(username=username)
user = security.datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins')
if user.has_role('admin'):
flash(f'Benutzer {username} hat bereits Admin-Rechte!')
return redirect('/manage_admins')
user_datastore.add_role_to_user(user, 'admin')
security.datastore.add_role_to_user(user, 'admin')
current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}")
db.session.commit()
return redirect('/manage_admins')
@ -158,7 +166,7 @@ def promote_admin(username):
@door_app.route('/demote_admin/<username>')
@roles_required('super_admin')
def demote_admin(username):
user = user_datastore.find_user(username=username)
user = security.datastore.find_user(username=username)
if user is None:
flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins')
@ -166,18 +174,19 @@ def demote_admin(username):
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!')
return redirect('/manage_admins')
if user.has_role('admin'):
user_datastore.remove_role_from_user(user, 'admin')
security.datastore.remove_role_from_user(user, 'admin')
current_app.logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}")
db.session.commit()
else:
flash(f'Benutzer {username} ist bereits kein Admin!')
return redirect('/manage_admins')
@door_app.route('/backup_user_datastore')
@roles_required('super_admin')
def backup_user_datastore():
# get list of defined admin users for backup
users = user_datastore.user_model.query.all()
users = security.datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password,
'roles': [r.name for r in u.roles]}
for u in users if not u.has_role('super_admin')]
@ -189,6 +198,7 @@ def backup_user_datastore():
except Exception as e:
return str(e)
@door_app.route('/restore_user_datastore', methods=['POST'])
@roles_required('super_admin')
def restore_user_datastore():
@ -218,9 +228,9 @@ def restore_user_datastore():
entry_valid &= type(d['roles']) == list
validate_email(d['email'])
if entry_valid:
existing_user = user_datastore.find_user(username=d['username'], email=d['email'])
existing_user = security.datastore.find_user(username=d['username'], email=d['email'])
if existing_user is None:
user_datastore.create_user(username=d['username'], email=d['email'],
security.datastore.create_user(username=d['username'], email=d['email'],
password=d['password_hash'], active=d['active'],
roles=d['roles'])
flash(f"Account für Benutzer '{d['username']} wurde wiederhergestellt.")
@ -238,12 +248,14 @@ def restore_user_datastore():
else:
flash("Ungültige Dateiendung")
return redirect('/manage_admins')
# main page
@door_app.route('/')
def door_lock():
with current_app.app_context():
return render_template('index.html', door_state=current_app.door.state, encoder_position=current_app.door.encoder_position)
# token overview
@door_app.route('/tokens')
@roles_required('admin')
@ -253,6 +265,7 @@ def list_tokens():
inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']}
return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens)
@door_app.route('/token-log')
@roles_required('super_admin')
def token_log():
@ -267,6 +280,7 @@ def token_log():
flash(f"NFC logfile {Path(current_app.config['NFC_LOG']).absolute()} konnte nicht gelesen werden. Exception: {e}")
return redirect('/')
# routes for registering, editing and deleting tokens
@door_app.route('/register-token', methods=['GET', 'POST'])
@roles_required('admin')
@ -310,6 +324,7 @@ def register():
else:
return render_template('register.html', token=recent_token, form=form)
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def edit_token(token):
@ -365,6 +380,7 @@ def edit_token(token):
else:
return render_template('edit.html', token=token, form=form)
@door_app.route('/store-token')
@roles_required('admin')
def store_token():
@ -387,6 +403,7 @@ def store_token():
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def delete_token(token):
@ -428,6 +445,7 @@ def delete_token(token):
flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens')
@door_app.route('/deactivate-token/<token>')
@roles_required('admin')
def deactivate_token(token):
@ -448,6 +466,7 @@ def deactivate_token(token):
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@door_app.route('/backup_tokens')
@roles_required('admin')
def backup_tokens():
@ -461,6 +480,7 @@ def backup_tokens():
except Exception as e:
return str(e)
@door_app.route('/open')
@auth_required()
def open_door():
@ -471,6 +491,7 @@ def open_door():
flash(f'Could not open door. Exception: {e}')
return redirect('/')
# routes for opening and closing the door via the web interface
@door_app.route('/close')
@auth_required()