Compare commits

..

No commits in common. "a621b6bb780f8e94913c54e806adb9863ae0b879" and "2879a694457d77f7060413f805ee70d49ae18e32" have entirely different histories.

5 changed files with 84 additions and 361 deletions

View File

@ -1,6 +1,7 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security.models import fsqla_v2 as fsqla
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
@ -10,6 +11,7 @@ from pathlib import Path
#from .webapp import door_app
from .door_handle import DoorHandle
from .auth import ExtendedLoginForm
security = Security()
db = SQLAlchemy()
@ -145,14 +147,21 @@ def create_app():
# Create database connection object
db.init_app(app)
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
from . webapp import door_app
app.register_blueprint(door_app)
ldap_server = ldap3.Server(app.config['LDAP_URL'])
# Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)

View File

@ -3,24 +3,9 @@ from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import find_user
from flask_security.models import fsqla_v2 as fsqla
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from imaginaerraum_door_admin import db, security
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
@ -50,8 +35,8 @@ class ExtendedLoginForm(LoginForm):
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
security.datastore.add_role_to_user(user, role)
security.datastore.commit()
user_datastore.add_role_to_user(user, role)
user_datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
@ -64,16 +49,15 @@ class ExtendedLoginForm(LoginForm):
if authorized:
# if there was no user in the database before we create a new user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
security.datastore.commit()
user_datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
# if any of the authorization methods is successful we authorize the user
return authorized
def validate_ldap(username, password):
"""Validate the user and password through an LDAP server.

View File

@ -163,7 +163,6 @@ def promote_admin(username):
db.session.commit()
return redirect('/manage_admins')
@door_app.route('/demote_admin/<username>')
@roles_required('super_admin')
def demote_admin(username):
@ -282,26 +281,6 @@ def token_log():
return redirect('/')
def store_token(token_data):
"""Store token to the token file on disk.
This will use the token id and the associated data and create/modify a
token and store the new token file to disk.
"""
token = token_data['token']
tokens = current_app.door.get_tokens()
tokens[token] = {'name': token_data['name'],
'email': token_data['email'],
'valid_thru': token_data['valid_thru'],
'inactive': token_data['inactive'],
'organization': token_data['organization']}
try:
current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
except Exception as e:
flash(f"Error during store_tokens. Exception: {e}")
# routes for registering, editing and deleting tokens
@door_app.route('/register-token', methods=['GET', 'POST'])
@roles_required('admin')
@ -328,18 +307,21 @@ def register():
# set default valid thru date to today to make sure form validity check passes
# (will not be used if limited validity is disabled)
form.valid_thru.data = date.today()
return render_template('register.html', token=recent_token, form=form)
elif request.method == 'POST' and form.validate():
token_data = {
'token': current_app.door.get_most_recent_token()['token'],
'name': form.name.data, 'email': form.email.data,
'organization': form.organization.data,
'inactive': not form.active.data,
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
}
store_token(token_data)
return redirect('/tokens')
# store data in session cookie
session['token'] = current_app.door.get_most_recent_token()['token']
session['name'] = form.name.data
session['email'] = form.email.data
session['organization'] = form.organization.data
if form.limit_validity.data:
session['valid_thru'] = form.valid_thru.data.isoformat()
else:
session['valid_thru'] = ''
session['inactive'] = not form.active.data
return redirect('/store-token')
else:
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
return render_template('register.html', token=recent_token, form=form)
@ -384,21 +366,44 @@ def edit_token(token):
return redirect('/tokens')
elif request.method == 'POST':
if form.validate():
# store data in token_data cookie
token_data = {'token': token,
'name': form.name.data,
'organization': form.organization.data,
'email': form.email.data,
'inactive': not form.active.data,
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
}
store_token(token_data)
return redirect('/tokens')
# store data in session cookie
session['token'] = token
session['name'] = form.name.data
session['organization'] = form.organization.data
session['email'] = form.email.data
if form.limit_validity.data:
session['valid_thru'] = form.valid_thru.data.isoformat()
else:
session['valid_thru'] = ''
session['inactive'] = not form.active.data
return redirect(f'/store-token')
else:
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
return render_template('edit.html', token=token, form=form)
@door_app.route('/store-token')
@roles_required('admin')
def store_token():
"""Store token to the token file on disk.
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
edit_token()) and create/modify a token and store the new token file to disk.
"""
token = session['token']
tokens = current_app.door.get_tokens()
tokens[token] = {'name': session['name'],
'email': session['email'],
'valid_thru': session['valid_thru'],
'inactive': session['inactive'],
'organization': session['organization']}
try:
current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
except Exception as e:
flash(f"Error during store_tokens. Exception: {e}")
return redirect('/tokens')
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
@roles_required('admin')
def delete_token(token):

View File

@ -1,56 +0,0 @@
import tempfile
from pathlib import Path
TESTING = True
DEBUG = True
LOG_FILE = 'webapp.log'
token_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_file).write_text(
"""# token | name | organization | email | valid_thru
#042979fa186280||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
TOKEN_FILE = str(token_file)
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
admin_file = tempfile.NamedTemporaryFile(delete=False).name
Path(admin_file).write_text(
"""# create new super-admin by putting the following data in this file:
# username email ldap/password
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
# examples:
gandalf gandalf@shire.me shadowfax
"""
)
ADMIN_FILE = str(admin_file)
db_file = tempfile.NamedTemporaryFile(delete=False).name
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_log_file).write_text(
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
"""
)
NFC_LOG = str(token_log_file)

View File

@ -1,12 +1,7 @@
import datetime
import pytest
from bs4 import BeautifulSoup
from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle
import re
import secrets
def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}')
@ -15,15 +10,15 @@ def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me')
password_form = browser.find_element('id', 'password').send_keys('shadowfax')
submit_button = browser.find_element('id', 'submit').click()
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
submit_button = browser.find_element_by_id('submit').click()
assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response):
soup = BeautifulSoup(response.data, 'html.parser')
soup = BeautifulSoup(response.data)
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token
@ -44,7 +39,7 @@ def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
def test_login_headless(client):
response = headless_login(client)
soup = BeautifulSoup(response.data, 'html.parser')
soup = BeautifulSoup(response.data)
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
@ -58,30 +53,24 @@ def client_authenticated(client):
yield client
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
def test_access_door_button(client_authenticated, mocker, url, function):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
def test_open_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
# visit route for open
client_authenticated.get(url)
client_authenticated.get('/open')
# make sure the open method was called
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
DoorHandle.open_door.assert_called_once_with(user='gandalf')
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
def test_access_door_unauthenticated(client, mocker, url, function):
# test for trying to visit opening/closing door while not logged in
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
def test_close_door_button(client_authenticated, mocker):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
# visit route for open
response = client.get(url, follow_redirects=True)
client_authenticated.get('/close')
# we should get redirected to login page
assert 'login' in response.request.url
# the open door function should not be called
getattr(DoorHandle, function).assert_not_called()
# make sure the open method was called
DoorHandle.close_door.assert_called_once_with(user='gandalf')
def test_manage_admins(client_authenticated):
@ -93,14 +82,14 @@ def test_manage_admins(client_authenticated):
assert "gandalf@shire.me" in response.data.decode()
def create_user(client_authenticated, username, email):
def test_create_admin(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response)
# post data for creating a new admin
payload = {'name': username,
'email': email,
payload = {'name': 'bilbo',
'email': 'bilbo@shire.me',
'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True)
@ -113,226 +102,18 @@ def create_user(client_authenticated, username, email):
assert match is not None
extracted_password = match['password']
return extracted_password
@pytest.fixture
def temp_user(client_authenticated):
"""Fixture for creating a temporary user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
return {'username': username,
'email': email,
'password': password}
@pytest.fixture
def temp_admin(client_authenticated):
"""Fixture for creating a temporary admin user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
response = client_authenticated.get(
f"/promote_admin/{username}",
follow_redirects=True)
user = find_user(username)
assert user.has_role('admin')
return {'username': username,
'email': email,
'password': password}
def test_create_admin(client_authenticated):
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
# log out current user
response = client_authenticated.get('/logout')
# try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo',
password=password)
password=extracted_password)
# - see if it works
soup = BeautifulSoup(response.data, 'html.parser')
soup = BeautifulSoup(response.data)
# make sure login succeeded
# -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_activate_deactivate_user(temp_user, client_authenticated):
response = client_authenticated.get('/admin_toggle_active/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# deactivate the user
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# make sure the user is now inactive
user = find_user(temp_user['username'])
assert user is not None
assert not user.active
# activate the user again
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# now the user should be active again
user = find_user(temp_user['username'])
assert user is not None
assert user.active
def test_delete_admin(temp_user, client_authenticated):
# first we test deleting a non-existing user
response = client_authenticated.post('/delete_admins/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# next, we create a temporary user and try to delete that one
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
# we need to deactivate the user first
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
# make sure the user still exists
user = find_user(temp_user['username'])
assert user is not None
# deactivate the user and try deleting it again
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# try deleting it without filling in the confirmation form
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
assert 'Der eingegebene Nutzername stimmt nicht überein' in response.data.decode()
# make sure the user still exists
user = find_user(temp_user['username'])
assert user is not None
# now we send the confirmation data with the request
response = client_authenticated.get(f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
data=payload,
follow_redirects=True)
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
# make sure the user now is gone
user = find_user(temp_user['username'])
assert user is None
def test_promote_user(temp_user, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get('/promote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = find_user(temp_user['username'])
assert user is not None
assert not user.has_role('admin')
# grant admin permissions to test user
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert user.has_role('admin')
# try granting admin permissions again
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
assert user.has_role('admin')
def test_demote_user(temp_admin, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get('/demote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = find_user(temp_admin['username'])
assert user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert not user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!"
assert not user.has_role('admin')
def test_list_tokens(client_authenticated):
response = client_authenticated.get(f"/tokens", follow_redirects=True)
# make sure the names for the test tokens are displayed
assert all([user in response.data.decode()
for user in ['Gandalf', 'Bilbo', 'Gimli']])
def test_token_log(client_authenticated):
response = client_authenticated.get(f"/token-log", follow_redirects=True)
page_src = response.data.decode()
# perform some checks for displayed log data
assert all([msg in page_src for msg in
['INFO', 'DEBUG', 'ERROR']])
assert "Valid token 04538cfa186280 of Gandalf" in page_src
assert "2021-04-17 13:09:06,207" in page_src
def test_register_token(client_authenticated, mocker):
# test to make sure message is displayed when no tokens were recently scanned
response = client_authenticated.get(f"/register-token", follow_redirects=True)
page_src = response.data.decode()
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
# mockup scanned token
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
lambda x: {'timestamp': datetime.datetime.now(),
'token': '042979fa181280'})
response = client_authenticated.get(f"/register-token", follow_redirects=True)
csrf_token = extract_csrf_token(response)
page_src = response.data.decode()
assert 'Unregistrierter Token gelesen' in page_src
assert '042979fa181280' in page_src
# try registering with incomplete data
response = client_authenticated.post(f"/register-token", data={},
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht registiert werden' in page_src
# register the mocked token
payload = {'name': 'Legolas',
'organization': 'Elves',
'email': 'legolas@mirkwood.me',
'dsgvo': True,
'csrf_token': csrf_token}
response = client_authenticated.post(f"/register-token", data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the user info for the new token is displayed
assert '042979fa181280' in page_src
assert 'Legolas' in page_src
assert 'Elves' in page_src
assert 'legolas@mirkwood.me' in page_src
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])