generalized ldap authentication

blueprint_refactoring
Simon Pirkelmann 2022-02-01 23:03:06 +01:00
parent 2855163948
commit 33779e31b4
3 changed files with 74 additions and 59 deletions

View File

@ -3,9 +3,6 @@ from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
import ldap3
from pathlib import Path
#from .webapp import door_app
@ -148,8 +145,6 @@ def create_app():
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role

View File

@ -31,7 +31,8 @@ class ExtendedLoginForm(LoginForm):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
# if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
@ -40,13 +41,17 @@ class ExtendedLoginForm(LoginForm):
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
authorized, new_user_data = self.validate_ldap()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
@ -57,13 +62,15 @@ class ExtendedLoginForm(LoginForm):
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
# we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
authorized, new_user_data = self.validate_ldap()
if authorized:
# if there was no user in the database before we create a new user
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
security.datastore.commit()
@ -74,55 +81,63 @@ class ExtendedLoginForm(LoginForm):
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
username = self.email.data
password = self.password.data
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
con = ldap3.Connection(ldap_server,
user=user,
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
return authorized, new_user_data
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data

View File

@ -48,7 +48,12 @@ class DefaultConfig(object):
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log"