worked on ldap validation

This commit is contained in:
Simon Pirkelmann 2021-04-06 17:53:41 +02:00
parent 312549ac15
commit a71f68ade3

View File

@ -12,7 +12,7 @@ from flask_security.models import fsqla_v2 as fsqla
from flask_security.forms import LoginForm, Required, PasswordField from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user, verify_password from flask_security.utils import find_user, verify_password
from flask_mail import Mail from flask_mail import Mail
from email_validator import validate_email from email_validator import validate_email, EmailNotValidError
import json import json
import secrets import secrets
@ -146,7 +146,7 @@ def create_application(config):
ldap_server = ldap3.Server(config.ldap_url) ldap_server = ldap3.Server(config.ldap_url)
local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password) local_ldap_cache = {} # dict for caching LDAP authorization locally (stores username + hashed password)
def validate_ldap(user, password): def validate_ldap(username, password):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized and the password is stored If the connection completes successfully the given user and password is authorized and the password is stored
@ -165,7 +165,7 @@ def create_application(config):
""" """
try: try:
con = ldap3.Connection(ldap_server, user="uid=%s,ou=Users,dc=imaginaerraum,dc=de" % (user.username,), con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True) password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError: except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail # server reachable but user unauthorized -> fail
@ -173,11 +173,14 @@ def create_application(config):
except LDAPSocketOpenError: except LDAPSocketOpenError:
# server not reachable -> try cached authorization data # server not reachable -> try cached authorization data
return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username]) return user.username in local_ldap_cache and verify_password(password, local_ldap_cache[user.username])
except Exception: except Exception as e:
# for other Exceptions we just fail # for other Exceptions we just fail
return False return False
# TODO check if user has permission to edit tokens # TODO check if user has permission to edit tokens
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', f'(&(uid={user.username})(memberof=cn=Members,ou=Groups,dc=imaginaerraum,dc=de))')
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={user.username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
# if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if # if LDAP authorization succeeds we cache the password locally (in memory) to allow LDAP authentication even if
# the server is not reachable # the server is not reachable
local_ldap_cache[user.username] = hash_password(password) local_ldap_cache[user.username] = hash_password(password)
@ -188,17 +191,32 @@ def create_application(config):
password = PasswordField('Passwort', [Required()]) password = PasswordField('Passwort', [Required()])
def validate(self): def validate(self):
# try authorizing using LDAP
# authorization in LDAP uses username -> get username associated with email from the database
try:
# if an email (instead of a username) was entered for authentication we check if there already is a user
# with that email in the database
validate_email(self.email.data)
user = find_user(self.email.data)
if user is not None:
username = user.username
else:
# this means there is no user with that email in the database
username = None
except EmailNotValidError:
# else we use the entered credentials as username
username = self.email.data
authorized = validate_ldap(username, self.password.data)
if authorized:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
if not authorized:
# try authorizing locally using Flask security user datastore # try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate() authorized = super(ExtendedLoginForm, self).validate()
if not authorized:
# try authorizing using LDAP
# authorization in LDAP uses username -> get username associated with email from the database
user = find_user(self.email.data)
authorized = validate_ldap(user, self.password.data)
if authorized: if authorized:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through LDAP")
else:
logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database") logger.info(f"Admin user with credentials '{self.email.data}' authorized through local database")
# if any of the authorization methods is successful we authorize the user # if any of the authorization methods is successful we authorize the user