import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
import ldap3
from pathlib import Path

from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm

security = Security()


def create_super_admins(app, db, user_datastore, logger):
    """Create the database tables, the default roles and any new super admins.

    Reads the file named by ``app.config['ADMIN_FILE']`` (if it is not None).
    Every line that is neither blank nor starts with ``#`` is expected to hold
    ``<username> <email> <password>`` separated by whitespace. Lines that
    cannot be parsed, or whose email fails ``validate_email``, are logged as
    errors and skipped.

    The roles ``super_admin``, ``admin`` and ``local`` are looked up or
    created. For every parsed entry for which ``user_datastore.find_user``
    returns None, a user with the ``super_admin`` and ``admin`` roles is
    created; the ``local`` role is added unless the password field is the
    literal string ``LDAP``.

    Args:
        app: Flask application; provides the app context and ``ADMIN_FILE``.
        db: Flask-SQLAlchemy instance used for ``create_all`` and the commit.
        user_datastore: Flask-Security user datastore.
        logger: Logger used for warnings, errors and info messages.
    """
    with app.app_context():
        new_admin_data = []
        admin_file = app.config['ADMIN_FILE']
        if admin_file is not None:
            if not Path(admin_file).exists():
                logger.warning(
                    f"Admin user creation file not found at {admin_file}")
            else:
                # Store data for new admins in memory so that the file can be
                # deleted afterwards.
                with open(admin_file) as f:
                    # 1-based numbering so the reported line matches an editor.
                    for i, line in enumerate(f, start=1):
                        stripped = line.strip()
                        # Blank lines and comments carry no account data.
                        if not stripped or stripped.startswith('#'):
                            continue
                        try:
                            user, email, pw = stripped.split()
                            validate_email(email)
                        except Exception as e:
                            # Best effort: a bad line must not prevent the
                            # remaining accounts from being created.
                            logger.error(
                                f"Error while parsing line {i} in admin config file. "
                                f"Config file should contain lines of "
                                f"'<username> <email> <password>\\n'\n Exception: {e}\n"
                                f"Admin account could not be created.")
                        else:
                            new_admin_data.append(
                                {'username': user, 'email': email, 'password': pw})

        db.create_all()
        # root admin = can create other admins
        super_admin_role = user_datastore.find_or_create_role('super_admin')
        # 'normal' admin
        admin_role = user_datastore.find_or_create_role('admin')
        # LDAP user or local user
        local_role = user_datastore.find_or_create_role('local')

        for d in new_admin_data:
            # Create the new admin only if it does not already exist.
            # NOTE(review): the lookup passes email AND username together;
            # confirm how find_user combines them if only one of the two
            # matches an existing account.
            if user_datastore.find_user(email=d['email'],
                                        username=d['username']) is not None:
                continue
            roles = [super_admin_role, admin_role]
            if d['password'] != 'LDAP':
                roles.append(local_role)
            user_datastore.create_user(email=d['email'],
                                       username=d['username'],
                                       password=hash_password(d['password']),
                                       roles=roles)
            logger.info(
                f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
        db.session.commit()


def setup_logging(app):
    """Configure and return the ``webapp`` logger.

    The logger level is set to INFO. If ``app.config['LOG_FILE']`` is not
    None, a ``FileHandler`` for that file (level INFO) is attached; otherwise
    a ``StreamHandler`` (level DEBUG) is attached.

    Args:
        app: Flask application providing ``LOG_FILE`` in its config.

    Returns:
        The configured ``logging.Logger`` named ``webapp``.
    """
    logger = logging.getLogger('webapp')
    logger.setLevel(logging.INFO)

    if app.config['LOG_FILE'] is not None:
        ch = logging.FileHandler(app.config['LOG_FILE'])
        ch.setLevel(logging.INFO)
    else:
        # Console handler.
        # NOTE(review): the handler accepts DEBUG, but the logger itself is
        # set to INFO above, so DEBUG records never reach this handler.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)

    # NOTE(review): every call attaches one more handler to the same named
    # logger; calling this more than once per process duplicates output.
    logger.addHandler(ch)
    return logger


def _log_missing_folder(app, config_key, label, logger):
    """Log an error if the folder configured under *config_key* is missing.

    Relative paths are resolved against this module's directory. Joining an
    absolute path onto that directory yields the absolute path unchanged, so
    both cases are handled by the same expression.
    """
    folder = Path(__file__).parent / app.config[config_key]
    if not folder.exists():
        logger.error(
            f"Flask {label} folder not found at {folder.absolute()}")


def create_app():
    """Build and return the configured Flask application.

    Loads the default configuration object and then the settings file named
    by the ``APPLICATION_SETTINGS`` environment variable, sets up logging,
    checks that the configured key file, template folder, static folder and
    token file exist (logging a warning or error when they do not), attaches
    a ``DoorHandle`` as ``app.door``, defines the ``Role`` and ``User``
    models, registers the ``door_app`` blueprint, initialises Flask-Security
    and finally calls ``create_super_admins``.

    Returns:
        The Flask application instance.
    """
    app = Flask(__name__)
    app.config.from_object(
        'imaginaerraum_door_admin.default_app_config.DefaultConfig')
    app.config.from_envvar('APPLICATION_SETTINGS')

    logger = setup_logging(app)

    # Do some checks for file existence etc.
    # NOTE(review): secret_key and security_password_salt are read here but
    # never stored in app.config or used afterwards in this function. Confirm
    # whether they are meant to be applied; doing so would change the salt
    # used for password hashing, so it is intentionally left untouched here.
    try:
        with open(app.config['KEY_FILE']) as f:
            data = f.readlines()
        if 'SECRET_KEY' not in data[0]:
            raise Exception("Could not read SECRET_KEY")
        secret_key = data[0].split()[-1]
        if 'SECURITY_PASSWORD_SALT' not in data[1]:
            raise Exception("Could not read SECURITY_PASSWORD_SALT")
        security_password_salt = data[1].split()[-1]
    except Exception as e:
        logger.warning(
            f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
        secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
        security_password_salt = '10036796768252925167749545152988277953'

    _log_missing_folder(app, 'TEMPLATE_FOLDER', 'template', logger)
    _log_missing_folder(app, 'STATIC_FOLDER', 'static', logger)

    if not Path(app.config['TOKEN_FILE']).exists():
        logger.warning(
            f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")

    # Create the door object which provides access to the token file and the
    # current door state via MQTT.
    app.door = DoorHandle(token_file=app.config['TOKEN_FILE'],
                          mqtt_host=app.config['MQTT_HOST'],
                          nfc_socket=app.config['NFC_SOCKET'],
                          logger=logger)

    # Create database connection object
    db = SQLAlchemy(app)

    # Define models
    fsqla.FsModels.set_db_info(db)

    class Role(db.Model, fsqla.FsRoleMixin):
        pass

    class User(db.Model, fsqla.FsUserMixin):
        pass

    app.register_blueprint(door_app)

    # NOTE(review): the server object is constructed but not used anywhere
    # else in this function.
    ldap_server = ldap3.Server(app.config['LDAP_URL'])

    # Setup Flask-Security
    user_datastore = SQLAlchemyUserDatastore(db, User, Role)
    security.init_app(app, user_datastore, login_form=ExtendedLoginForm)

    create_super_admins(app, db, user_datastore, logger)

    return app