From a104a3d00fa25c2041b2c73205c1acd717cf0c93 Mon Sep 17 00:00:00 2001 From: Simon Pirkelmann Date: Thu, 27 Jan 2022 23:46:45 +0100 Subject: [PATCH] attach door object to flask application and use application's logger --- imaginaerraum_door_admin/auth.py | 7 +++-- imaginaerraum_door_admin/webapp.py | 49 +++++++++++++++--------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/imaginaerraum_door_admin/auth.py b/imaginaerraum_door_admin/auth.py index aa75bfd..e4614cb 100644 --- a/imaginaerraum_door_admin/auth.py +++ b/imaginaerraum_door_admin/auth.py @@ -1,4 +1,5 @@ from wtforms.fields import StringField, BooleanField +from flask import current_app from flask_security import hash_password from flask_security.forms import LoginForm, Required, PasswordField from flask_security.utils import find_user @@ -21,7 +22,7 @@ class ExtendedLoginForm(LoginForm): authorized = super(ExtendedLoginForm, self).validate() if authorized: - logger.info(f"User with credentials '{self.email.data}' authorized through local database") + current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database") else: # run LDAP authorization # if the authorization succeeds we also get the new_user_data dict which contains information about @@ -29,7 +30,7 @@ class ExtendedLoginForm(LoginForm): authorized, new_user_data = validate_ldap(user.username, self.password.data) if authorized: - logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") + current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") # update permissions and password/email to stay up to date for login with no network connection user.email = new_user_data['email'] user.password = new_user_data['password'] @@ -51,7 +52,7 @@ class ExtendedLoginForm(LoginForm): self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], password=new_user_data['password'], roles=new_user_data['roles']) user_datastore.commit() - logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" + current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" " successful LDAP authorization") # if any of the authorization methods is successful we authorize the user diff --git a/imaginaerraum_door_admin/webapp.py b/imaginaerraum_door_admin/webapp.py index 96df82d..de610e6 100644 --- a/imaginaerraum_door_admin/webapp.py +++ b/imaginaerraum_door_admin/webapp.py @@ -85,7 +85,7 @@ def manage_admins(): new_user = user_datastore.create_user(username=form.name.data, email=form.email.data, password=hash_password(pw)) user_datastore.add_role_to_user(new_user, 'local') - logger.info( + current_app.logger.info( f"Super admin {current_user.username} created new user account for {new_user.username} <{new_user.email}>") flash(f"Ein Account für den Nutzer {new_user.username} wurde erstellt. Verwende das Passwort {pw} um den Nutzer einzuloggen.") db.session.commit() @@ -115,7 +115,7 @@ def delete_admins(username): elif form.validate(): user_datastore.delete_user(user) flash(f"Benutzer {username} wurde gelöscht.") - logger.info(f"Super admin {current_user.username} deleted admin user {username}") + current_app.logger.info(f"Super admin {current_user.username} deleted admin user {username}") db.session.commit() return redirect('/manage_admins') else: @@ -134,9 +134,9 @@ def admin_toggle_active(username): return redirect('/manage_admins') user_datastore.toggle_active(user) if user.is_active: - logger.info(f"Super admin {current_user.username} activated access for admin user {username}") + current_app.logger.info(f"Super admin {current_user.username} activated access for admin user {username}") else: - logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}") + current_app.logger.info(f"Super admin {current_user.username} deactivated access for admin user {username}") db.session.commit() return redirect('/manage_admins') @@ -151,7 +151,7 @@ def promote_admin(username): flash(f'Benutzer {username} hat bereits Admin-Rechte!') return redirect('/manage_admins') user_datastore.add_role_to_user(user, 'admin') - logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}") + current_app.logger.info(f"Super admin {current_user.username} granted admin privileges to user {username}") db.session.commit() return redirect('/manage_admins') @@ -167,7 +167,7 @@ def demote_admin(username): return redirect('/manage_admins') if user.has_role('admin'): user_datastore.remove_role_from_user(user, 'admin') - logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}") + current_app.logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}") db.session.commit() else: flash(f'Benutzer {username} ist bereits kein Admin!') @@ -248,7 +248,7 @@ def door_lock(): @door_app.route('/tokens') @roles_required('admin') def list_tokens(): - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() assigned_tokens = {t: data for t, data in tokens.items() if not data['inactive']} inactive_tokens = {t: data for t, data in tokens.items() if data['inactive']} return render_template('tokens.html', assigned_tokens=assigned_tokens, inactive_tokens=inactive_tokens) @@ -279,7 +279,7 @@ def register(): If the route is called via POST the provided form data is checked and if the check succeeds the /store-token route will be called which adds the new token to the database. """ - token = door.get_most_recent_token() + token = current_app.door.get_most_recent_token() recent_token = {} if {'token', 'timestamp'}.issubset(set(token.keys())): @@ -297,7 +297,7 @@ def register(): return render_template('register.html', token=recent_token, form=form) elif request.method == 'POST' and form.validate(): # store data in session cookie - session['token'] = door.get_most_recent_token()['token'] + session['token'] = current_app.door.get_most_recent_token()['token'] session['name'] = form.name.data session['email'] = form.email.data session['organization'] = form.organization.data @@ -328,7 +328,7 @@ def edit_token(token): form.dsgvo.validators = [] # we skip the validation of the DSGVO checkbox here because we assume the user agreed # to it before if request.method == 'GET': - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() if token in tokens: # set default for form according to values from the token file et = tokens[token] @@ -374,15 +374,15 @@ def store_token(): edit_token()) and create/modify a token and store the new token file to disk. """ token = session['token'] - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() tokens[token] = {'name': session['name'], 'email': session['email'], 'valid_thru': session['valid_thru'], 'inactive': session['inactive'], 'organization': session['organization']} try: - door.store_tokens(tokens) - logger.info(f"Token {token} stored in database by admin user {current_user.username}") + current_app.door.store_tokens(tokens) + current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}") except Exception as e: flash(f"Error during store_tokens. Exception: {e}") return redirect('/tokens') @@ -397,7 +397,7 @@ def delete_token(token): token : str The token to delete from the database. """ - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() if token in tokens: token_to_delete = tokens[token] @@ -413,8 +413,8 @@ def delete_token(token): # form validation successful -> can delete the token tokens.pop(token) try: - door.store_tokens(tokens) - logger.info(f"Token {token} was deleted from database by admin user {current_user.username}") + current_app.door.store_tokens(tokens) + current_app.logger.info(f"Token {token} was deleted from database by admin user {current_user.username}") except Exception as e: flash(f"Error during store_tokens. Exception: {e}") flash(f"Token {token} wurde gelöscht!") @@ -438,12 +438,12 @@ def deactivate_token(token): token : str The token to deactivate. """ - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() if token in tokens: tokens[token]['inactive'] = True try: - door.store_tokens(tokens) - logger.info(f"Token {token} deactivated by admin user {current_user.username}") + current_app.door.store_tokens(tokens) + current_app.logger.info(f"Token {token} deactivated by admin user {current_user.username}") except Exception as e: flash(f"Error during store_tokens. Exception: {e}") return redirect('/tokens') @@ -452,7 +452,7 @@ def deactivate_token(token): @roles_required('admin') def backup_tokens(): # get list of defined admin users for backup - tokens = door.get_tokens() + tokens = current_app.door.get_tokens() try: with tempfile.TemporaryDirectory() as tmpdir: file = Path(tmpdir, 'token_data.txt') @@ -464,10 +464,9 @@ def backup_tokens(): @door_app.route('/open') @auth_required() def open_door(): - try: - door.open_door(user=current_user.username) - logger.info(f"Door opened by admin user {current_user.username}") + current_app.door.open_door(user=current_user.username) + current_app.logger.info(f"Door opened by admin user {current_user.username}") except Exception as e: flash(f'Could not open door. Exception: {e}') return redirect('/') @@ -477,8 +476,8 @@ def open_door(): @auth_required() def close_door(): try: - door.close_door(user=current_user.username) - logger.info(f"Door closed by admin user {current_user.username}") + current_app.door.close_door(user=current_user.username) + current_app.logger.info(f"Door closed by admin user {current_user.username}") except Exception as e: flash(f'Could not close door. Exception: {e}') return redirect('/')