fixed formatting

blueprint_refactoring
Simon Pirkelmann 2022-09-12 22:20:40 +02:00
parent 5fb652c1d2
commit 3a6ea5926b
1 changed files with 43 additions and 23 deletions

View File

@ -38,7 +38,10 @@ class ExtendedLoginForm(LoginForm):
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through local database"
)
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data
@ -71,32 +74,41 @@ class ExtendedLoginForm(LoginForm):
if authorized:
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
self.user = security.datastore.create_user(
username=new_user_data['username'],
email=new_user_data['email'],
password=new_user_data['password'],
roles=new_user_data['roles']
)
security.datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
current_app.logger.info(
f"New admin user '{new_user_data['username']} "
f"<{new_user_data['email']}>' created after successful "
f"LDAP authorization"
)
# if any of the authorization methods is successful we authorize the user
# if any of the authorization methods is successful we authorize
# the user
return authorized
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
If the connection completes successfully the given user and password
is authorized. Then the permissions and additional information of the
user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update
the entry for the user in the local database.
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
bool : result of the authorization process (True = success,
False = failure)
dict : dictionary with information about an authorized user
(contains username, email, hashed password, roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
@ -107,15 +119,20 @@ class ExtendedLoginForm(LoginForm):
password = self.password.data
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
con = ldap3.Connection(ldap_server,
user=user,
password=password, auto_bind=True)
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
f"dc={ldap_domain_ext}"
con = ldap3.Connection(
ldap_server,
user=user,
password=password,
auto_bind=True
)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from local database later)
# server not reachable -> fail (but will try authorization from
# local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
@ -127,15 +144,18 @@ class ExtendedLoginForm(LoginForm):
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES)
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
f'dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')