Compare commits

..

7 Commits

5 changed files with 361 additions and 84 deletions

View File

@ -1,7 +1,6 @@
import logging import logging
from flask import Flask from flask import Flask
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore, hash_password from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email from email_validator import validate_email
@ -11,7 +10,6 @@ from pathlib import Path
#from .webapp import door_app #from .webapp import door_app
from .door_handle import DoorHandle from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security() security = Security()
db = SQLAlchemy() db = SQLAlchemy()
@ -147,21 +145,14 @@ def create_app():
# Create database connection object # Create database connection object
db.init_app(app) db.init_app(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
from . webapp import door_app from . webapp import door_app
app.register_blueprint(door_app) app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL']) ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security # Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role
user_datastore = SQLAlchemyUserDatastore(db, User, Role) user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm) security.init_app(app, user_datastore, login_form=ExtendedLoginForm)

View File

@ -3,9 +3,24 @@ from flask import current_app
from flask_security import hash_password from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user from flask_security.utils import find_user
from flask_security.models import fsqla_v2 as fsqla
import ldap3 import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from imaginaerraum_door_admin import db, security
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
class ExtendedLoginForm(LoginForm): class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()]) email = StringField('Benutzername oder E-Mail', [Required()])
@ -35,8 +50,8 @@ class ExtendedLoginForm(LoginForm):
user.email = new_user_data['email'] user.email = new_user_data['email']
user.password = new_user_data['password'] user.password = new_user_data['password']
for role in new_user_data['roles']: for role in new_user_data['roles']:
user_datastore.add_role_to_user(user, role) security.datastore.add_role_to_user(user, role)
user_datastore.commit() security.datastore.commit()
self.user = user self.user = user
else: else:
self.password.errors = ['Invalid password'] self.password.errors = ['Invalid password']
@ -49,15 +64,16 @@ class ExtendedLoginForm(LoginForm):
if authorized: if authorized:
# if there was no user in the database before we create a new user # if there was no user in the database before we create a new user
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'], self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles']) password=new_user_data['password'], roles=new_user_data['roles'])
user_datastore.commit() security.datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after" current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization") " successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user # if any of the authorization methods is successful we authorize the user
return authorized return authorized
def validate_ldap(username, password): def validate_ldap(username, password):
"""Validate the user and password through an LDAP server. """Validate the user and password through an LDAP server.

View File

@ -163,6 +163,7 @@ def promote_admin(username):
db.session.commit() db.session.commit()
return redirect('/manage_admins') return redirect('/manage_admins')
@door_app.route('/demote_admin/<username>') @door_app.route('/demote_admin/<username>')
@roles_required('super_admin') @roles_required('super_admin')
def demote_admin(username): def demote_admin(username):
@ -281,6 +282,26 @@ def token_log():
return redirect('/') return redirect('/')
def store_token(token_data):
"""Store token to the token file on disk.
This will use the token id and the associated data and create/modify a
token and store the new token file to disk.
"""
token = token_data['token']
tokens = current_app.door.get_tokens()
tokens[token] = {'name': token_data['name'],
'email': token_data['email'],
'valid_thru': token_data['valid_thru'],
'inactive': token_data['inactive'],
'organization': token_data['organization']}
try:
current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
except Exception as e:
flash(f"Error during store_tokens. Exception: {e}")
# routes for registering, editing and deleting tokens # routes for registering, editing and deleting tokens
@door_app.route('/register-token', methods=['GET', 'POST']) @door_app.route('/register-token', methods=['GET', 'POST'])
@roles_required('admin') @roles_required('admin')
@ -298,7 +319,7 @@ def register():
recent_token = {} recent_token = {}
if {'token', 'timestamp'}.issubset(set(token.keys())): if {'token', 'timestamp'}.issubset(set(token.keys())):
dt = datetime.utcnow() - token['timestamp'] dt = datetime.utcnow() - token['timestamp']
if dt < timedelta(minutes=10): if dt < timedelta(minutes=10):
recent_token = token recent_token = token
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0) recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
@ -307,22 +328,19 @@ def register():
# set default valid thru date to today to make sure form validity check passes # set default valid thru date to today to make sure form validity check passes
# (will not be used if limited validity is disabled) # (will not be used if limited validity is disabled)
form.valid_thru.data = date.today() form.valid_thru.data = date.today()
return render_template('register.html', token=recent_token, form=form)
elif request.method == 'POST' and form.validate(): elif request.method == 'POST' and form.validate():
# store data in session cookie token_data = {
session['token'] = current_app.door.get_most_recent_token()['token'] 'token': current_app.door.get_most_recent_token()['token'],
session['name'] = form.name.data 'name': form.name.data, 'email': form.email.data,
session['email'] = form.email.data 'organization': form.organization.data,
session['organization'] = form.organization.data 'inactive': not form.active.data,
if form.limit_validity.data: 'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
session['valid_thru'] = form.valid_thru.data.isoformat() }
else: store_token(token_data)
session['valid_thru'] = '' return redirect('/tokens')
session['inactive'] = not form.active.data
return redirect('/store-token')
else: else:
return render_template('register.html', token=recent_token, form=form) flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
return render_template('register.html', token=recent_token, form=form)
@door_app.route('/edit-token/<token>', methods=['GET', 'POST']) @door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
@ -366,44 +384,21 @@ def edit_token(token):
return redirect('/tokens') return redirect('/tokens')
elif request.method == 'POST': elif request.method == 'POST':
if form.validate(): if form.validate():
# store data in session cookie # store data in token_data cookie
session['token'] = token token_data = {'token': token,
session['name'] = form.name.data 'name': form.name.data,
session['organization'] = form.organization.data 'organization': form.organization.data,
session['email'] = form.email.data 'email': form.email.data,
if form.limit_validity.data: 'inactive': not form.active.data,
session['valid_thru'] = form.valid_thru.data.isoformat() 'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
else: }
session['valid_thru'] = '' store_token(token_data)
session['inactive'] = not form.active.data return redirect('/tokens')
return redirect(f'/store-token')
else: else:
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
return render_template('edit.html', token=token, form=form) return render_template('edit.html', token=token, form=form)
@door_app.route('/store-token')
@roles_required('admin')
def store_token():
"""Store token to the token file on disk.
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
edit_token()) and create/modify a token and store the new token file to disk.
"""
token = session['token']
tokens = current_app.door.get_tokens()
tokens[token] = {'name': session['name'],
'email': session['email'],
'valid_thru': session['valid_thru'],
'inactive': session['inactive'],
'organization': session['organization']}
try:
current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
except Exception as e:
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@door_app.route('/delete-token/<token>', methods=['GET', 'POST']) @door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin') @roles_required('admin')
def delete_token(token): def delete_token(token):

56
tests/debug_app_config.py Normal file
View File

@ -0,0 +1,56 @@
import tempfile
from pathlib import Path
TESTING = True
DEBUG = True
LOG_FILE = 'webapp.log'
token_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_file).write_text(
"""# token | name | organization | email | valid_thru
#042979fa186280||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
TOKEN_FILE = str(token_file)
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
admin_file = tempfile.NamedTemporaryFile(delete=False).name
Path(admin_file).write_text(
"""# create new super-admin by putting the following data in this file:
# username email ldap/password
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
# examples:
gandalf gandalf@shire.me shadowfax
"""
)
ADMIN_FILE = str(admin_file)
db_file = tempfile.NamedTemporaryFile(delete=False).name
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_log_file).write_text(
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
"""
)
NFC_LOG = str(token_log_file)

View File

@ -1,7 +1,12 @@
import datetime
import pytest import pytest
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle from imaginaerraum_door_admin.door_handle import DoorHandle
import re import re
import secrets
def test_login(browser, live_server): def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}') response = browser.get(f'http://localhost:{live_server.port}')
@ -10,15 +15,15 @@ def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}/login') response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me') email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me')
password_form = browser.find_element_by_id('password').send_keys('shadowfax') password_form = browser.find_element('id', 'password').send_keys('shadowfax')
submit_button = browser.find_element_by_id('submit').click() submit_button = browser.find_element('id', 'submit').click()
assert 'Tür öffnen' in browser.page_source assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response): def extract_csrf_token(response):
soup = BeautifulSoup(response.data) soup = BeautifulSoup(response.data, 'html.parser')
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value'] csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token return csrf_token
@ -39,7 +44,7 @@ def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
def test_login_headless(client): def test_login_headless(client):
response = headless_login(client) response = headless_login(client)
soup = BeautifulSoup(response.data) soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear # make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
@ -53,24 +58,30 @@ def client_authenticated(client):
yield client yield client
def test_open_door_button(client_authenticated, mocker): @pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door') def test_access_door_button(client_authenticated, mocker, url, function):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open # visit route for open
client_authenticated.get('/open') client_authenticated.get(url)
# make sure the open method was called # make sure the open method was called
DoorHandle.open_door.assert_called_once_with(user='gandalf') getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
def test_close_door_button(client_authenticated, mocker): @pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door') def test_access_door_unauthenticated(client, mocker, url, function):
# test for trying to visit opening/closing door while not logged in
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open # visit route for open
client_authenticated.get('/close') response = client.get(url, follow_redirects=True)
# make sure the open method was called # we should get redirected to login page
DoorHandle.close_door.assert_called_once_with(user='gandalf') assert 'login' in response.request.url
# the open door function should not be called
getattr(DoorHandle, function).assert_not_called()
def test_manage_admins(client_authenticated): def test_manage_admins(client_authenticated):
@ -82,14 +93,14 @@ def test_manage_admins(client_authenticated):
assert "gandalf@shire.me" in response.data.decode() assert "gandalf@shire.me" in response.data.decode()
def test_create_admin(client_authenticated): def create_user(client_authenticated, username, email):
# visit admin management page # visit admin management page
response = client_authenticated.get('/manage_admins') response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response) csrf_token = extract_csrf_token(response)
# post data for creating a new admin # post data for creating a new admin
payload = {'name': 'bilbo', payload = {'name': username,
'email': 'bilbo@shire.me', 'email': email,
'csrf_token': csrf_token} 'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload, response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True) follow_redirects=True)
@ -102,18 +113,226 @@ def test_create_admin(client_authenticated):
assert match is not None assert match is not None
extracted_password = match['password'] extracted_password = match['password']
return extracted_password
@pytest.fixture
def temp_user(client_authenticated):
"""Fixture for creating a temporary user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
return {'username': username,
'email': email,
'password': password}
@pytest.fixture
def temp_admin(client_authenticated):
"""Fixture for creating a temporary admin user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
response = client_authenticated.get(
f"/promote_admin/{username}",
follow_redirects=True)
user = find_user(username)
assert user.has_role('admin')
return {'username': username,
'email': email,
'password': password}
def test_create_admin(client_authenticated):
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
# log out current user # log out current user
response = client_authenticated.get('/logout') response = client_authenticated.get('/logout')
# try to log in new user using the extracted password # try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo', response = headless_login(client_authenticated, user='bilbo',
password=extracted_password) password=password)
# - see if it works # - see if it works
soup = BeautifulSoup(response.data) soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded # make sure login succeeded
# -> username should be displayed # -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode() assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear # -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) assert any(['Tür öffnen' in link.contents[0] for link in
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_activate_deactivate_user(temp_user, client_authenticated):
response = client_authenticated.get('/admin_toggle_active/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# deactivate the user
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# make sure the user is now inactive
user = find_user(temp_user['username'])
assert user is not None
assert not user.active
# activate the user again
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# now the user should be active again
user = find_user(temp_user['username'])
assert user is not None
assert user.active
def test_delete_admin(temp_user, client_authenticated):
# first we test deleting a non-existing user
response = client_authenticated.post('/delete_admins/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# next, we create a temporary user and try to delete that one
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
# we need to deactivate the user first
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
# make sure the user still exists
user = find_user(temp_user['username'])
assert user is not None
# deactivate the user and try deleting it again
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# try deleting it without filling in the confirmation form
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
assert 'Der eingegebene Nutzername stimmt nicht überein' in response.data.decode()
# make sure the user still exists
user = find_user(temp_user['username'])
assert user is not None
# now we send the confirmation data with the request
response = client_authenticated.get(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
data=payload,
follow_redirects=True)
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
# make sure the user now is gone
user = find_user(temp_user['username'])
assert user is None
def test_promote_user(temp_user, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get('/promote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = find_user(temp_user['username'])
assert user is not None
assert not user.has_role('admin')
# grant admin permissions to test user
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert user.has_role('admin')
# try granting admin permissions again
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
assert user.has_role('admin')
def test_demote_user(temp_admin, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get('/demote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = find_user(temp_admin['username'])
assert user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert not user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!"
assert not user.has_role('admin')
def test_list_tokens(client_authenticated):
response = client_authenticated.get(f"/tokens", follow_redirects=True)
# make sure the names for the test tokens are displayed
assert all([user in response.data.decode()
for user in ['Gandalf', 'Bilbo', 'Gimli']])
def test_token_log(client_authenticated):
response = client_authenticated.get(f"/token-log", follow_redirects=True)
page_src = response.data.decode()
# perform some checks for displayed log data
assert all([msg in page_src for msg in
['INFO', 'DEBUG', 'ERROR']])
assert "Valid token 04538cfa186280 of Gandalf" in page_src
assert "2021-04-17 13:09:06,207" in page_src
def test_register_token(client_authenticated, mocker):
# test to make sure message is displayed when no tokens were recently scanned
response = client_authenticated.get(f"/register-token", follow_redirects=True)
page_src = response.data.decode()
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
# mockup scanned token
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
lambda x: {'timestamp': datetime.datetime.now(),
'token': '042979fa181280'})
response = client_authenticated.get(f"/register-token", follow_redirects=True)
csrf_token = extract_csrf_token(response)
page_src = response.data.decode()
assert 'Unregistrierter Token gelesen' in page_src
assert '042979fa181280' in page_src
# try registering with incomplete data
response = client_authenticated.post(f"/register-token", data={},
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht registiert werden' in page_src
# register the mocked token
payload = {'name': 'Legolas',
'organization': 'Elves',
'email': 'legolas@mirkwood.me',
'dsgvo': True,
'csrf_token': csrf_token}
response = client_authenticated.post(f"/register-token", data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the user info for the new token is displayed
assert '042979fa181280' in page_src
assert 'Legolas' in page_src
assert 'Elves' in page_src
assert 'legolas@mirkwood.me' in page_src