Compare commits

...

2 Commits

Author SHA1 Message Date
Simon Pirkelmann 1320fc55ca added test for ldap validation 2022-02-01 23:03:35 +01:00
Simon Pirkelmann 33779e31b4 generalized ldap authentication 2022-02-01 23:03:06 +01:00
4 changed files with 137 additions and 64 deletions

View File

@ -3,9 +3,6 @@ from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
import ldap3
from pathlib import Path
#from .webapp import door_app
@ -148,8 +145,6 @@ def create_app():
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role

View File

@ -31,7 +31,8 @@ class ExtendedLoginForm(LoginForm):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
# if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
@ -40,13 +41,17 @@ class ExtendedLoginForm(LoginForm):
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
authorized, new_user_data = self.validate_ldap()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
@ -57,13 +62,15 @@ class ExtendedLoginForm(LoginForm):
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
# we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
authorized, new_user_data = self.validate_ldap()
if authorized:
# if there was no user in the database before we create a new user
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
security.datastore.commit()
@ -74,55 +81,63 @@ class ExtendedLoginForm(LoginForm):
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
username = self.email.data
password = self.password.data
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
con = ldap3.Connection(ldap_server,
user=user,
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
return authorized, new_user_data
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data

View File

@ -48,7 +48,12 @@ class DefaultConfig(object):
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log"

View File

@ -4,6 +4,7 @@ import pytest
from bs4 import BeautifulSoup
from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle
from imaginaerraum_door_admin.auth import ExtendedLoginForm
import re
import secrets
import pathlib
@ -53,16 +54,73 @@ def test_login_headless(client):
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_validate_ldap(client, mocker):
# mock ldap Connection object to simulate successful authentication
import ldap3
def init_success(self, *args, **kwargs):
pass
def init_LDAPBindError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPBindError()
def init_LDAPSocketOpenError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPSocketOpenError()
def init_Exception(self, *args, **kwargs):
raise Exception()
def search_success(self, *args, **kwargs):
return True
def search_failure(self, *args, **kwargs):
return False
mocker.patch.object(ldap3.Connection, '__init__', init_success)
mocker.patch.object(ldap3.Connection, 'search', search_success)
mock_entries = mocker.MagicMock()
mock_entries[0].mail.value = 'user@example.com'
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
with client.application.app_context():
# test successful login
form = ExtendedLoginForm()
form.email.data = 'user'
form.password.data = 'password'
result = form.validate_ldap()
assert result[0]
assert result[1]['username'] == 'user'
assert result[1]['email'] == 'user@example.com'
assert result[1]['roles'] == ['admin']
# test failing ldap search
mocker.patch.object(ldap3.Connection, 'search', search_failure)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
# test some errors in ldap Connection and authentication
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
def test_login_ldap(client, temp_user, mocker):
# mock ldap validation for admin user
def mock_validate(username, password):
auth = username == temp_user['username'] and password == temp_user['password']
def mock_validate(self):
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
user_data = {'username': temp_user['username'],
'email': temp_user['email'],
'roles': ['admin'],
'password': temp_user['password']}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
user = find_user(temp_user['username'])
# remove local role so that ldap authentication is the default
@ -84,14 +142,14 @@ def test_login_ldap(client, temp_user, mocker):
def test_login_ldap_new_user(client, mocker):
# mock ldap validation for admin user
def mock_validate(username, password):
def mock_validate(self):
auth = True
user_data = {'username': 'Balrog',
'email': 'balrog@moria.me',
'roles': ['admin'],
'password': 'youshallnotpass'}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
# initially, the Balrog user should not exist
user = find_user('Balrog')