diff --git a/imaginaerraum_door_admin/webapp.py b/imaginaerraum_door_admin/webapp.py index 3d633dc..28ab475 100644 --- a/imaginaerraum_door_admin/webapp.py +++ b/imaginaerraum_door_admin/webapp.py @@ -10,6 +10,7 @@ from flask_security import Security, SQLAlchemyUserDatastore, auth_required, has from flask_security.models import fsqla_v2 as fsqla from flask_security.forms import LoginForm, Required, PasswordField from flask_security.utils import find_user +from flask_security.views import change_password from flask_mail import Mail from email_validator import validate_email @@ -162,7 +163,7 @@ def create_application(config): pass class User(db.Model, fsqla.FsUserMixin): - username = db.Column(db.String(255)) + pass # LDAP ldap_server = ldap3.Server(config.ldap_url) @@ -228,45 +229,61 @@ def create_application(config): # search for user in the current database user = find_user(self.email.data) if user is not None: - # if a user is found we use that username for LDAP authentication - username = user.username + # if a user is found we check if it is associated with LDAP or with the local database + if user.has_role('local'): + # try authorizing locally using Flask security user datastore + authorized = super(ExtendedLoginForm, self).validate() + + if authorized: + logger.info(f"User with credentials '{self.email.data}' authorized through local database") + else: + # run LDAP authorization + # if the authorization succeeds we also get the new_user_data dict which contains information about + # the user's permissions etc. + authorized, new_user_data = validate_ldap(user.username, self.password.data) + + if authorized: + logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") + # update permissions and password/email to stay up to date for login with no network connection + user.email = new_user_data['email'] + user.password = new_user_data['password'] + for role in new_user_data['roles']: + user_datastore.add_role_to_user(user, role) + user_datastore.commit() + self.user = user + else: + self.password.errors = ['Invalid password'] else: # this means there is no user with that email in the database # we assume that the username was entered instead of an email and use that for authentication with LDAP username = self.email.data - - # run LDAP authorization - # if the authorization succeeds we also get the new_user_data dict which contains information about the - # user's permissions etc. - authorized, new_user_data = validate_ldap(username, self.password.data) - - if authorized: - if user is None: - # if there was no user in the database before we create a new user - user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], - password=new_user_data['password'], roles=new_user_data['roles']) - logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" - " successful LDAP authorization") - else: # for existing users we update permissions and password/email to stay up to date - user.email = new_user_data['email'] - user.password = new_user_data['password'] - for role in new_user_data['roles']: - user_datastore.add_role_to_user(user, role) - user_datastore.commit() - - self.user = find_user(self.email.data) - logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") - - if not authorized: - # try authorizing locally using Flask security user datastore - authorized = super(ExtendedLoginForm, self).validate() + # try LDAP authorization and create a new user if it succeeds + authorized, new_user_data = validate_ldap(username, self.password.data) if authorized: - logger.info(f"User with credentials '{self.email.data}' authorized through local database") + # if there was no user in the database before we create a new user + self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], + password=new_user_data['password'], roles=new_user_data['roles']) + user_datastore.commit() + logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" + " successful LDAP authorization") # if any of the authorization methods is successful we authorize the user return authorized + # we override the change_password view from flask security to only allow local users to change their passwords + # LDAP users should use the LDAP self service for changing passwords + # this route needs to be defined before the Flask Security setup + @app.route('/change', methods=['GET', 'POST']) + @auth_required() + def change_pw(): + if current_user.has_role('local'): + # local users can change their password + return change_password() + else: + # LDAP users get redirected to the LDAP self service + return redirect('https://ldap.imaginaerraum.de/') + # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) security = Security(app, user_datastore, login_form=ExtendedLoginForm) @@ -686,29 +703,35 @@ def create_application(config): else: # store data for new admins in memory s.t. the file can be deleted afterwards with open(config.admin_file) as f: - for i, d in enumerate(f.readlines()): - try: - user, email, pw = d.split() - validate_email(email) - new_admin_data.append({'username': user, 'email': email, 'password': pw}) - except Exception as e: - print( - f"Error while parsing line {i} in admin config file. Config file should contain lines of " - f"' \\n'\n Exception: {e}\nAdmin account could not be created.") + for i, line in enumerate(f.readlines()): + if not line.strip().startswith('#'): + try: + user, email, pw = line.split() + validate_email(email) + new_admin_data.append({'username': user, 'email': email, 'password': pw}) + except Exception as e: + print( + f"Error while parsing line {i} in admin config file. Config file should contain lines of " + f"' \\n'\n Exception: {e}\nAdmin account could not be created.") # create admin users (only if they don't exists already) def create_super_admins(new_admin_data): db.create_all() super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin + local_role = user_datastore.find_or_create_role('local') # LDAP user or local user for d in new_admin_data: if user_datastore.find_user(email=d['email'], username=d['username']) is None: - logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}'") + roles = [super_admin_role, admin_role] + if not d['password'] == 'LDAP': + roles.append(local_role) + logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}") + # create new admin (only if admin does not already exist) new_admin = user_datastore.create_user(email=d['email'], username=d['username'], password=hash_password(d['password']), - roles=[super_admin_role, admin_role]) + roles=roles) db.session.commit() create_super_admins(new_admin_data)