Compare commits

...

7 Commits

3 changed files with 242 additions and 70 deletions

View File

@ -172,11 +172,13 @@ def demote_admin(username):
flash(f"Ungültiger Nutzer {username}") flash(f"Ungültiger Nutzer {username}")
return redirect('/manage_admins') return redirect('/manage_admins')
if user.has_role('super_admin'): if user.has_role('super_admin'):
flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht verändert werden!') flash(f'Benutzer {username} hat Super-Admin-Rechte und kann nicht '
'verändert werden!')
return redirect('/manage_admins') return redirect('/manage_admins')
if user.has_role('admin'): if user.has_role('admin'):
security.datastore.remove_role_from_user(user, 'admin') security.datastore.remove_role_from_user(user, 'admin')
current_app.logger.info(f"Super admin {current_user.username} revoked admin privileges of user {username}") current_app.logger.info(f"Super admin {current_user.username} revoked "
f"admin privileges of user {username}")
db.session.commit() db.session.commit()
else: else:
flash(f'Benutzer {username} ist bereits kein Admin!') flash(f'Benutzer {username} ist bereits kein Admin!')
@ -188,7 +190,8 @@ def demote_admin(username):
def backup_user_datastore(): def backup_user_datastore():
# get list of defined admin users for backup # get list of defined admin users for backup
users = security.datastore.user_model.query.all() users = security.datastore.user_model.query.all()
user_data = [{'username': u.username, 'email': u.email, 'active': u.is_active, 'password_hash': u.password, user_data = [{'username': u.username, 'email': u.email,
'active': u.is_active, 'password_hash': u.password,
'roles': [r.name for r in u.roles]} 'roles': [r.name for r in u.roles]}
for u in users if not u.has_role('super_admin')] for u in users if not u.has_role('super_admin')]
try: try:
@ -380,7 +383,7 @@ def edit_token(token):
return render_template('edit.html', token=token, form=form) return render_template('edit.html', token=token, form=form)
else: else:
# flash an error message if the route is accessed with an invalid token # flash an error message if the route is accessed with an invalid token
flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.') flash(f'Ungültiger Token {token}!')
return redirect('/tokens') return redirect('/tokens')
elif request.method == 'POST': elif request.method == 'POST':
if form.validate(): if form.validate():
@ -411,7 +414,10 @@ def delete_token(token):
""" """
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens: if token not in tokens:
flash(f'Ungültiger Token {token} für Löschung.')
return redirect('/tokens')
token_to_delete = tokens[token] token_to_delete = tokens[token]
# set up form for confirming deletion # set up form for confirming deletion
@ -426,7 +432,8 @@ def delete_token(token):
tokens.pop(token) tokens.pop(token)
try: try:
current_app.door.store_tokens(tokens) current_app.door.store_tokens(tokens)
current_app.logger.info(f"Token {token} was deleted from database by admin user {current_user.username}") current_app.logger.info(f"Token {token} was deleted from database "
f"by admin user {current_user.username}")
except Exception as e: except Exception as e:
flash(f"Error during store_tokens. Exception: {e}") flash(f"Error during store_tokens. Exception: {e}")
flash(f"Token {token} wurde gelöscht!") flash(f"Token {token} wurde gelöscht!")
@ -434,10 +441,10 @@ def delete_token(token):
else: else:
# form validation failed -> return to token overview and flash message # form validation failed -> return to token overview and flash message
flash( flash(
f"Der eingegebene Name stimmt nicht überein. Der Token {token} von {token_to_delete['name']} wurde nicht gelöscht.") f"Der eingegebene Name stimmt nicht überein. Error: {form.errors}"
return redirect('/tokens') f"Der Token {token} von {token_to_delete['name']} wurde nicht "
else: "gelöscht."
flash(f'Ungültiger Token {token} für Löschung.') )
return redirect('/tokens') return redirect('/tokens')
@ -452,7 +459,11 @@ def deactivate_token(token):
The token to deactivate. The token to deactivate.
""" """
tokens = current_app.door.get_tokens() tokens = current_app.door.get_tokens()
if token in tokens:
if token not in tokens:
flash(f'Ungültiger Token {token} für Deaktivierung.')
return redirect('/tokens')
tokens[token]['inactive'] = True tokens[token]['inactive'] = True
try: try:
current_app.door.store_tokens(tokens) current_app.door.store_tokens(tokens)

View File

@ -1,8 +1,11 @@
import pytest import pytest
import os
from selenium.webdriver import Chrome from selenium.webdriver import Chrome
from imaginaerraum_door_admin import create_app from imaginaerraum_door_admin import create_app
os.environ['APPLICATION_SETTINGS'] = os.getcwd() + '/debug_app_config.py'
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def app(): def app():

View File

@ -6,6 +6,8 @@ from flask_security.utils import find_user
from imaginaerraum_door_admin.door_handle import DoorHandle from imaginaerraum_door_admin.door_handle import DoorHandle
import re import re
import secrets import secrets
import pathlib
import json
def test_login(browser, live_server): def test_login(browser, live_server):
@ -47,7 +49,8 @@ def test_login_headless(client):
soup = BeautifulSoup(response.data, 'html.parser') soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear # make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
@pytest.fixture @pytest.fixture
@ -58,7 +61,8 @@ def client_authenticated(client):
yield client yield client
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')]) @pytest.mark.parametrize("url,function", [('/open', 'open_door'),
('/close', 'close_door')])
def test_access_door_button(client_authenticated, mocker, url, function): def test_access_door_button(client_authenticated, mocker, url, function):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function) mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
@ -150,6 +154,18 @@ def temp_admin(client_authenticated):
'password': password} 'password': password}
def test_backup_users(client_authenticated, temp_user):
# test with invalid token
response = client_authenticated.get("/backup_user_datastore",
follow_redirects=True)
user_data = json.loads(response.data)
users = [d['username'] for d in user_data]
emails = [d['email'] for d in user_data]
assert temp_user['username'] in users
assert temp_user['email'] in emails
def test_create_admin(client_authenticated): def test_create_admin(client_authenticated):
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me') password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
@ -171,12 +187,14 @@ def test_create_admin(client_authenticated):
def test_activate_deactivate_user(temp_user, client_authenticated): def test_activate_deactivate_user(temp_user, client_authenticated):
response = client_authenticated.get('/admin_toggle_active/nosuchuser', response = client_authenticated.get(
'/admin_toggle_active/nosuchuser',
follow_redirects=True) follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode() assert 'Ungültiger Nutzer' in response.data.decode()
# deactivate the user # deactivate the user
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}", response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
# make sure the user is now inactive # make sure the user is now inactive
user = find_user(temp_user['username']) user = find_user(temp_user['username'])
@ -195,12 +213,14 @@ def test_activate_deactivate_user(temp_user, client_authenticated):
def test_delete_admin(temp_user, client_authenticated): def test_delete_admin(temp_user, client_authenticated):
# first we test deleting a non-existing user # first we test deleting a non-existing user
response = client_authenticated.post('/delete_admins/nosuchuser', response = client_authenticated.post(
'/delete_admins/nosuchuser',
follow_redirects=True) follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode() assert 'Ungültiger Nutzer' in response.data.decode()
# next, we create a temporary user and try to delete that one # next, we create a temporary user and try to delete that one
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}", response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
# we need to deactivate the user first # we need to deactivate the user first
@ -210,19 +230,23 @@ def test_delete_admin(temp_user, client_authenticated):
assert user is not None assert user is not None
# deactivate the user and try deleting it again # deactivate the user and try deleting it again
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}", response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
# try deleting it without filling in the confirmation form # try deleting it without filling in the confirmation form
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}", response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
assert 'Der eingegebene Nutzername stimmt nicht überein' in response.data.decode() assert 'Der eingegebene Nutzername stimmt nicht überein' \
in response.data.decode()
# make sure the user still exists # make sure the user still exists
user = find_user(temp_user['username']) user = find_user(temp_user['username'])
assert user is not None assert user is not None
# now we send the confirmation data with the request # now we send the confirmation data with the request
response = client_authenticated.get(f"/delete_admins/{temp_user['username']}", response = client_authenticated.get(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
csrf_token = extract_csrf_token(response) csrf_token = extract_csrf_token(response)
payload = {'name': temp_user['username'], 'csrf_token': csrf_token} payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
@ -239,7 +263,8 @@ def test_delete_admin(temp_user, client_authenticated):
def test_promote_user(temp_user, client_authenticated): def test_promote_user(temp_user, client_authenticated):
# first we test with a non-existing user # first we test with a non-existing user
response = client_authenticated.get('/promote_admin/nosuchuser', response = client_authenticated.get(
'/promote_admin/nosuchuser',
follow_redirects=True) follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode() assert 'Ungültiger Nutzer' in response.data.decode()
@ -247,12 +272,14 @@ def test_promote_user(temp_user, client_authenticated):
assert user is not None assert user is not None
assert not user.has_role('admin') assert not user.has_role('admin')
# grant admin permissions to test user # grant admin permissions to test user
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}", response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
assert user.has_role('admin') assert user.has_role('admin')
# try granting admin permissions again # try granting admin permissions again
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}", response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True) follow_redirects=True)
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!" assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
assert user.has_role('admin') assert user.has_role('admin')
@ -260,23 +287,35 @@ def test_promote_user(temp_user, client_authenticated):
def test_demote_user(temp_admin, client_authenticated): def test_demote_user(temp_admin, client_authenticated):
# first we test with a non-existing user # first we test with a non-existing user
response = client_authenticated.get('/demote_admin/nosuchuser', response = client_authenticated.get(
'/demote_admin/nosuchuser',
follow_redirects=True) follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode() assert 'Ungültiger Nutzer' in response.data.decode()
user = find_user(temp_admin['username']) user = find_user(temp_admin['username'])
assert user.has_role('admin') assert user.has_role('admin')
# try removing admin permissions # try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}", response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True) follow_redirects=True)
assert not user.has_role('admin') assert not user.has_role('admin')
# try removing admin permissions # try removing admin permissions
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}", response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True) follow_redirects=True)
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \
in response.data.decode()
assert not user.has_role('admin') assert not user.has_role('admin')
# try removing admin permissions from superadmin
response = client_authenticated.get(
f"/demote_admin/gandalf",
follow_redirects=True)
assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \
in response.data.decode()
def test_list_tokens(client_authenticated): def test_list_tokens(client_authenticated):
response = client_authenticated.get(f"/tokens", follow_redirects=True) response = client_authenticated.get(f"/tokens", follow_redirects=True)
@ -298,14 +337,27 @@ def test_token_log(client_authenticated):
assert "2021-04-17 13:09:06,207" in page_src assert "2021-04-17 13:09:06,207" in page_src
def test_backup_tokens(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/backup_tokens",
follow_redirects=True)
token_data = json.loads(response.data)
assert {'04387cfa186280', '043a81fa186280', '04538cfa186280',
'042979fa186280'}.issubset(token_data.keys())
def test_register_token(client_authenticated, mocker): def test_register_token(client_authenticated, mocker):
# test to make sure message is displayed when no tokens were recently scanned # test to make sure message is displayed when no tokens were recently
response = client_authenticated.get(f"/register-token", follow_redirects=True) # scanned
response = client_authenticated.get(f"/register-token",
follow_redirects=True)
page_src = response.data.decode() page_src = response.data.decode()
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
# mockup scanned token # mockup scanned token
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token', mocker.patch(
'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
lambda x: {'timestamp': datetime.datetime.now(), lambda x: {'timestamp': datetime.datetime.now(),
'token': '042979fa181280'}) 'token': '042979fa181280'})
response = client_authenticated.get(f"/register-token", follow_redirects=True) response = client_authenticated.get(f"/register-token", follow_redirects=True)
@ -336,3 +388,109 @@ def test_register_token(client_authenticated, mocker):
assert 'Elves' in page_src assert 'Elves' in page_src
assert 'legolas@mirkwood.me' in page_src assert 'legolas@mirkwood.me' in page_src
# check that the token is created in the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '042979fa181280' in token_data
assert 'Legolas' in token_data
def test_edit_token(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
response = client_authenticated.post(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht editiert werden' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/edit-token/04538cfa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {
'name': 'Balin',
'organization': 'Dwarves',
'email': 'balin@erebor.me',
'active': True,
'limit_validity': False,
'valid_thru': datetime.date.today(),
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/edit-token/04538cfa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the new user info for the token is displayed
assert '04538cfa186280' in page_src
assert 'Balin' in page_src
assert 'Dwarves' in page_src
assert 'balin@erebor.me' in page_src
# check that the token is changed in the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
assert 'Balin' in token_data
def test_delete_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/delete-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/delete-token/043a81fa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
# try deleting without form data
response = client_authenticated.post(f"/delete-token/043a81fa186280",
follow_redirects=True)
page_src = response.data.decode()
assert "wurde nicht gelöscht" in page_src
payload = {
'name': 'Bilbo',
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/delete-token/043a81fa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
print(page_src)
assert "wurde gelöscht" in page_src
# check that the token is now gone from the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '043a81fa186280' not in token_data
def test_deactivate_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04387cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/deactivate-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# deactivate token
response = client_authenticated.get(f"/deactivate-token/04387cfa186280",
follow_redirects=True)
# check that the token is now gone from the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '#04387cfa186280' in token_data