Compare commits
No commits in common. "a621b6bb780f8e94913c54e806adb9863ae0b879" and "2879a694457d77f7060413f805ee70d49ae18e32" have entirely different histories.
a621b6bb78
...
2879a69445
|
@ -1,6 +1,7 @@
|
|||
import logging
|
||||
from flask import Flask
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_security.models import fsqla_v2 as fsqla
|
||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
||||
from email_validator import validate_email
|
||||
|
||||
|
@ -10,6 +11,7 @@ from pathlib import Path
|
|||
|
||||
#from .webapp import door_app
|
||||
from .door_handle import DoorHandle
|
||||
from .auth import ExtendedLoginForm
|
||||
|
||||
security = Security()
|
||||
db = SQLAlchemy()
|
||||
|
@ -145,14 +147,21 @@ def create_app():
|
|||
# Create database connection object
|
||||
db.init_app(app)
|
||||
|
||||
# Define models
|
||||
fsqla.FsModels.set_db_info(db)
|
||||
|
||||
class Role(db.Model, fsqla.FsRoleMixin):
|
||||
pass
|
||||
|
||||
class User(db.Model, fsqla.FsUserMixin):
|
||||
pass
|
||||
|
||||
from . webapp import door_app
|
||||
app.register_blueprint(door_app)
|
||||
|
||||
ldap_server = ldap3.Server(app.config['LDAP_URL'])
|
||||
|
||||
# Setup Flask-Security
|
||||
from .auth import ExtendedLoginForm, User, Role
|
||||
|
||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
||||
|
||||
|
|
|
@ -3,24 +3,9 @@ from flask import current_app
|
|||
from flask_security import hash_password
|
||||
from flask_security.forms import LoginForm, Required, PasswordField
|
||||
from flask_security.utils import find_user
|
||||
from flask_security.models import fsqla_v2 as fsqla
|
||||
|
||||
import ldap3
|
||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
||||
|
||||
from imaginaerraum_door_admin import db, security
|
||||
|
||||
# Define models
|
||||
fsqla.FsModels.set_db_info(db)
|
||||
|
||||
|
||||
class Role(db.Model, fsqla.FsRoleMixin):
|
||||
pass
|
||||
|
||||
|
||||
class User(db.Model, fsqla.FsUserMixin):
|
||||
pass
|
||||
|
||||
|
||||
class ExtendedLoginForm(LoginForm):
|
||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||
|
@ -50,8 +35,8 @@ class ExtendedLoginForm(LoginForm):
|
|||
user.email = new_user_data['email']
|
||||
user.password = new_user_data['password']
|
||||
for role in new_user_data['roles']:
|
||||
security.datastore.add_role_to_user(user, role)
|
||||
security.datastore.commit()
|
||||
user_datastore.add_role_to_user(user, role)
|
||||
user_datastore.commit()
|
||||
self.user = user
|
||||
else:
|
||||
self.password.errors = ['Invalid password']
|
||||
|
@ -64,16 +49,15 @@ class ExtendedLoginForm(LoginForm):
|
|||
|
||||
if authorized:
|
||||
# if there was no user in the database before we create a new user
|
||||
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||
security.datastore.commit()
|
||||
user_datastore.commit()
|
||||
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
||||
" successful LDAP authorization")
|
||||
|
||||
# if any of the authorization methods is successful we authorize the user
|
||||
return authorized
|
||||
|
||||
|
||||
def validate_ldap(username, password):
|
||||
"""Validate the user and password through an LDAP server.
|
||||
|
||||
|
|
|
@ -163,7 +163,6 @@ def promote_admin(username):
|
|||
db.session.commit()
|
||||
return redirect('/manage_admins')
|
||||
|
||||
|
||||
@door_app.route('/demote_admin/<username>')
|
||||
@roles_required('super_admin')
|
||||
def demote_admin(username):
|
||||
|
@ -282,26 +281,6 @@ def token_log():
|
|||
return redirect('/')
|
||||
|
||||
|
||||
def store_token(token_data):
|
||||
"""Store token to the token file on disk.
|
||||
|
||||
This will use the token id and the associated data and create/modify a
|
||||
token and store the new token file to disk.
|
||||
"""
|
||||
token = token_data['token']
|
||||
tokens = current_app.door.get_tokens()
|
||||
tokens[token] = {'name': token_data['name'],
|
||||
'email': token_data['email'],
|
||||
'valid_thru': token_data['valid_thru'],
|
||||
'inactive': token_data['inactive'],
|
||||
'organization': token_data['organization']}
|
||||
try:
|
||||
current_app.door.store_tokens(tokens)
|
||||
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||
except Exception as e:
|
||||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
|
||||
|
||||
# routes for registering, editing and deleting tokens
|
||||
@door_app.route('/register-token', methods=['GET', 'POST'])
|
||||
@roles_required('admin')
|
||||
|
@ -319,7 +298,7 @@ def register():
|
|||
recent_token = {}
|
||||
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
||||
dt = datetime.utcnow() - token['timestamp']
|
||||
if dt < timedelta(minutes=10):
|
||||
if dt < timedelta(minutes=10):
|
||||
recent_token = token
|
||||
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
|
||||
|
||||
|
@ -328,19 +307,22 @@ def register():
|
|||
# set default valid thru date to today to make sure form validity check passes
|
||||
# (will not be used if limited validity is disabled)
|
||||
form.valid_thru.data = date.today()
|
||||
|
||||
return render_template('register.html', token=recent_token, form=form)
|
||||
elif request.method == 'POST' and form.validate():
|
||||
token_data = {
|
||||
'token': current_app.door.get_most_recent_token()['token'],
|
||||
'name': form.name.data, 'email': form.email.data,
|
||||
'organization': form.organization.data,
|
||||
'inactive': not form.active.data,
|
||||
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||
}
|
||||
store_token(token_data)
|
||||
return redirect('/tokens')
|
||||
# store data in session cookie
|
||||
session['token'] = current_app.door.get_most_recent_token()['token']
|
||||
session['name'] = form.name.data
|
||||
session['email'] = form.email.data
|
||||
session['organization'] = form.organization.data
|
||||
if form.limit_validity.data:
|
||||
session['valid_thru'] = form.valid_thru.data.isoformat()
|
||||
else:
|
||||
session['valid_thru'] = ''
|
||||
session['inactive'] = not form.active.data
|
||||
return redirect('/store-token')
|
||||
else:
|
||||
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
|
||||
return render_template('register.html', token=recent_token, form=form)
|
||||
return render_template('register.html', token=recent_token, form=form)
|
||||
|
||||
|
||||
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||
|
@ -384,21 +366,44 @@ def edit_token(token):
|
|||
return redirect('/tokens')
|
||||
elif request.method == 'POST':
|
||||
if form.validate():
|
||||
# store data in token_data cookie
|
||||
token_data = {'token': token,
|
||||
'name': form.name.data,
|
||||
'organization': form.organization.data,
|
||||
'email': form.email.data,
|
||||
'inactive': not form.active.data,
|
||||
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||
}
|
||||
store_token(token_data)
|
||||
return redirect('/tokens')
|
||||
# store data in session cookie
|
||||
session['token'] = token
|
||||
session['name'] = form.name.data
|
||||
session['organization'] = form.organization.data
|
||||
session['email'] = form.email.data
|
||||
if form.limit_validity.data:
|
||||
session['valid_thru'] = form.valid_thru.data.isoformat()
|
||||
else:
|
||||
session['valid_thru'] = ''
|
||||
session['inactive'] = not form.active.data
|
||||
return redirect(f'/store-token')
|
||||
else:
|
||||
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
|
||||
return render_template('edit.html', token=token, form=form)
|
||||
|
||||
|
||||
@door_app.route('/store-token')
|
||||
@roles_required('admin')
|
||||
def store_token():
|
||||
"""Store token to the token file on disk.
|
||||
|
||||
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
|
||||
edit_token()) and create/modify a token and store the new token file to disk.
|
||||
"""
|
||||
token = session['token']
|
||||
tokens = current_app.door.get_tokens()
|
||||
tokens[token] = {'name': session['name'],
|
||||
'email': session['email'],
|
||||
'valid_thru': session['valid_thru'],
|
||||
'inactive': session['inactive'],
|
||||
'organization': session['organization']}
|
||||
try:
|
||||
current_app.door.store_tokens(tokens)
|
||||
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||
except Exception as e:
|
||||
flash(f"Error during store_tokens. Exception: {e}")
|
||||
return redirect('/tokens')
|
||||
|
||||
|
||||
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||
@roles_required('admin')
|
||||
def delete_token(token):
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
TESTING = True
|
||||
DEBUG = True
|
||||
LOG_FILE = 'webapp.log'
|
||||
|
||||
token_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(token_file).write_text(
|
||||
"""# token | name | organization | email | valid_thru
|
||||
#042979fa186280||||
|
||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
||||
"""
|
||||
)
|
||||
TOKEN_FILE = str(token_file)
|
||||
|
||||
SECRET_KEY = 'supersecret'
|
||||
SECURITY_PASSWORD_SALT = 'salty'
|
||||
|
||||
admin_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(admin_file).write_text(
|
||||
"""# create new super-admin by putting the following data in this file:
|
||||
# username email ldap/password
|
||||
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
|
||||
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
|
||||
# examples:
|
||||
gandalf gandalf@shire.me shadowfax
|
||||
"""
|
||||
)
|
||||
ADMIN_FILE = str(admin_file)
|
||||
|
||||
db_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
|
||||
|
||||
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(token_log_file).write_text(
|
||||
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
|
||||
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
|
||||
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
|
||||
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
|
||||
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
|
||||
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
|
||||
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
|
||||
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
|
||||
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
|
||||
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
|
||||
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
|
||||
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
|
||||
"""
|
||||
)
|
||||
NFC_LOG = str(token_log_file)
|
|
@ -1,12 +1,7 @@
|
|||
import datetime
|
||||
|
||||
import pytest
|
||||
from bs4 import BeautifulSoup
|
||||
from flask_security.utils import find_user
|
||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||||
import re
|
||||
import secrets
|
||||
|
||||
|
||||
def test_login(browser, live_server):
|
||||
response = browser.get(f'http://localhost:{live_server.port}')
|
||||
|
@ -15,15 +10,15 @@ def test_login(browser, live_server):
|
|||
|
||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
||||
|
||||
email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me')
|
||||
password_form = browser.find_element('id', 'password').send_keys('shadowfax')
|
||||
submit_button = browser.find_element('id', 'submit').click()
|
||||
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
|
||||
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
|
||||
submit_button = browser.find_element_by_id('submit').click()
|
||||
|
||||
assert 'Tür öffnen' in browser.page_source
|
||||
|
||||
|
||||
def extract_csrf_token(response):
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
soup = BeautifulSoup(response.data)
|
||||
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
||||
return csrf_token
|
||||
|
||||
|
@ -44,7 +39,7 @@ def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
|
|||
|
||||
def test_login_headless(client):
|
||||
response = headless_login(client)
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
soup = BeautifulSoup(response.data)
|
||||
|
||||
# make sure login succeeded -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||
|
@ -58,30 +53,24 @@ def client_authenticated(client):
|
|||
yield client
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
||||
def test_access_door_button(client_authenticated, mocker, url, function):
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||
def test_open_door_button(client_authenticated, mocker):
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
|
||||
|
||||
# visit route for open
|
||||
client_authenticated.get(url)
|
||||
client_authenticated.get('/open')
|
||||
|
||||
# make sure the open method was called
|
||||
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
|
||||
DoorHandle.open_door.assert_called_once_with(user='gandalf')
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
||||
def test_access_door_unauthenticated(client, mocker, url, function):
|
||||
# test for trying to visit opening/closing door while not logged in
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||
def test_close_door_button(client_authenticated, mocker):
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
|
||||
|
||||
# visit route for open
|
||||
response = client.get(url, follow_redirects=True)
|
||||
client_authenticated.get('/close')
|
||||
|
||||
# we should get redirected to login page
|
||||
assert 'login' in response.request.url
|
||||
|
||||
# the open door function should not be called
|
||||
getattr(DoorHandle, function).assert_not_called()
|
||||
# make sure the open method was called
|
||||
DoorHandle.close_door.assert_called_once_with(user='gandalf')
|
||||
|
||||
|
||||
def test_manage_admins(client_authenticated):
|
||||
|
@ -93,14 +82,14 @@ def test_manage_admins(client_authenticated):
|
|||
assert "gandalf@shire.me" in response.data.decode()
|
||||
|
||||
|
||||
def create_user(client_authenticated, username, email):
|
||||
def test_create_admin(client_authenticated):
|
||||
# visit admin management page
|
||||
response = client_authenticated.get('/manage_admins')
|
||||
csrf_token = extract_csrf_token(response)
|
||||
|
||||
# post data for creating a new admin
|
||||
payload = {'name': username,
|
||||
'email': email,
|
||||
payload = {'name': 'bilbo',
|
||||
'email': 'bilbo@shire.me',
|
||||
'csrf_token': csrf_token}
|
||||
response = client_authenticated.post('/manage_admins', data=payload,
|
||||
follow_redirects=True)
|
||||
|
@ -113,226 +102,18 @@ def create_user(client_authenticated, username, email):
|
|||
assert match is not None
|
||||
extracted_password = match['password']
|
||||
|
||||
return extracted_password
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_user(client_authenticated):
|
||||
"""Fixture for creating a temporary user for testing"""
|
||||
username = secrets.token_hex(4)
|
||||
email = username + '@example.com'
|
||||
|
||||
# create user for test
|
||||
password = create_user(client_authenticated, username, email)
|
||||
|
||||
return {'username': username,
|
||||
'email': email,
|
||||
'password': password}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_admin(client_authenticated):
|
||||
"""Fixture for creating a temporary admin user for testing"""
|
||||
username = secrets.token_hex(4)
|
||||
email = username + '@example.com'
|
||||
|
||||
# create user for test
|
||||
password = create_user(client_authenticated, username, email)
|
||||
|
||||
response = client_authenticated.get(
|
||||
f"/promote_admin/{username}",
|
||||
follow_redirects=True)
|
||||
user = find_user(username)
|
||||
assert user.has_role('admin')
|
||||
|
||||
return {'username': username,
|
||||
'email': email,
|
||||
'password': password}
|
||||
|
||||
|
||||
def test_create_admin(client_authenticated):
|
||||
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
|
||||
|
||||
# log out current user
|
||||
response = client_authenticated.get('/logout')
|
||||
|
||||
# try to log in new user using the extracted password
|
||||
response = headless_login(client_authenticated, user='bilbo',
|
||||
password=password)
|
||||
password=extracted_password)
|
||||
# - see if it works
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
soup = BeautifulSoup(response.data)
|
||||
|
||||
# make sure login succeeded
|
||||
# -> username should be displayed
|
||||
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
||||
# -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0] for link in
|
||||
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||
|
||||
|
||||
def test_activate_deactivate_user(temp_user, client_authenticated):
|
||||
response = client_authenticated.get('/admin_toggle_active/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
# deactivate the user
|
||||
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
# make sure the user is now inactive
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is not None
|
||||
assert not user.active
|
||||
|
||||
# activate the user again
|
||||
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# now the user should be active again
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is not None
|
||||
assert user.active
|
||||
|
||||
|
||||
def test_delete_admin(temp_user, client_authenticated):
|
||||
# first we test deleting a non-existing user
|
||||
response = client_authenticated.post('/delete_admins/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
# next, we create a temporary user and try to delete that one
|
||||
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# we need to deactivate the user first
|
||||
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
|
||||
# make sure the user still exists
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is not None
|
||||
|
||||
# deactivate the user and try deleting it again
|
||||
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# try deleting it without filling in the confirmation form
|
||||
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert 'Der eingegebene Nutzername stimmt nicht überein' in response.data.decode()
|
||||
# make sure the user still exists
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is not None
|
||||
|
||||
# now we send the confirmation data with the request
|
||||
response = client_authenticated.get(f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
|
||||
response = client_authenticated.post(
|
||||
f"/delete_admins/{temp_user['username']}",
|
||||
data=payload,
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
|
||||
|
||||
# make sure the user now is gone
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is None
|
||||
|
||||
|
||||
def test_promote_user(temp_user, client_authenticated):
|
||||
# first we test with a non-existing user
|
||||
response = client_authenticated.get('/promote_admin/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
user = find_user(temp_user['username'])
|
||||
assert user is not None
|
||||
assert not user.has_role('admin')
|
||||
# grant admin permissions to test user
|
||||
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert user.has_role('admin')
|
||||
|
||||
# try granting admin permissions again
|
||||
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
|
||||
assert user.has_role('admin')
|
||||
|
||||
|
||||
def test_demote_user(temp_admin, client_authenticated):
|
||||
# first we test with a non-existing user
|
||||
response = client_authenticated.get('/demote_admin/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
user = find_user(temp_admin['username'])
|
||||
assert user.has_role('admin')
|
||||
# try removing admin permissions
|
||||
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
|
||||
follow_redirects=True)
|
||||
assert not user.has_role('admin')
|
||||
|
||||
# try removing admin permissions
|
||||
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!"
|
||||
assert not user.has_role('admin')
|
||||
|
||||
|
||||
def test_list_tokens(client_authenticated):
|
||||
response = client_authenticated.get(f"/tokens", follow_redirects=True)
|
||||
|
||||
# make sure the names for the test tokens are displayed
|
||||
assert all([user in response.data.decode()
|
||||
for user in ['Gandalf', 'Bilbo', 'Gimli']])
|
||||
|
||||
|
||||
def test_token_log(client_authenticated):
|
||||
response = client_authenticated.get(f"/token-log", follow_redirects=True)
|
||||
|
||||
page_src = response.data.decode()
|
||||
|
||||
# perform some checks for displayed log data
|
||||
assert all([msg in page_src for msg in
|
||||
['INFO', 'DEBUG', 'ERROR']])
|
||||
assert "Valid token 04538cfa186280 of Gandalf" in page_src
|
||||
assert "2021-04-17 13:09:06,207" in page_src
|
||||
|
||||
|
||||
def test_register_token(client_authenticated, mocker):
|
||||
# test to make sure message is displayed when no tokens were recently scanned
|
||||
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
|
||||
|
||||
# mockup scanned token
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
|
||||
lambda x: {'timestamp': datetime.datetime.now(),
|
||||
'token': '042979fa181280'})
|
||||
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
page_src = response.data.decode()
|
||||
assert 'Unregistrierter Token gelesen' in page_src
|
||||
assert '042979fa181280' in page_src
|
||||
|
||||
# try registering with incomplete data
|
||||
response = client_authenticated.post(f"/register-token", data={},
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Token konnte nicht registiert werden' in page_src
|
||||
|
||||
# register the mocked token
|
||||
payload = {'name': 'Legolas',
|
||||
'organization': 'Elves',
|
||||
'email': 'legolas@mirkwood.me',
|
||||
'dsgvo': True,
|
||||
'csrf_token': csrf_token}
|
||||
response = client_authenticated.post(f"/register-token", data=payload,
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
|
||||
# make sure the user info for the new token is displayed
|
||||
assert '042979fa181280' in page_src
|
||||
assert 'Legolas' in page_src
|
||||
assert 'Elves' in page_src
|
||||
assert 'legolas@mirkwood.me' in page_src
|
||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user