Compare commits

..

No commits in common. "2879a694457d77f7060413f805ee70d49ae18e32" and "master" have entirely different histories.

15 changed files with 705 additions and 955 deletions

View File

@ -1,9 +1 @@
Flask-based web interface for user token adminstration of our hackerspace's door lock.
# Development
```shell
cd tests
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
```

View File

@ -1,170 +0,0 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
import ldap3
from pathlib import Path
#from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exists already)
def create_super_admins(app, db, user_datastore, logger):
# setup user database when starting the app
with app.app_context():
new_admin_data = []
if app.config['ADMIN_FILE'] is not None:
if not Path(app.config['ADMIN_FILE']).exists():
logger.warning(
f"Admin user creation file not found at {app.config['ADMIN_FILE']}")
else:
# store data for new admins in memory s.t. the file can be deleted afterwards
with open(app.config['ADMIN_FILE']) as f:
for i, line in enumerate(f.readlines()):
if not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
print(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created.")
db.create_all()
super_admin_role = user_datastore.find_or_create_role(
'super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role(
'admin') # 'normal' admin
local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
logger.info(
f"New super admin user created with username '{d['username']}' and email '{d['email']}', roles = {[r.name for r in roles]}")
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(email=d['email'],
username=d[
'username'],
password=hash_password(
d['password']),
roles=roles)
db.session.commit()
def setup_logging(app):
# set up logging for the web app
logger = logging.getLogger('webapp')
logger.setLevel(logging.INFO)
if app.config['LOG_FILE'] is not None:
ch = logging.FileHandler(app.config['LOG_FILE'])
ch.setLevel(logging.INFO)
else:
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
return logger
def create_app():
app = Flask(__name__)
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
app.config.from_envvar('APPLICATION_SETTINGS')
logger = setup_logging(app)
# do some checks for file existence etc.
try:
with open(app.config['KEY_FILE']) as f:
data = f.readlines()
if 'SECRET_KEY' in data[0]:
secret_key = data[0].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
if 'SECURITY_PASSWORD_SALT' in data[1]:
security_password_salt = data[1].split()[-1]
else:
raise Exception("Could not read SECURITY_PASSWORD_SALT")
except Exception as e:
logger.warning(
f"Flask keys could not be read from file at {Path(app.config['KEY_FILE']).absolute()}. Exception: {e}. Using default values instead.")
secret_key = 'Q7PJu2fg2jabYwP-Psop6c6f2G4'
security_password_salt = '10036796768252925167749545152988277953'
if Path(app.config['TEMPLATE_FOLDER']).is_absolute():
if not Path(app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {Path(app.config['TEMPLATE_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['TEMPLATE_FOLDER']).exists():
logger.error(
f"Flask template folder not found at {(Path(__file__).parent / app.config['TEMPLATE_FOLDER']).absolute()}")
if Path(app.config['STATIC_FOLDER']).is_absolute():
if not Path(app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {Path(app.config['STATIC_FOLDER']).absolute()}")
else:
if not (Path(__file__).parent / app.config['STATIC_FOLDER']).exists():
logger.error(
f"Flask static folder not found at {(Path(__file__).parent / app.config['STATIC_FOLDER']).absolute()}")
if not Path(app.config['TOKEN_FILE']).exists():
logger.warning(
f"Token file not found at {Path(app.config['TOKEN_FILE']).absolute()}")
# create door objects which provides access to the token file and current door state via MQTT
app.door = DoorHandle(token_file=app.config['TOKEN_FILE'], mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'],
logger=logger)
# Mail Config
#mail = Mail(app)
# Create database connection object
db.init_app(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
create_super_admins(app, db, user_datastore, logger)
return app

View File

@ -1,112 +0,0 @@
from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = find_user(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data dict which contains information about
# the user's permissions etc.
authorized, new_user_data = validate_ldap(user.username, self.password.data)
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through LDAP")
# update permissions and password/email to stay up to date for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = validate_ldap(username, self.password.data)
if authorized:
# if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
Parameters
----------
username : username for the LDAP server
password : password for the LDAP server
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
"""
try:
con = ldap3.Connection(ldap_server, user=f"uid={username},ou=Users,dc=imaginaerraum,dc=de",
password=password, auto_bind=True)
except ldap3.core.exceptions.LDAPBindError:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError:
# server not reachable -> fail (but will try authorization from local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
lock_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc=imaginaerraum,dc=de))',
attributes=ldap3.ALL_ATTRIBUTES)
authorized = True
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
authorized = False
token_granting_permission = con.search('ou=Users,dc=imaginaerraum,dc=de',
f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc=imaginaerraum,dc=de))')
if token_granting_permission:
new_user_data['roles'].append('admin')
return authorized, new_user_data

View File

@ -1,55 +0,0 @@
print("loading default config")
import bleach
from flask_security import uia_email_mapper
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
SECRET_KEY = 'supersecret'
TEMPLATE_FOLDER = 'templates'
STATIC_FOLDER = 'static'
SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True
SECURITY_RECOVERABLE = True
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
SECURITY_POST_LOGIN_VIEW = '/'
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
SECURITY_PASSWORD_LENGTH_MIN = 10
SECURITY_USER_IDENTITY_ATTRIBUTES = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
# mail configuration
MAIL_SERVER = ''
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ''
MAIL_PASSWORD = ''
MAIL_DEFAULT_SENDER = ''
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_pre_ping": True,
}
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "/etc/door_tokens"
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "/var/log/webinterface.log"
NFC_LOG = "/var/log/nfc.log"
MQTT_HOST = '10.10.21.2'

View File

@ -27,15 +27,15 @@
{% endfor %}
<td>
{% if not data['super_admin'] %}
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
{% endif %}
{% if data['admin'] %}
{% if not data['super_admin'] %}
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
{% endif %}
{% else %}
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
{% endif %}
</td>
</tr>
@ -68,14 +68,14 @@
</div>
<div class="p-2 bg-light border">
<h3>Nutzerdaten sichern:</h3>
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
<form action="{{ url_for('backup_user_datastore') }}" method="get">
<input type="submit" value="Download">
</form>
</div>
<div class="p-2 bg-light border">
<h3>Nutzerdaten wiederherstellen:</h3>
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value="Abschicken">
</form>

View File

@ -13,7 +13,7 @@
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container-fluid">
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
@ -29,10 +29,10 @@
Tokens
</a>
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
{% if current_user.has_role('super_admin') %}
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
{% endif %}
</div>
</li>
@ -40,7 +40,7 @@
{% if current_user.has_role('super_admin') %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
</li>
{% endif %}

View File

@ -24,11 +24,11 @@
{% if current_user.is_authenticated %}
<div class="row">
<div class="col-3">
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
</div>
<div class="col-1"></div>
<div class="col-3">
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
</div>
<div class="col-5"></div>
</div>

View File

@ -23,7 +23,7 @@
und diesen zustimmt.
</li>
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
</li>
<li>Jetzt kann der Token verwendet werden.</li>

View File

@ -29,9 +29,9 @@
<td>{{ data[field] if data[field] }}</td>
{% endfor %}
<td>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td>
</tr>
{% endfor %}
@ -47,14 +47,14 @@
<td>{{ data[field] if data[field] }}</td>
{% endfor %}
<td>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<form action="{{ url_for('door_app.backup_tokens') }}" method="get">
<form action="{{ url_for('backup_tokens') }}" method="get">
<input type="submit" value="Token Daten sichern">
</form>
{% endblock %}

File diff suppressed because it is too large Load Diff

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
flask~=1.1.2
Flask-Security-Too~=4.0.0
WTForms~=2.3.3
paho-mqtt~=1.5.1
bleach~=3.3.0

View File

@ -15,35 +15,3 @@ classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Operating System :: OS Independent
[options]
python_requires = >=3.8
install_requires =
bleach
Flask
Flask-Mail
Flask-Security-Too
Flask-SQLAlchemy
Flask-WTF
email_validator
paho-mqtt
ldap3
wtforms
include_package_data = True
packages = find:
setup_requires =
wheel
tests_require = pytest>=3
zip_safe = False
scripts= bin/launch_webadmin
[options.extras_require]
dev =
pytest
pytest-cov
pytest-flask
pytest-mock
flake8
selenium
beautifulsoup4

View File

@ -1,3 +1,17 @@
from setuptools import setup
setup()
setup(install_requires=[
'bleach',
'Flask',
'Flask-Mail',
'Flask-Security-Too',
'Flask-SQLAlchemy',
'Flask-WTF',
'email_validator',
'paho-mqtt',
'ldap3',
],
include_package_data=True,
scripts=['bin/launch_webadmin'],
packages=['imaginaerraum_door_admin'],
zip_safe=False)

View File

@ -1,21 +0,0 @@
import pytest
from selenium.webdriver import Chrome
from imaginaerraum_door_admin import create_app
@pytest.fixture(scope='session')
def app():
"""Fixture to launch the webapp"""
app = create_app()
return app
@pytest.fixture
def browser():
"""Fixture for a selenium browser to access the webapp."""
driver = Chrome()
driver.implicitly_wait(10)
yield driver
driver.quit()

View File

@ -1,119 +0,0 @@
import pytest
from bs4 import BeautifulSoup
from imaginaerraum_door_admin.door_handle import DoorHandle
import re
def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}')
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
submit_button = browser.find_element_by_id('submit').click()
assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response):
soup = BeautifulSoup(response.data)
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
# extract csrf token from the login page source
response = client.get('/login')
csrf_token = extract_csrf_token(response)
# send login information
payload = {
'csrf_token': csrf_token,
'email': user,
'password': password
}
return client.post('/login', data=payload, follow_redirects=True)
def test_login_headless(client):
response = headless_login(client)
soup = BeautifulSoup(response.data)
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
@pytest.fixture
def client_authenticated(client):
# log in using admin account for testing
headless_login(client)
yield client
def test_open_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
# visit route for open
client_authenticated.get('/open')
# make sure the open method was called
DoorHandle.open_door.assert_called_once_with(user='gandalf')
def test_close_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
# visit route for open
client_authenticated.get('/close')
# make sure the open method was called
DoorHandle.close_door.assert_called_once_with(user='gandalf')
def test_manage_admins(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
assert "Nutzer Übersicht" in response.data.decode()
assert "gandalf" in response.data.decode()
assert "gandalf@shire.me" in response.data.decode()
def test_create_admin(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response)
# post data for creating a new admin
payload = {'name': 'bilbo',
'email': 'bilbo@shire.me',
'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True)
# after the new admin user is created, we should have been redirected to the
# /manage_admin page. there, the password for login is displayed
# we test if the newly created user can log in with that password
# extract password displayed on the page
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
assert match is not None
extracted_password = match['password']
# log out current user
response = client_authenticated.get('/logout')
# try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo',
password=extracted_password)
# - see if it works
soup = BeautifulSoup(response.data)
# make sure login succeeded
# -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])