prevent password changing for ldap users (they get redirected to the ldap self service instead)
This commit is contained in:
parent
f48f78997c
commit
734bed2092
|
@ -10,6 +10,7 @@ from flask_security import Security, SQLAlchemyUserDatastore, auth_required, has
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
from flask_security.models import fsqla_v2 as fsqla
|
||||||
from flask_security.forms import LoginForm, Required, PasswordField
|
from flask_security.forms import LoginForm, Required, PasswordField
|
||||||
from flask_security.utils import find_user
|
from flask_security.utils import find_user
|
||||||
|
from flask_security.views import change_password
|
||||||
from flask_mail import Mail
|
from flask_mail import Mail
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
|
|
||||||
|
@ -162,7 +163,7 @@ def create_application(config):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class User(db.Model, fsqla.FsUserMixin):
|
class User(db.Model, fsqla.FsUserMixin):
|
||||||
username = db.Column(db.String(255))
|
pass
|
||||||
|
|
||||||
# LDAP
|
# LDAP
|
||||||
ldap_server = ldap3.Server(config.ldap_url)
|
ldap_server = ldap3.Server(config.ldap_url)
|
||||||
|
@ -228,45 +229,61 @@ def create_application(config):
|
||||||
# search for user in the current database
|
# search for user in the current database
|
||||||
user = find_user(self.email.data)
|
user = find_user(self.email.data)
|
||||||
if user is not None:
|
if user is not None:
|
||||||
# if a user is found we use that username for LDAP authentication
|
# if a user is found we check if it is associated with LDAP or with the local database
|
||||||
username = user.username
|
if user.has_role('local'):
|
||||||
|
# try authorizing locally using Flask security user datastore
|
||||||
|
authorized = super(ExtendedLoginForm, self).validate()
|
||||||
|
|
||||||
|
if authorized:
|
||||||
|
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
||||||
|
else:
|
||||||
|
# run LDAP authorization
|
||||||
|
# if the authorization succeeds we also get the new_user_data dict which contains information about
|
||||||
|
# the user's permissions etc.
|
||||||
|
authorized, new_user_data = validate_ldap(user.username, self.password.data)
|
||||||
|
|
||||||
|
if authorized:
|
||||||
|
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
|
||||||
|
# update permissions and password/email to stay up to date for login with no network connection
|
||||||
|
user.email = new_user_data['email']
|
||||||
|
user.password = new_user_data['password']
|
||||||
|
for role in new_user_data['roles']:
|
||||||
|
user_datastore.add_role_to_user(user, role)
|
||||||
|
user_datastore.commit()
|
||||||
|
self.user = user
|
||||||
|
else:
|
||||||
|
self.password.errors = ['Invalid password']
|
||||||
else:
|
else:
|
||||||
# this means there is no user with that email in the database
|
# this means there is no user with that email in the database
|
||||||
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
# we assume that the username was entered instead of an email and use that for authentication with LDAP
|
||||||
username = self.email.data
|
username = self.email.data
|
||||||
|
# try LDAP authorization and create a new user if it succeeds
|
||||||
# run LDAP authorization
|
authorized, new_user_data = validate_ldap(username, self.password.data)
|
||||||
# if the authorization succeeds we also get the new_user_data dict which contains information about the
|
|
||||||
# user's permissions etc.
|
|
||||||
authorized, new_user_data = validate_ldap(username, self.password.data)
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
if user is None:
|
|
||||||
# if there was no user in the database before we create a new user
|
|
||||||
user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
|
||||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
|
||||||
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
|
||||||
" successful LDAP authorization")
|
|
||||||
else: # for existing users we update permissions and password/email to stay up to date
|
|
||||||
user.email = new_user_data['email']
|
|
||||||
user.password = new_user_data['password']
|
|
||||||
for role in new_user_data['roles']:
|
|
||||||
user_datastore.add_role_to_user(user, role)
|
|
||||||
user_datastore.commit()
|
|
||||||
|
|
||||||
self.user = find_user(self.email.data)
|
|
||||||
logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
|
|
||||||
|
|
||||||
if not authorized:
|
|
||||||
# try authorizing locally using Flask security user datastore
|
|
||||||
authorized = super(ExtendedLoginForm, self).validate()
|
|
||||||
|
|
||||||
if authorized:
|
if authorized:
|
||||||
logger.info(f"User with credentials '{self.email.data}' authorized through local database")
|
# if there was no user in the database before we create a new user
|
||||||
|
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||||
|
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||||
|
user_datastore.commit()
|
||||||
|
logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
||||||
|
" successful LDAP authorization")
|
||||||
|
|
||||||
# if any of the authorization methods is successful we authorize the user
|
# if any of the authorization methods is successful we authorize the user
|
||||||
return authorized
|
return authorized
|
||||||
|
|
||||||
|
# we override the change_password view from flask security to only allow local users to change their passwords
|
||||||
|
# LDAP users should use the LDAP self service for changing passwords
|
||||||
|
# this route needs to be defined before the Flask Security setup
|
||||||
|
@app.route('/change', methods=['GET', 'POST'])
|
||||||
|
@auth_required()
|
||||||
|
def change_pw():
|
||||||
|
if current_user.has_role('local'):
|
||||||
|
# local users can change their password
|
||||||
|
return change_password()
|
||||||
|
else:
|
||||||
|
# LDAP users get redirected to the LDAP self service
|
||||||
|
return redirect('https://ldap.imaginaerraum.de/')
|
||||||
|
|
||||||
# Setup Flask-Security
|
# Setup Flask-Security
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||||
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
|
security = Security(app, user_datastore, login_form=ExtendedLoginForm)
|
||||||
|
@ -686,29 +703,35 @@ def create_application(config):
|
||||||
else:
|
else:
|
||||||
# store data for new admins in memory s.t. the file can be deleted afterwards
|
# store data for new admins in memory s.t. the file can be deleted afterwards
|
||||||
with open(config.admin_file) as f:
|
with open(config.admin_file) as f:
|
||||||
for i, d in enumerate(f.readlines()):
|
for i, line in enumerate(f.readlines()):
|
||||||
try:
|
if not line.strip().startswith('#'):
|
||||||
user, email, pw = d.split()
|
try:
|
||||||
validate_email(email)
|
user, email, pw = line.split()
|
||||||
new_admin_data.append({'username': user, 'email': email, 'password': pw})
|
validate_email(email)
|
||||||
except Exception as e:
|
new_admin_data.append({'username': user, 'email': email, 'password': pw})
|
||||||
print(
|
except Exception as e:
|
||||||
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
print(
|
||||||
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
|
||||||
|
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
|
||||||
|
|
||||||
# create admin users (only if they don't exists already)
|
# create admin users (only if they don't exists already)
|
||||||
def create_super_admins(new_admin_data):
|
def create_super_admins(new_admin_data):
|
||||||
db.create_all()
|
db.create_all()
|
||||||
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
|
super_admin_role = user_datastore.find_or_create_role('super_admin') # root admin = can create other admins
|
||||||
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
|
admin_role = user_datastore.find_or_create_role('admin') # 'normal' admin
|
||||||
|
local_role = user_datastore.find_or_create_role('local') # LDAP user or local user
|
||||||
|
|
||||||
for d in new_admin_data:
|
for d in new_admin_data:
|
||||||
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
|
if user_datastore.find_user(email=d['email'], username=d['username']) is None:
|
||||||
logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}'")
|
roles = [super_admin_role, admin_role]
|
||||||
|
if not d['password'] == 'LDAP':
|
||||||
|
roles.append(local_role)
|
||||||
|
logger.info(f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
|
||||||
|
|
||||||
# create new admin (only if admin does not already exist)
|
# create new admin (only if admin does not already exist)
|
||||||
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
|
new_admin = user_datastore.create_user(email=d['email'], username=d['username'],
|
||||||
password=hash_password(d['password']),
|
password=hash_password(d['password']),
|
||||||
roles=[super_admin_role, admin_role])
|
roles=roles)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
create_super_admins(new_admin_data)
|
create_super_admins(new_admin_data)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user