Compare commits

..

No commits in common. "1320fc55ca05c2738c02f2346391c9fc007dcdf3" and "2855163948d823ef7e79545dc9ecc86a3ab91a46" have entirely different histories.

4 changed files with 64 additions and 137 deletions

View File

@ -3,6 +3,9 @@ from flask import Flask
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email from email_validator import validate_email
import ldap3
from pathlib import Path from pathlib import Path
#from .webapp import door_app #from .webapp import door_app
@ -145,6 +148,8 @@ def create_app():
from . webapp import door_app from . webapp import door_app
app.register_blueprint(door_app) app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security # Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role from .auth import ExtendedLoginForm, User, Role

View File

@ -31,8 +31,7 @@ class ExtendedLoginForm(LoginForm):
# search for user in the current database # search for user in the current database
user = find_user(self.email.data) user = find_user(self.email.data)
if user is not None: if user is not None:
# if a user is found we check if it is associated with LDAP or with # if a user is found we check if it is associated with LDAP or with the local database
# the local database
if user.has_role('local'): if user.has_role('local'):
# try authorizing locally using Flask security user datastore # try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate() authorized = super(ExtendedLoginForm, self).validate()
@ -41,17 +40,13 @@ class ExtendedLoginForm(LoginForm):
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database") current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else: else:
# run LDAP authorization # run LDAP authorization
# if the authorization succeeds we also get the new_user_data # if the authorization succeeds we also get the new_user_data dict which contains information about
# dict which contains information about
# the user's permissions etc. # the user's permissions etc.
authorized, new_user_data = self.validate_ldap() authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized: if authorized:
current_app.logger.info( current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
f"User with credentials '{self.email.data}' authorized " # update permissions and password/email to stay up to date for login with no network connection
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email'] user.email = new_user_data['email']
user.password = new_user_data['password'] user.password = new_user_data['password']
for role in new_user_data['roles']: for role in new_user_data['roles']:
@ -62,15 +57,13 @@ class ExtendedLoginForm(LoginForm):
self.password.errors = ['Invalid password'] self.password.errors = ['Invalid password']
else: else:
# this means there is no user with that email in the database # this means there is no user with that email in the database
# we assume that the username was entered instead of an email and # we assume that the username was entered instead of an email and use that for authentication with LDAP
# use that for authentication with LDAP
username = self.email.data username = self.email.data
# try LDAP authorization and create a new user if it succeeds # try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = self.validate_ldap() authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized: if authorized:
# if there was no user in the database before we create a new # if there was no user in the database before we create a new user
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles']) password=new_user_data['password'], roles=new_user_data['roles'])
security.datastore.commit() security.datastore.commit()
@ -81,7 +74,7 @@ class ExtendedLoginForm(LoginForm):
return authorized return authorized
def validate_ldap(self): def validate_ldap(username, password):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized. If the connection completes successfully the given user and password is authorized.
@ -91,6 +84,8 @@ class ExtendedLoginForm(LoginForm):
Parameters Parameters
---------- ----------
username : username for the LDAP server
password : password for the LDAP server
Returns Returns
------- -------
@ -98,23 +93,14 @@ class ExtendedLoginForm(LoginForm):
dict : dictionary with information about an authorized user (contains username, email, hashed password, dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles) roles)
""" """
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
username = self.email.data
password = self.password.data
try: try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}" con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
con = ldap3.Connection(ldap_server,
user=user,
password=password, auto_bind=True) password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError as e: except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail # server reachable but user unauthorized -> fail
return False, None return False, None
except LDAPSocketOpenError as e: except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later) # server not reachable -> fail (but will try authorization from local database later)
return False, None return False, None
except Exception as e: except Exception as e:
@ -126,18 +112,17 @@ class ExtendedLoginForm(LoginForm):
new_user_data['username'] = username new_user_data['username'] = username
new_user_data['password'] = hash_password(password) new_user_data['password'] = hash_password(password)
new_user_data['roles'] = [] new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}" lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))" f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES) attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission: if lock_permission:
new_user_data['email'] = con.entries[0].mail.value new_user_data['email'] = con.entries[0].mail.value
else: else:
return False, None authorized = False
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))' token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
token_granting_permission = con.search(search_base, search_filter) f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission: if token_granting_permission:
new_user_data['roles'].append('admin') new_user_data['roles'].append('admin')
return True, new_user_data return authorized, new_user_data

View File

@ -48,12 +48,7 @@ class DefaultConfig(object):
KEY_FILE = '/root/flask_keys' KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens" TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de" LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock" NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log" LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log" NFC_LOG = "/var/log/nfc.log"

View File

@ -4,7 +4,6 @@ import pytest
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from flask_security.utils import find_user from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle from imaginaerraum_door_admin.door_handle import DoorHandle
from imaginaerraum_door_admin.auth import ExtendedLoginForm
import re import re
import secrets import secrets
import pathlib import pathlib
@ -54,73 +53,16 @@ def test_login_headless(client):
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_validate_ldap(client, mocker):
# mock ldap Connection object to simulate successful authentication
import ldap3
def init_success(self, *args, **kwargs):
pass
def init_LDAPBindError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPBindError()
def init_LDAPSocketOpenError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPSocketOpenError()
def init_Exception(self, *args, **kwargs):
raise Exception()
def search_success(self, *args, **kwargs):
return True
def search_failure(self, *args, **kwargs):
return False
mocker.patch.object(ldap3.Connection, '__init__', init_success)
mocker.patch.object(ldap3.Connection, 'search', search_success)
mock_entries = mocker.MagicMock()
mock_entries[0].mail.value = 'user@example.com'
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
with client.application.app_context():
# test successful login
form = ExtendedLoginForm()
form.email.data = 'user'
form.password.data = 'password'
result = form.validate_ldap()
assert result[0]
assert result[1]['username'] == 'user'
assert result[1]['email'] == 'user@example.com'
assert result[1]['roles'] == ['admin']
# test failing ldap search
mocker.patch.object(ldap3.Connection, 'search', search_failure)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
# test some errors in ldap Connection and authentication
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
def test_login_ldap(client, temp_user, mocker): def test_login_ldap(client, temp_user, mocker):
# mock ldap validation for admin user # mock ldap validation for admin user
def mock_validate(self): def mock_validate(username, password):
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password'] auth = username == temp_user['username'] and password == temp_user['password']
user_data = {'username': temp_user['username'], user_data = {'username': temp_user['username'],
'email': temp_user['email'], 'email': temp_user['email'],
'roles': ['admin'], 'roles': ['admin'],
'password': temp_user['password']} 'password': temp_user['password']}
return auth, user_data return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate) mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
user = find_user(temp_user['username']) user = find_user(temp_user['username'])
# remove local role so that ldap authentication is the default # remove local role so that ldap authentication is the default
@ -142,14 +84,14 @@ def test_login_ldap(client, temp_user, mocker):
def test_login_ldap_new_user(client, mocker): def test_login_ldap_new_user(client, mocker):
# mock ldap validation for admin user # mock ldap validation for admin user
def mock_validate(self): def mock_validate(username, password):
auth = True auth = True
user_data = {'username': 'Balrog', user_data = {'username': 'Balrog',
'email': 'balrog@moria.me', 'email': 'balrog@moria.me',
'roles': ['admin'], 'roles': ['admin'],
'password': 'youshallnotpass'} 'password': 'youshallnotpass'}
return auth, user_data return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate) mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
# initially, the Balrog user should not exist # initially, the Balrog user should not exist
user = find_user('Balrog') user = find_user('Balrog')