from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import lookup_identity
from flask_security.models import fsqla_v2 as fsqla
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn

from imaginaerraum_door_admin import db, security

# Define models
fsqla.FsModels.set_db_info(db)


class Role(db.Model, fsqla.FsRoleMixin):
    """Role model built from the Flask-Security ``fsqla_v2`` role mixin."""
    pass


class User(db.Model, fsqla.FsUserMixin):
    """User model built from the Flask-Security ``fsqla_v2`` user mixin."""
    pass


class ExtendedLoginForm(LoginForm):
    """Login form that authorizes against the local database or LDAP.

    Users carrying the ``local`` role are checked by Flask-Security's own
    ``LoginForm.validate``. Every other identity (including identities not
    yet present in the database) is checked against the LDAP server
    configured through ``LDAP_URL``, ``LDAP_USER_GROUP``, ``LDAP_DOMAIN`` and
    ``LDAP_DOMAIN_EXT``.
    """

    email = StringField('Benutzername oder E-Mail', [Required()])
    password = PasswordField('Passwort', [Required()])
    remember = BooleanField('Login merken?')

    def validate(self, **kwargs):
        """Authorize the submitted credentials.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to ``LoginForm.validate`` for local users
            (newer Flask-WTF versions pass ``extra_validators`` here).

        Returns
        -------
        bool : True if the user was authorized (``self.user`` is set in that
            case), False otherwise.
        """
        identity = self.email.data

        # search for user in the current database
        user = lookup_identity(identity)

        if user is None:
            # There is no user with that identity in the database. We assume
            # a username was entered and use it for authentication with LDAP.
            authorized, new_user_data = self.validate_ldap()
            if not authorized:
                # Same message as for known users, so the response does not
                # reveal whether the account exists.
                self.password.errors = ['Invalid password']
                return authorized

            # first successful LDAP login -> create the local user entry
            self.user = security.datastore.create_user(
                username=new_user_data['username'],
                email=new_user_data['email'],
                password=new_user_data['password'],
                roles=new_user_data['roles']
            )
            security.datastore.commit()
            current_app.logger.info(
                f"New admin user '{new_user_data['username']} "
                f"<{new_user_data['email']}>' created after successful "
                f"LDAP authorization"
            )
            return authorized

        # A user was found: check whether it is associated with the local
        # database or with LDAP.
        if user.has_role('local'):
            # try authorizing locally using Flask security user datastore
            authorized = super().validate(**kwargs)
            if authorized:
                current_app.logger.info(
                    f"User with credentials '{self.email.data}' authorized "
                    f"through local database"
                )
            return authorized

        # LDAP-backed user. The login field may contain the email address,
        # but the LDAP bind needs the uid, so prefer the stored username.
        ldap_username = getattr(user, 'username', None) or identity
        authorized, new_user_data = self.validate_ldap(ldap_username)
        if not authorized:
            self.password.errors = ['Invalid password']
            return authorized

        current_app.logger.info(
            f"User with credentials '{self.email.data}' authorized "
            f"through LDAP")
        # Mirror email, password hash and roles from LDAP into the local
        # entry.
        # NOTE(review): roles are only ever added here; a user who lost the
        # LDAP admin group keeps the local 'admin' role - confirm intended.
        user.email = new_user_data['email']
        user.password = new_user_data['password']
        for role in new_user_data['roles']:
            security.datastore.add_role_to_user(user, role)
        security.datastore.commit()
        self.user = user
        return authorized

    def validate_ldap(self, username=None):
        """Validate the user and password through an LDAP server.

        If the bind completes successfully the given user and password are
        authorized. Then the permissions and additional information of the
        user are obtained through LDAP searches. The data is stored in a dict
        which is used by ``validate`` to create/update the entry for the user
        in the local database.

        Parameters
        ----------
        username : str, optional
            LDAP uid to bind as. Defaults to the content of the login field
            (``self.email.data``).

        Returns
        -------
        bool : result of the authorization process (True = success,
            False = failure)
        dict : dictionary with information about an authorized user
            (contains username, email, hashed password, roles), or None if
            the authorization failed
        """
        if username is None:
            username = self.email.data
        password = self.password.data

        # The field validators (Required) are not run on this code path and
        # ldap3 falls back to an anonymous bind when no password is given,
        # so empty credentials must be rejected explicitly.
        if not username or not password:
            return False, None

        config = current_app.config
        ldap_server = ldap3.Server(config['LDAP_URL'])
        ldap_user_group = config['LDAP_USER_GROUP']
        ldap_domain = config['LDAP_DOMAIN']
        ldap_domain_ext = config['LDAP_DOMAIN_EXT']

        domain_dn = f"dc={ldap_domain},dc={ldap_domain_ext}"
        search_base = f"ou={ldap_user_group},{domain_dn}"

        # The username is untrusted input: escape it for use inside a DN ...
        user_dn = f"uid={escape_rdn(username)},{search_base}"
        # ... and for use inside a search filter.
        uid_filter = escape_filter_chars(username)

        try:
            con = ldap3.Connection(
                ldap_server, user=user_dn, password=password, auto_bind=True
            )
        except LDAPBindError:
            # server reachable but user unauthorized -> fail
            current_app.logger.info(
                "LDAP bind rejected for user '%s'", username
            )
            return False, None
        except LDAPSocketOpenError:
            # server not reachable -> fail
            # NOTE(review): there is no fallback to the local database for
            # LDAP users, so they cannot log in while the server is down.
            current_app.logger.warning(
                "LDAP server not reachable while authorizing user '%s'",
                username
            )
            return False, None
        except Exception:
            # for other Exceptions we just fail (but leave a trace in the log)
            current_app.logger.exception(
                "Unexpected error during LDAP authorization of user '%s'",
                username
            )
            return False, None

        keyholder_filter = (
            f"(&(uid={uid_filter})(memberof=cn=Keyholders,"
            f"ou=Groups,{domain_dn}))"
        )
        admin_filter = (
            f"(&(uid={uid_filter})(memberof=cn=Vorstand,"
            f"ou=Groups,{domain_dn}))"
        )

        try:
            # only members of the Keyholders group are allowed to log in
            lock_permission = con.search(
                search_base, keyholder_filter,
                attributes=ldap3.ALL_ATTRIBUTES
            )
            if not lock_permission:
                return False, None

            # get user data and permissions from LDAP server
            new_user_data = {
                'username': username,
                'password': hash_password(password),
                'email': con.entries[0].mail.value,
                'roles': [],
            }

            # members of the Vorstand group additionally get the admin role
            token_granting_permission = con.search(search_base, admin_filter)
            if token_granting_permission:
                new_user_data['roles'].append('admin')
        finally:
            # always release the connection to the LDAP server
            con.unbind()

        return True, new_user_data