from wtforms.fields import StringField, BooleanField from flask import current_app from flask_security import hash_password from flask_security.forms import LoginForm, Required, PasswordField from flask_security.utils import find_user import ldap3 from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError class ExtendedLoginForm(LoginForm): email = StringField('Benutzername oder E-Mail', [Required()]) password = PasswordField('Passwort', [Required()]) remember = BooleanField('Login merken?') def validate(self): # search for user in the current database user = find_user(self.email.data) if user is not None: # if a user is found we check if it is associated with LDAP or with the local database if user.has_role('local'): # try authorizing locally using Flask security user datastore authorized = super(ExtendedLoginForm, self).validate() if authorized: current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database") else: # run LDAP authorization # if the authorization succeeds we also get the new_user_data dict which contains information about # the user's permissions etc. authorized, new_user_data = validate_ldap(user.username, self.password.data) if authorized: current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") # update permissions and password/email to stay up to date for login with no network connection user.email = new_user_data['email'] user.password = new_user_data['password'] for role in new_user_data['roles']: user_datastore.add_role_to_user(user, role) user_datastore.commit() self.user = user else: self.password.errors = ['Invalid password'] else: # this means there is no user with that email in the database # we assume that the username was entered instead of an email and use that for authentication with LDAP username = self.email.data # try LDAP authorization and create a new user if it succeeds authorized, new_user_data = validate_ldap(username, self.password.data) if authorized: # if there was no user in the database before we create a new user self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], password=new_user_data['password'], roles=new_user_data['roles']) user_datastore.commit() current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" " successful LDAP authorization") # if any of the authorization methods is successful we authorize the user return authorized def validate_ldap(username, password): """Validate the user and password through an LDAP server. If the connection completes successfully the given user and password is authorized. Then the permissions and additional information of the user are obtained through an LDAP search. The data is stored in a dict which will be used later to create/update the entry for the user in the local database. Parameters ---------- username : username for the LDAP server password : password for the LDAP server Returns ------- bool : result of the authorization process (True = success, False = failure) dict : dictionary with information about an authorized user (contains username, email, hashed password, roles) """ try: con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de", password=password, auto_bind=True) except ldap3.core.exceptions.LDAPBindError: # server reachable but user unauthorized -> fail return False, None except LDAPSocketOpenError: # server not reachable -> fail (but will try authorization from local database later) return False, None except Exception as e: # for other Exceptions we just fail return False, None # get user data and permissions from LDAP server new_user_data = {} new_user_data['username'] = username new_user_data['password'] = hash_password(password) new_user_data['roles'] = [] lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))', attributes=ldap3.ALL_ATTRIBUTES) authorized = True if lock_permission: new_user_data['email'] = con.entries[0].mail.value else: authorized = False token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))') if token_granting_permission: new_user_data['roles'].append('admin') return authorized, new_user_data