restructured LDAP authorization procedure
This commit is contained in:
parent
a71f68ade3
commit
ccce39d1a0
|
@ -144,24 +144,25 @@ def create_application(config):
|
|||
|
||||
# LDAP
|
||||
ldap_server = ldap3.Server(config.ldap_url)
|
||||
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
|
||||
|
||||
def validate_ldap(username, password):
|
||||
"""Validate the user and password through an LDAP server.
|
||||
|
||||
If the connection completes successfully the given user and password is authorized and the password is stored
|
||||
locally for future authorization without internet connectivity.
|
||||
If the server is not reachable we check the password against a locally stored password (if the user previously
|
||||
authorized through LDAP).
|
||||
If the connection completes successfully the given user and password is authorized.
|
||||
Then the permissions and additional information of the user are obtained through an LDAP search.
|
||||
The data is stored in a dict which will be used later to create/update the entry for the user in the local
|
||||
database.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
user : username for the LDAP server
|
||||
username : username for the LDAP server
|
||||
password : password for the LDAP server
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool : result of the authorization process (True = success, False = failure)
|
||||
dict : dictionary with information about an authorized user (contains username, email, hashed password,
|
||||
roles)
|
||||
"""
|
||||
|
||||
try:
|
||||
|
@ -169,55 +170,77 @@ def create_application(config):
|
|||
password=password, auto_bind=True)
|
||||
except ldap3.core.exceptions.LDAPBindError:
|
||||
# server reachable but user unauthorized -> fail
|
||||
return False
|
||||
return False, None
|
||||
except LDAPSocketOpenError:
|
||||
# server not reachable -> try cached authorization data
|
||||
return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username])
|
||||
# server not reachable -> fail (but will try authorization from local database later)
|
||||
return False, None
|
||||
except Exception as e:
|
||||
# for other Exceptions we just fail
|
||||
return False
|
||||
return False, None
|
||||
|
||||
# TODO check if user has permission to edit tokens
|
||||
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={user.username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||
# get user data and permissions from LDAP server
|
||||
new_user_data = {}
|
||||
new_user_data['username'] = username
|
||||
new_user_data['password'] = hash_password(password)
|
||||
new_user_data['roles'] = []
|
||||
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
||||
f'(&(uid={username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))',
|
||||
attributes=ldap3.ALL_ATTRIBUTES)
|
||||
if lock_permission:
|
||||
new_user_data['email'] = con.entries[0].mail.value
|
||||
new_user_data['roles'].append('admin')
|
||||
else:
|
||||
new_user_data['email'] = None
|
||||
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
||||
f'(&(uid={user.username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||
# if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if
|
||||
# the server is not reachable
|
||||
local_ldap_cache[user.username] = hash_password(password)
|
||||
return True
|
||||
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
||||
if token_granting_permission:
|
||||
new_user_data['roles'].append('super_admin')
|
||||
|
||||
return True, new_user_data
|
||||
|
||||
class ExtendedLoginForm(LoginForm):
|
||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||
password = PasswordField('Passwort', [Required()])
|
||||
|
||||
def validate(self):
|
||||
# try authorizing using LDAP
|
||||
# authorization in LDAP uses username -> get username associated with email from the database
|
||||
try:
|
||||
# if an email (instead of a username) was entered for authentication we check if there already is a user
|
||||
# with that email in the database
|
||||
validate_email(self.email.data)
|
||||
user = find_user(self.email.data)
|
||||
if user is not None:
|
||||
username = user.username
|
||||
else:
|
||||
# this means there is no user with that email in the database
|
||||
username = None
|
||||
except EmailNotValidError:
|
||||
# else we use the entered credentials as username
|
||||
# search for user in the current database
|
||||
user = find_user(self.email.data)
|
||||
if user is not None:
|
||||
# if a user is found we use that username for LDAP authentication
|
||||
username = user.username
|
||||
else:
|
||||
# this means there is no user with that email in the database
|
||||
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
||||
username = self.email.data
|
||||
|
||||
authorized = validate_ldap(username, self.password.data)
|
||||
# run LDAP authorization
|
||||
# if the authorization succeeds we also get the new_user_data dict which contains information about the
|
||||
# user's permissions etc.
|
||||
authorized, new_user_data = validate_ldap(username, self.password.data)
|
||||
|
||||
if authorized:
|
||||
if user is None:
|
||||
# if there was no user in the database before we create a new user
|
||||
user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
||||
" successful LDAP authorization")
|
||||
else: # for existing users we update permissions and password/email to stay up to date
|
||||
user.email = new_user_data['email']
|
||||
user.password = new_user_data['password']
|
||||
for role in new_user_data['roles']:
|
||||
user_datastore.add_role_to_user(user, role)
|
||||
user_datastore.commit()
|
||||
|
||||
self.user = find_user(self.email.data)
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
|
||||
|
||||
if not authorized:
|
||||
# try authorizing locally using Flask security user datastore
|
||||
authorized = super(ExtendedLoginForm, self).validate()
|
||||
|
||||
if authorized:
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
|
||||
if authorized:
|
||||
logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
|
||||
|
||||
# if any of the authorization methods is successful we authorize the user
|
||||
return authorized
|
||||
|
|
Loading…
Reference in New Issue
Block a user