Compare commits
2 Commits
2855163948
...
1320fc55ca
Author | SHA1 | Date | |
---|---|---|---|
1320fc55ca | |||
33779e31b4 |
|
@ -3,9 +3,6 @@ from flask import Flask
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
|
|
||||||
import ldap3
|
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
#from .webapp import door_app
|
#from .webapp import door_app
|
||||||
|
@ -148,8 +145,6 @@ def create_app():
|
||||||
from . webapp import door_app
|
from . webapp import door_app
|
||||||
app.register_blueprint(door_app)
|
app.register_blueprint(door_app)
|
||||||
|
|
||||||
ldap_server = ldap3.Server(app.config['LDAP_URL'])
|
|
||||||
|
|
||||||
# Setup Flask-Security
|
# Setup Flask-Security
|
||||||
from .auth import ExtendedLoginForm, User, Role
|
from .auth import ExtendedLoginForm, User, Role
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,8 @@ class ExtendedLoginForm(LoginForm):
|
||||||
# search for user in the current database
|
# search for user in the current database
|
||||||
user = find_user(self.email.data)
|
user = find_user(self.email.data)
|
||||||
if user is not None:
|
if user is not None:
|
||||||
# if a user is found we check if it is associated with LDAP or with the local database
|
# if a user is found we check if it is associated with LDAP or with
|
||||||
|
# the local database
|
||||||
if user.has_role('local'):
|
if user.has_role('local'):
|
||||||
# try authorizing locally using Flask security user datastore
|
# try authorizing locally using Flask security user datastore
|
||||||
authorized = super(ExtendedLoginForm, self).validate()
|
authorized = super(ExtendedLoginForm, self).validate()
|
||||||
|
@ -40,13 +41,17 @@ class ExtendedLoginForm(LoginForm):
|
||||||
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
||||||
else:
|
else:
|
||||||
# run LDAP authorization
|
# run LDAP authorization
|
||||||
# if the authorization succeeds we also get the new_user_data dict which contains information about
|
# if the authorization succeeds we also get the new_user_data
|
||||||
|
# dict which contains information about
|
||||||
# the user's permissions etc.
|
# the user's permissions etc.
|
||||||
authorized, new_user_data = validate_ldap(user.username, self.password.data)
|
authorized, new_user_data = self.validate_ldap()
|
||||||
|
|
||||||
if authorized:
|
if authorized:
|
||||||
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
|
current_app.logger.info(
|
||||||
# update permissions and password/email to stay up to date for login with no network connection
|
f"User with credentials '{self.email.data}' authorized "
|
||||||
|
f"through LDAP")
|
||||||
|
# update permissions and password/email to stay up to date
|
||||||
|
# for login with no network connection
|
||||||
user.email = new_user_data['email']
|
user.email = new_user_data['email']
|
||||||
user.password = new_user_data['password']
|
user.password = new_user_data['password']
|
||||||
for role in new_user_data['roles']:
|
for role in new_user_data['roles']:
|
||||||
|
@ -57,13 +62,15 @@ class ExtendedLoginForm(LoginForm):
|
||||||
self.password.errors = ['Invalid password']
|
self.password.errors = ['Invalid password']
|
||||||
else:
|
else:
|
||||||
# this means there is no user with that email in the database
|
# this means there is no user with that email in the database
|
||||||
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
# we assume that the username was entered instead of an email and
|
||||||
|
# use that for authentication with LDAP
|
||||||
username = self.email.data
|
username = self.email.data
|
||||||
# try LDAP authorization and create a new user if it succeeds
|
# try LDAP authorization and create a new user if it succeeds
|
||||||
authorized, new_user_data = validate_ldap(username, self.password.data)
|
authorized, new_user_data = self.validate_ldap()
|
||||||
|
|
||||||
if authorized:
|
if authorized:
|
||||||
# if there was no user in the database before we create a new user
|
# if there was no user in the database before we create a new
|
||||||
|
# user
|
||||||
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||||
security.datastore.commit()
|
security.datastore.commit()
|
||||||
|
@ -74,7 +81,7 @@ class ExtendedLoginForm(LoginForm):
|
||||||
return authorized
|
return authorized
|
||||||
|
|
||||||
|
|
||||||
def validate_ldap(username, password):
|
def validate_ldap(self):
|
||||||
"""Validate the user and password through an LDAP server.
|
"""Validate the user and password through an LDAP server.
|
||||||
|
|
||||||
If the connection completes successfully the given user and password is authorized.
|
If the connection completes successfully the given user and password is authorized.
|
||||||
|
@ -84,8 +91,6 @@ def validate_ldap(username, password):
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
username : username for the LDAP server
|
|
||||||
password : password for the LDAP server
|
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
|
@ -93,14 +98,23 @@ def validate_ldap(username, password):
|
||||||
dict : dictionary with information about an authorized user (contains username, email, hashed password,
|
dict : dictionary with information about an authorized user (contains username, email, hashed password,
|
||||||
roles)
|
roles)
|
||||||
"""
|
"""
|
||||||
|
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
|
||||||
|
ldap_user_group = current_app.config['LDAP_USER_GROUP']
|
||||||
|
ldap_domain = current_app.config['LDAP_DOMAIN']
|
||||||
|
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
|
||||||
|
|
||||||
|
username = self.email.data
|
||||||
|
password = self.password.data
|
||||||
|
|
||||||
try:
|
try:
|
||||||
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
|
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
|
||||||
|
con = ldap3.Connection(ldap_server,
|
||||||
|
user=user,
|
||||||
password=password, auto_bind=True)
|
password=password, auto_bind=True)
|
||||||
except ldap3.core.exceptions.LDAPBindError:
|
except ldap3.core.exceptions.LDAPBindError as e:
|
||||||
# server reachable but user unauthorized -> fail
|
# server reachable but user unauthorized -> fail
|
||||||
return False, None
|
return False, None
|
||||||
except LDAPSocketOpenError:
|
except LDAPSocketOpenError as e:
|
||||||
# server not reachable -> fail (but will try authorization from local database later)
|
# server not reachable -> fail (but will try authorization from local database later)
|
||||||
return False, None
|
return False, None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -112,17 +126,18 @@ def validate_ldap(username, password):
|
||||||
new_user_data['username'] = username
|
new_user_data['username'] = username
|
||||||
new_user_data['password'] = hash_password(password)
|
new_user_data['password'] = hash_password(password)
|
||||||
new_user_data['roles'] = []
|
new_user_data['roles'] = []
|
||||||
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
|
||||||
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
|
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
|
||||||
|
lock_permission = con.search(search_base, search_filter,
|
||||||
attributes=ldap3.ALL_ATTRIBUTES)
|
attributes=ldap3.ALL_ATTRIBUTES)
|
||||||
authorized = True
|
|
||||||
if lock_permission:
|
if lock_permission:
|
||||||
new_user_data['email'] = con.entries[0].mail.value
|
new_user_data['email'] = con.entries[0].mail.value
|
||||||
else:
|
else:
|
||||||
authorized = False
|
return False, None
|
||||||
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
|
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
|
||||||
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
|
token_granting_permission = con.search(search_base, search_filter)
|
||||||
if token_granting_permission:
|
if token_granting_permission:
|
||||||
new_user_data['roles'].append('admin')
|
new_user_data['roles'].append('admin')
|
||||||
|
|
||||||
return authorized, new_user_data
|
return True, new_user_data
|
||||||
|
|
|
@ -48,7 +48,12 @@ class DefaultConfig(object):
|
||||||
|
|
||||||
KEY_FILE = '/root/flask_keys'
|
KEY_FILE = '/root/flask_keys'
|
||||||
TOKEN_FILE = "/etc/door_tokens"
|
TOKEN_FILE = "/etc/door_tokens"
|
||||||
|
|
||||||
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
||||||
|
LDAP_USER_GROUP = 'Users'
|
||||||
|
LDAP_DOMAIN = 'imaginaerraum'
|
||||||
|
LDAP_DOMAIN_EXT = 'de'
|
||||||
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
NFC_SOCKET = "/tmp/nfc.sock"
|
||||||
LOG_FILE = "/var/log/webinterface.log"
|
LOG_FILE = "/var/log/webinterface.log"
|
||||||
NFC_LOG = "/var/log/nfc.log"
|
NFC_LOG = "/var/log/nfc.log"
|
||||||
|
|
|
@ -4,6 +4,7 @@ import pytest
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from flask_security.utils import find_user
|
from flask_security.utils import find_user
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||||||
|
from imaginaerraum_door_admin.auth import ExtendedLoginForm
|
||||||
import re
|
import re
|
||||||
import secrets
|
import secrets
|
||||||
import pathlib
|
import pathlib
|
||||||
|
@ -53,16 +54,73 @@ def test_login_headless(client):
|
||||||
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_ldap(client, mocker):
|
||||||
|
# mock ldap Connection object to simulate successful authentication
|
||||||
|
import ldap3
|
||||||
|
def init_success(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
def init_LDAPBindError(self, *args, **kwargs):
|
||||||
|
raise ldap3.core.exceptions.LDAPBindError()
|
||||||
|
def init_LDAPSocketOpenError(self, *args, **kwargs):
|
||||||
|
raise ldap3.core.exceptions.LDAPSocketOpenError()
|
||||||
|
def init_Exception(self, *args, **kwargs):
|
||||||
|
raise Exception()
|
||||||
|
def search_success(self, *args, **kwargs):
|
||||||
|
return True
|
||||||
|
def search_failure(self, *args, **kwargs):
|
||||||
|
return False
|
||||||
|
|
||||||
|
mocker.patch.object(ldap3.Connection, '__init__', init_success)
|
||||||
|
mocker.patch.object(ldap3.Connection, 'search', search_success)
|
||||||
|
mock_entries = mocker.MagicMock()
|
||||||
|
mock_entries[0].mail.value = 'user@example.com'
|
||||||
|
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
|
||||||
|
|
||||||
|
with client.application.app_context():
|
||||||
|
# test successful login
|
||||||
|
form = ExtendedLoginForm()
|
||||||
|
form.email.data = 'user'
|
||||||
|
form.password.data = 'password'
|
||||||
|
result = form.validate_ldap()
|
||||||
|
assert result[0]
|
||||||
|
assert result[1]['username'] == 'user'
|
||||||
|
assert result[1]['email'] == 'user@example.com'
|
||||||
|
assert result[1]['roles'] == ['admin']
|
||||||
|
|
||||||
|
# test failing ldap search
|
||||||
|
mocker.patch.object(ldap3.Connection, 'search', search_failure)
|
||||||
|
result = form.validate_ldap()
|
||||||
|
|
||||||
|
assert not result[0]
|
||||||
|
assert result[1] is None
|
||||||
|
|
||||||
|
# test some errors in ldap Connection and authentication
|
||||||
|
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
|
||||||
|
result = form.validate_ldap()
|
||||||
|
assert not result[0]
|
||||||
|
assert result[1] is None
|
||||||
|
|
||||||
|
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
|
||||||
|
result = form.validate_ldap()
|
||||||
|
assert not result[0]
|
||||||
|
assert result[1] is None
|
||||||
|
|
||||||
|
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
|
||||||
|
result = form.validate_ldap()
|
||||||
|
assert not result[0]
|
||||||
|
assert result[1] is None
|
||||||
|
|
||||||
|
|
||||||
def test_login_ldap(client, temp_user, mocker):
|
def test_login_ldap(client, temp_user, mocker):
|
||||||
# mock ldap validation for admin user
|
# mock ldap validation for admin user
|
||||||
def mock_validate(username, password):
|
def mock_validate(self):
|
||||||
auth = username == temp_user['username'] and password == temp_user['password']
|
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
|
||||||
user_data = {'username': temp_user['username'],
|
user_data = {'username': temp_user['username'],
|
||||||
'email': temp_user['email'],
|
'email': temp_user['email'],
|
||||||
'roles': ['admin'],
|
'roles': ['admin'],
|
||||||
'password': temp_user['password']}
|
'password': temp_user['password']}
|
||||||
return auth, user_data
|
return auth, user_data
|
||||||
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
|
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
||||||
|
|
||||||
user = find_user(temp_user['username'])
|
user = find_user(temp_user['username'])
|
||||||
# remove local role so that ldap authentication is the default
|
# remove local role so that ldap authentication is the default
|
||||||
|
@ -84,14 +142,14 @@ def test_login_ldap(client, temp_user, mocker):
|
||||||
|
|
||||||
def test_login_ldap_new_user(client, mocker):
|
def test_login_ldap_new_user(client, mocker):
|
||||||
# mock ldap validation for admin user
|
# mock ldap validation for admin user
|
||||||
def mock_validate(username, password):
|
def mock_validate(self):
|
||||||
auth = True
|
auth = True
|
||||||
user_data = {'username': 'Balrog',
|
user_data = {'username': 'Balrog',
|
||||||
'email': 'balrog@moria.me',
|
'email': 'balrog@moria.me',
|
||||||
'roles': ['admin'],
|
'roles': ['admin'],
|
||||||
'password': 'youshallnotpass'}
|
'password': 'youshallnotpass'}
|
||||||
return auth, user_data
|
return auth, user_data
|
||||||
mocker.patch('imaginaerraum_door_admin.auth.validate_ldap', mock_validate)
|
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
||||||
|
|
||||||
# initially, the Balrog user should not exist
|
# initially, the Balrog user should not exist
|
||||||
user = find_user('Balrog')
|
user = find_user('Balrog')
|
||||||
|
|
Loading…
Reference in New Issue
Block a user