From ccce39d1a09381d627426f4ffa24acbd2e715c9f Mon Sep 17 00:00:00 2001 From: Simon Pirkelmann Date: Tue, 6 Apr 2021 22:39:21 +0200 Subject: [PATCH] restructured LDAP authorization procedure --- imaginaerraum_door_admin/webapp.py | 91 +++++++++++++++++++----------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/imaginaerraum_door_admin/webapp.py b/imaginaerraum_door_admin/webapp.py index af44cfb..df249f8 100644 --- a/imaginaerraum_door_admin/webapp.py +++ b/imaginaerraum_door_admin/webapp.py @@ -144,24 +144,25 @@ def create_application(config): # LDAP ldap_server = ldap3.Server(config.ldap_url) - local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password) def validate_ldap(username, password): """Validate the user and password through an LDAP server. - If the connection completes successfully the given user and password is authorized and the password is stored - locally for future authorization without internet connectivity. - If the server is not reachable we check the password against a locally stored password (if the user previously - authorized through LDAP). + If the connection completes successfully the given user and password is authorized. + Then the permissions and additional information of the user are obtained through an LDAP search. + The data is stored in a dict which will be used later to create/update the entry for the user in the local + database. Parameters ---------- - user : username for the LDAP server + username : username for the LDAP server password : password for the LDAP server Returns ------- bool : result of the authorization process (True = success, False = failure) + dict : dictionary with information about an authorized user (contains username, email, hashed password, + roles) """ try: @@ -169,55 +170,77 @@ def create_application(config): password=password, auto_bind=True) except ldap3.core.exceptions.LDAPBindError: # server reachable but user unauthorized -> fail - return False + return False, None except LDAPSocketOpenError: - # server not reachable -> try cached authorization data - return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username]) + # server not reachable -> fail (but will try authorization from local database later) + return False, None except Exception as e: # for other Exceptions we just fail - return False + return False, None - # TODO check if user has permission to edit tokens - lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={user.username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))') + # get user data and permissions from LDAP server + new_user_data = {} + new_user_data['username'] = username + new_user_data['password'] = hash_password(password) + new_user_data['roles'] = [] + lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', + f'(&(uid={username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))', + attributes=ldap3.ALL_ATTRIBUTES) + if lock_permission: + new_user_data['email'] = con.entries[0].mail.value + new_user_data['roles'].append('admin') + else: + new_user_data['email'] = None token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', - f'(&(uid={user.username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))') - # if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if - # the server is not reachable - local_ldap_cache[user.username] = hash_password(password) - return True + f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))') + if token_granting_permission: + new_user_data['roles'].append('super_admin') + + return True, new_user_data class ExtendedLoginForm(LoginForm): email = StringField('Benutzername oder E-Mail', [Required()]) password = PasswordField('Passwort', [Required()]) def validate(self): - # try authorizing using LDAP - # authorization in LDAP uses username -> get username associated with email from the database - try: - # if an email (instead of a username) was entered for authentication we check if there already is a user - # with that email in the database - validate_email(self.email.data) - user = find_user(self.email.data) - if user is not None: - username = user.username - else: - # this means there is no user with that email in the database - username = None - except EmailNotValidError: - # else we use the entered credentials as username + # search for user in the current database + user = find_user(self.email.data) + if user is not None: + # if a user is found we use that username for LDAP authentication + username = user.username + else: + # this means there is no user with that email in the database + # we assume that the username was entered instead of an email and use that for authentication with LDAP username = self.email.data - authorized = validate_ldap(username, self.password.data) + # run LDAP authorization + # if the authorization succeeds we also get the new_user_data dict which contains information about the + # user's permissions etc. + authorized, new_user_data = validate_ldap(username, self.password.data) if authorized: + if user is None: + # if there was no user in the database before we create a new user + user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], + password=new_user_data['password'], roles=new_user_data['roles']) + logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" + " successful LDAP authorization") + else: # for existing users we update permissions and password/email to stay up to date + user.email = new_user_data['email'] + user.password = new_user_data['password'] + for role in new_user_data['roles']: + user_datastore.add_role_to_user(user, role) + user_datastore.commit() + + self.user = find_user(self.email.data) logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP") if not authorized: # try authorizing locally using Flask security user datastore authorized = super(ExtendedLoginForm, self).validate() - if authorized: - logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database") + if authorized: + logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database") # if any of the authorization methods is successful we authorize the user return authorized