Compare commits

...

2 Commits

Author SHA1 Message Date
1320fc55ca added test for ldap validation 2022-02-01 23:03:35 +01:00
33779e31b4 generalized ldap authentication 2022-02-01 23:03:06 +01:00
4 changed files with 137 additions and 64 deletions

View File

@ -3,9 +3,6 @@ from flask import Flask
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email from email_validator import validate_email
import ldap3
from pathlib import Path from pathlib import Path
#from .webapp import door_app #from .webapp import door_app
@ -148,8 +145,6 @@ def create_app():
from . webapp import door_app from . webapp import door_app
app.register_blueprint(door_app) app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security # Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role from .auth import ExtendedLoginForm, User, Role

View File

@ -31,7 +31,8 @@ class ExtendedLoginForm(LoginForm):
# search for user in the current database # search for user in the current database
user = find_user(self.email.data) user = find_user(self.email.data)
if user is not None: if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database # if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'): if user.has_role('local'):
# try authorizing locally using Flask security user datastore # try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate() authorized = super(ExtendedLoginForm, self).validate()
@ -40,13 +41,17 @@ class ExtendedLoginForm(LoginForm):
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database") current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else: else:
# run LDAP authorization # run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about # if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc. # the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data) authorized, new_user_data = self.validate_ldap()
if authorized: if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP") current_app.logger.info(
# update permissions and password/email to stay up to date for login with no network connection f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email'] user.email = new_user_data['email']
user.password = new_user_data['password'] user.password = new_user_data['password']
for role in new_user_data['roles']: for role in new_user_data['roles']:
@ -57,13 +62,15 @@ class ExtendedLoginForm(LoginForm):
self.password.errors = ['Invalid password'] self.password.errors = ['Invalid password']
else: else:
# this means there is no user with that email in the database # this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP # we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data username = self.email.data
# try LDAP authorization and create a new user if it succeeds # try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data) authorized, new_user_data = self.validate_ldap()
if authorized: if authorized:
# if there was no user in the database before we create a new user # if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles']) password=new_user_data['password'], roles=new_user_data['roles'])
security.datastore.commit() security.datastore.commit()
@ -74,55 +81,63 @@ class ExtendedLoginForm(LoginForm):
return authorized return authorized
def validate_ldap(username, password): def validate_ldap(self):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized. If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search. Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local The data is stored in a dict which will be used later to create/update the entry for the user in the local
database. database.
Parameters Parameters
---------- ----------
username : username for the LDAP server
password : password for the LDAP server
Returns Returns
------- -------
bool : result of the authorization process (True = success, False = failure) bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password, dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles) roles)
""" """
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
try: username = self.email.data
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de", password = self.password.data
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server try:
new_user_data = {} user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
new_user_data['username'] = username con = ldap3.Connection(ldap_server,
new_user_data['password'] = hash_password(password) user=user,
new_user_data['roles'] = [] password=password, auto_bind=True)
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', except ldap3.core.exceptions.LDAPBindError as e:
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))', # server reachable but user unauthorized -> fail
attributes=ldap3.ALL_ATTRIBUTES) return False, None
authorized = True except LDAPSocketOpenError as e:
if lock_permission: # server not reachable -> fail (but will try authorization from local database later)
new_user_data['email'] = con.entries[0].mail.value return False, None
else: except Exception as e:
authorized = False # for other Exceptions we just fail
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de', return False, None
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data # get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data

View File

@ -48,7 +48,12 @@ class DefaultConfig(object):
KEY_FILE = '/root/flask_keys' KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens" TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de" LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock" NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log" LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log" NFC_LOG = "/var/log/nfc.log"

View File

@ -4,6 +4,7 @@ import pytest
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from flask_security.utils import find_user from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle from imaginaerraum_door_admin.door_handle import DoorHandle
from imaginaerraum_door_admin.auth import ExtendedLoginForm
import re import re
import secrets import secrets
import pathlib import pathlib
@ -53,16 +54,73 @@ def test_login_headless(client):
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_validate_ldap(client, mocker):
# mock ldap Connection object to simulate successful authentication
import ldap3
def init_success(self, *args, **kwargs):
pass
def init_LDAPBindError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPBindError()
def init_LDAPSocketOpenError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPSocketOpenError()
def init_Exception(self, *args, **kwargs):
raise Exception()
def search_success(self, *args, **kwargs):
return True
def search_failure(self, *args, **kwargs):
return False
mocker.patch.object(ldap3.Connection, '__init__', init_success)
mocker.patch.object(ldap3.Connection, 'search', search_success)
mock_entries = mocker.MagicMock()
mock_entries[0].mail.value = 'user@example.com'
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
with client.application.app_context():
# test successful login
form = ExtendedLoginForm()
form.email.data = 'user'
form.password.data = 'password'
result = form.validate_ldap()
assert result[0]
assert result[1]['username'] == 'user'
assert result[1]['email'] == 'user@example.com'
assert result[1]['roles'] == ['admin']
# test failing ldap search
mocker.patch.object(ldap3.Connection, 'search', search_failure)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
# test some errors in ldap Connection and authentication
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
def test_login_ldap(client, temp_user, mocker): def test_login_ldap(client, temp_user, mocker):
# mock ldap validation for admin user # mock ldap validation for admin user
def mock_validate(username, password): def mock_validate(self):
auth = username == temp_user['username'] and password == temp_user['password'] auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
user_data = {'username': temp_user['username'], user_data = {'username': temp_user['username'],
'email': temp_user['email'], 'email': temp_user['email'],
'roles': ['admin'], 'roles': ['admin'],
'password': temp_user['password']} 'password': temp_user['password']}
return auth, user_data return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate) mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
user = find_user(temp_user['username']) user = find_user(temp_user['username'])
# remove local role so that ldap authentication is the default # remove local role so that ldap authentication is the default
@ -84,14 +142,14 @@ def test_login_ldap(client, temp_user, mocker):
def test_login_ldap_new_user(client, mocker): def test_login_ldap_new_user(client, mocker):
# mock ldap validation for admin user # mock ldap validation for admin user
def mock_validate(username, password): def mock_validate(self):
auth = True auth = True
user_data = {'username': 'Balrog', user_data = {'username': 'Balrog',
'email': 'balrog@moria.me', 'email': 'balrog@moria.me',
'roles': ['admin'], 'roles': ['admin'],
'password': 'youshallnotpass'} 'password': 'youshallnotpass'}
return auth, user_data return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate) mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
# initially, the Balrog user should not exist # initially, the Balrog user should not exist
user = find_user('Balrog') user = find_user('Balrog')