DoorAdmin/imaginaerraum_door_admin/auth.py

164 lines
6.5 KiB
Python
Raw Normal View History

from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import lookup_identity
2022-01-30 20:56:11 +00:00
from flask_security.models import fsqla_v2 as fsqla
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
2022-01-30 20:56:11 +00:00
from imaginaerraum_door_admin import db, security
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = lookup_identity(self.email.data)
if user is not None:
2022-02-01 22:03:06 +00:00
# if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
2022-09-12 20:20:40 +00:00
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through local database"
)
else:
# run LDAP authorization
2022-02-01 22:03:06 +00:00
# if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc.
2022-02-01 22:03:06 +00:00
authorized, new_user_data = self.validate_ldap()
if authorized:
2022-02-01 22:03:06 +00:00
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
2022-01-30 20:56:11 +00:00
security.datastore.add_role_to_user(user, role)
security.datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
2022-02-01 22:03:06 +00:00
# we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
2022-02-01 22:03:06 +00:00
authorized, new_user_data = self.validate_ldap()
if authorized:
2022-02-01 22:03:06 +00:00
# if there was no user in the database before we create a new
# user
2022-09-12 20:20:40 +00:00
self.user = security.datastore.create_user(
username=new_user_data['username'],
email=new_user_data['email'],
password=new_user_data['password'],
roles=new_user_data['roles']
)
2022-01-30 20:56:11 +00:00
security.datastore.commit()
2022-09-12 20:20:40 +00:00
current_app.logger.info(
f"New admin user '{new_user_data['username']} "
f"<{new_user_data['email']}>' created after successful "
f"LDAP authorization"
)
# if any of the authorization methods is successful we authorize
# the user
return authorized
2022-02-01 22:03:06 +00:00
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
2022-09-12 20:20:40 +00:00
If the connection completes successfully the given user and password
is authorized. Then the permissions and additional information of the
user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update
the entry for the user in the local database.
2022-02-01 22:03:06 +00:00
Parameters
----------
Returns
-------
2022-09-12 20:20:40 +00:00
bool : result of the authorization process (True = success,
False = failure)
dict : dictionary with information about an authorized user
(contains username, email, hashed password, roles)
2022-02-01 22:03:06 +00:00
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
username = self.email.data
password = self.password.data
try:
2022-09-12 20:20:40 +00:00
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
f"dc={ldap_domain_ext}"
con = ldap3.Connection(
ldap_server,
user=user,
password=password,
auto_bind=True
)
2022-02-01 22:03:06 +00:00
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
2022-09-12 20:20:40 +00:00
# server not reachable -> fail (but will try authorization from
# local database later)
2022-02-01 22:03:06 +00:00
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
2022-09-12 20:20:40 +00:00
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
)
2022-02-01 22:03:06 +00:00
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
2022-09-12 20:20:40 +00:00
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
f'dc={ldap_domain},dc={ldap_domain_ext}))'
2022-02-01 22:03:06 +00:00
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data