Compare commits
7 Commits
2879a69445
...
a621b6bb78
Author | SHA1 | Date | |
---|---|---|---|
a621b6bb78 | |||
ec1e843f57 | |||
3caf17c861 | |||
f1eaf8af4e | |||
4b3aed25d2 | |||
080ea0f3b0 | |||
ee6ee0e111 |
|
@ -1,7 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
||||||
from email_validator import validate_email
|
from email_validator import validate_email
|
||||||
|
|
||||||
|
@ -11,7 +10,6 @@ from pathlib import Path
|
||||||
|
|
||||||
#from .webapp import door_app
|
#from .webapp import door_app
|
||||||
from .door_handle import DoorHandle
|
from .door_handle import DoorHandle
|
||||||
from .auth import ExtendedLoginForm
|
|
||||||
|
|
||||||
security = Security()
|
security = Security()
|
||||||
db = SQLAlchemy()
|
db = SQLAlchemy()
|
||||||
|
@ -147,21 +145,14 @@ def create_app():
|
||||||
# Create database connection object
|
# Create database connection object
|
||||||
db.init_app(app)
|
db.init_app(app)
|
||||||
|
|
||||||
# Define models
|
|
||||||
fsqla.FsModels.set_db_info(db)
|
|
||||||
|
|
||||||
class Role(db.Model, fsqla.FsRoleMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class User(db.Model, fsqla.FsUserMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
from . webapp import door_app
|
from . webapp import door_app
|
||||||
app.register_blueprint(door_app)
|
app.register_blueprint(door_app)
|
||||||
|
|
||||||
ldap_server = ldap3.Server(app.config['LDAP_URL'])
|
ldap_server = ldap3.Server(app.config['LDAP_URL'])
|
||||||
|
|
||||||
# Setup Flask-Security
|
# Setup Flask-Security
|
||||||
|
from .auth import ExtendedLoginForm, User, Role
|
||||||
|
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,24 @@ from flask import current_app
|
||||||
from flask_security import hash_password
|
from flask_security import hash_password
|
||||||
from flask_security.forms import LoginForm, Required, PasswordField
|
from flask_security.forms import LoginForm, Required, PasswordField
|
||||||
from flask_security.utils import find_user
|
from flask_security.utils import find_user
|
||||||
|
from flask_security.models import fsqla_v2 as fsqla
|
||||||
|
|
||||||
import ldap3
|
import ldap3
|
||||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
||||||
|
|
||||||
|
from imaginaerraum_door_admin import db, security
|
||||||
|
|
||||||
|
# Define models
|
||||||
|
fsqla.FsModels.set_db_info(db)
|
||||||
|
|
||||||
|
|
||||||
|
class Role(db.Model, fsqla.FsRoleMixin):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class User(db.Model, fsqla.FsUserMixin):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ExtendedLoginForm(LoginForm):
|
class ExtendedLoginForm(LoginForm):
|
||||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||||
|
@ -35,8 +50,8 @@ class ExtendedLoginForm(LoginForm):
|
||||||
user.email = new_user_data['email']
|
user.email = new_user_data['email']
|
||||||
user.password = new_user_data['password']
|
user.password = new_user_data['password']
|
||||||
for role in new_user_data['roles']:
|
for role in new_user_data['roles']:
|
||||||
user_datastore.add_role_to_user(user, role)
|
security.datastore.add_role_to_user(user, role)
|
||||||
user_datastore.commit()
|
security.datastore.commit()
|
||||||
self.user = user
|
self.user = user
|
||||||
else:
|
else:
|
||||||
self.password.errors = ['Invalid password']
|
self.password.errors = ['Invalid password']
|
||||||
|
@ -49,15 +64,16 @@ class ExtendedLoginForm(LoginForm):
|
||||||
|
|
||||||
if authorized:
|
if authorized:
|
||||||
# if there was no user in the database before we create a new user
|
# if there was no user in the database before we create a new user
|
||||||
self.user = user_datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
|
||||||
password=new_user_data['password'], roles=new_user_data['roles'])
|
password=new_user_data['password'], roles=new_user_data['roles'])
|
||||||
user_datastore.commit()
|
security.datastore.commit()
|
||||||
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
|
||||||
" successful LDAP authorization")
|
" successful LDAP authorization")
|
||||||
|
|
||||||
# if any of the authorization methods is successful we authorize the user
|
# if any of the authorization methods is successful we authorize the user
|
||||||
return authorized
|
return authorized
|
||||||
|
|
||||||
|
|
||||||
def validate_ldap(username, password):
|
def validate_ldap(username, password):
|
||||||
"""Validate the user and password through an LDAP server.
|
"""Validate the user and password through an LDAP server.
|
||||||
|
|
||||||
|
|
|
@ -163,6 +163,7 @@ def promote_admin(username):
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return redirect('/manage_admins')
|
return redirect('/manage_admins')
|
||||||
|
|
||||||
|
|
||||||
@door_app.route('/demote_admin/<username>')
|
@door_app.route('/demote_admin/<username>')
|
||||||
@roles_required('super_admin')
|
@roles_required('super_admin')
|
||||||
def demote_admin(username):
|
def demote_admin(username):
|
||||||
|
@ -281,6 +282,26 @@ def token_log():
|
||||||
return redirect('/')
|
return redirect('/')
|
||||||
|
|
||||||
|
|
||||||
|
def store_token(token_data):
|
||||||
|
"""Store token to the token file on disk.
|
||||||
|
|
||||||
|
This will use the token id and the associated data and create/modify a
|
||||||
|
token and store the new token file to disk.
|
||||||
|
"""
|
||||||
|
token = token_data['token']
|
||||||
|
tokens = current_app.door.get_tokens()
|
||||||
|
tokens[token] = {'name': token_data['name'],
|
||||||
|
'email': token_data['email'],
|
||||||
|
'valid_thru': token_data['valid_thru'],
|
||||||
|
'inactive': token_data['inactive'],
|
||||||
|
'organization': token_data['organization']}
|
||||||
|
try:
|
||||||
|
current_app.door.store_tokens(tokens)
|
||||||
|
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
||||||
|
except Exception as e:
|
||||||
|
flash(f"Error during store_tokens. Exception: {e}")
|
||||||
|
|
||||||
|
|
||||||
# routes for registering, editing and deleting tokens
|
# routes for registering, editing and deleting tokens
|
||||||
@door_app.route('/register-token', methods=['GET', 'POST'])
|
@door_app.route('/register-token', methods=['GET', 'POST'])
|
||||||
@roles_required('admin')
|
@roles_required('admin')
|
||||||
|
@ -298,7 +319,7 @@ def register():
|
||||||
recent_token = {}
|
recent_token = {}
|
||||||
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
if {'token', 'timestamp'}.issubset(set(token.keys())):
|
||||||
dt = datetime.utcnow() - token['timestamp']
|
dt = datetime.utcnow() - token['timestamp']
|
||||||
if dt < timedelta(minutes=10):
|
if dt < timedelta(minutes=10):
|
||||||
recent_token = token
|
recent_token = token
|
||||||
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
|
recent_token['timedelta_minutes'] = int(dt.total_seconds() / 60.0)
|
||||||
|
|
||||||
|
@ -307,22 +328,19 @@ def register():
|
||||||
# set default valid thru date to today to make sure form validity check passes
|
# set default valid thru date to today to make sure form validity check passes
|
||||||
# (will not be used if limited validity is disabled)
|
# (will not be used if limited validity is disabled)
|
||||||
form.valid_thru.data = date.today()
|
form.valid_thru.data = date.today()
|
||||||
|
|
||||||
return render_template('register.html', token=recent_token, form=form)
|
|
||||||
elif request.method == 'POST' and form.validate():
|
elif request.method == 'POST' and form.validate():
|
||||||
# store data in session cookie
|
token_data = {
|
||||||
session['token'] = current_app.door.get_most_recent_token()['token']
|
'token': current_app.door.get_most_recent_token()['token'],
|
||||||
session['name'] = form.name.data
|
'name': form.name.data, 'email': form.email.data,
|
||||||
session['email'] = form.email.data
|
'organization': form.organization.data,
|
||||||
session['organization'] = form.organization.data
|
'inactive': not form.active.data,
|
||||||
if form.limit_validity.data:
|
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||||
session['valid_thru'] = form.valid_thru.data.isoformat()
|
}
|
||||||
else:
|
store_token(token_data)
|
||||||
session['valid_thru'] = ''
|
return redirect('/tokens')
|
||||||
session['inactive'] = not form.active.data
|
|
||||||
return redirect('/store-token')
|
|
||||||
else:
|
else:
|
||||||
return render_template('register.html', token=recent_token, form=form)
|
flash(f'Token konnte nicht registiert werden. Fehler: {form.errors}')
|
||||||
|
return render_template('register.html', token=recent_token, form=form)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
@door_app.route('/edit-token/<token>', methods=['GET', 'POST'])
|
||||||
|
@ -366,44 +384,21 @@ def edit_token(token):
|
||||||
return redirect('/tokens')
|
return redirect('/tokens')
|
||||||
elif request.method == 'POST':
|
elif request.method == 'POST':
|
||||||
if form.validate():
|
if form.validate():
|
||||||
# store data in session cookie
|
# store data in token_data cookie
|
||||||
session['token'] = token
|
token_data = {'token': token,
|
||||||
session['name'] = form.name.data
|
'name': form.name.data,
|
||||||
session['organization'] = form.organization.data
|
'organization': form.organization.data,
|
||||||
session['email'] = form.email.data
|
'email': form.email.data,
|
||||||
if form.limit_validity.data:
|
'inactive': not form.active.data,
|
||||||
session['valid_thru'] = form.valid_thru.data.isoformat()
|
'valid_thru': form.valid_thru.data.isoformat() if form.limit_validity.data else ''
|
||||||
else:
|
}
|
||||||
session['valid_thru'] = ''
|
store_token(token_data)
|
||||||
session['inactive'] = not form.active.data
|
return redirect('/tokens')
|
||||||
return redirect(f'/store-token')
|
|
||||||
else:
|
else:
|
||||||
|
flash(f'Token konnte nicht editiert werden. Fehler: {form.errors}')
|
||||||
return render_template('edit.html', token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
|
|
||||||
|
|
||||||
@door_app.route('/store-token')
|
|
||||||
@roles_required('admin')
|
|
||||||
def store_token():
|
|
||||||
"""Store token to the token file on disk.
|
|
||||||
|
|
||||||
This will use the token id and the associated data stored in the session cookie (filled by register_token() or
|
|
||||||
edit_token()) and create/modify a token and store the new token file to disk.
|
|
||||||
"""
|
|
||||||
token = session['token']
|
|
||||||
tokens = current_app.door.get_tokens()
|
|
||||||
tokens[token] = {'name': session['name'],
|
|
||||||
'email': session['email'],
|
|
||||||
'valid_thru': session['valid_thru'],
|
|
||||||
'inactive': session['inactive'],
|
|
||||||
'organization': session['organization']}
|
|
||||||
try:
|
|
||||||
current_app.door.store_tokens(tokens)
|
|
||||||
current_app.logger.info(f"Token {token} stored in database by admin user {current_user.username}")
|
|
||||||
except Exception as e:
|
|
||||||
flash(f"Error during store_tokens. Exception: {e}")
|
|
||||||
return redirect('/tokens')
|
|
||||||
|
|
||||||
|
|
||||||
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
@door_app.route('/delete-token/<token>', methods=['GET', 'POST'])
|
||||||
@roles_required('admin')
|
@roles_required('admin')
|
||||||
def delete_token(token):
|
def delete_token(token):
|
||||||
|
|
56
tests/debug_app_config.py
Normal file
56
tests/debug_app_config.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
TESTING = True
|
||||||
|
DEBUG = True
|
||||||
|
LOG_FILE = 'webapp.log'
|
||||||
|
|
||||||
|
token_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||||
|
Path(token_file).write_text(
|
||||||
|
"""# token | name | organization | email | valid_thru
|
||||||
|
#042979fa186280||||
|
||||||
|
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
||||||
|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
||||||
|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
TOKEN_FILE = str(token_file)
|
||||||
|
|
||||||
|
SECRET_KEY = 'supersecret'
|
||||||
|
SECURITY_PASSWORD_SALT = 'salty'
|
||||||
|
|
||||||
|
admin_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||||
|
Path(admin_file).write_text(
|
||||||
|
"""# create new super-admin by putting the following data in this file:
|
||||||
|
# username email ldap/password
|
||||||
|
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
|
||||||
|
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
|
||||||
|
# examples:
|
||||||
|
gandalf gandalf@shire.me shadowfax
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
ADMIN_FILE = str(admin_file)
|
||||||
|
|
||||||
|
db_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||||
|
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
|
||||||
|
|
||||||
|
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||||
|
Path(token_log_file).write_text(
|
||||||
|
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
|
||||||
|
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
|
||||||
|
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
|
||||||
|
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
|
||||||
|
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
|
||||||
|
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
|
||||||
|
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
|
||||||
|
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
|
||||||
|
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||||
|
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
|
||||||
|
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||||
|
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
|
||||||
|
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||||
|
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
|
||||||
|
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
NFC_LOG = str(token_log_file)
|
|
@ -1,7 +1,12 @@
|
||||||
|
import datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
|
from flask_security.utils import find_user
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||||||
import re
|
import re
|
||||||
|
import secrets
|
||||||
|
|
||||||
|
|
||||||
def test_login(browser, live_server):
|
def test_login(browser, live_server):
|
||||||
response = browser.get(f'http://localhost:{live_server.port}')
|
response = browser.get(f'http://localhost:{live_server.port}')
|
||||||
|
@ -10,15 +15,15 @@ def test_login(browser, live_server):
|
||||||
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
response = browser.get(f'http://localhost:{live_server.port}/login')
|
||||||
|
|
||||||
email_form = browser.find_element_by_id('email').send_keys('gandalf@shire.me')
|
email_form = browser.find_element('id', 'email').send_keys('gandalf@shire.me')
|
||||||
password_form = browser.find_element_by_id('password').send_keys('shadowfax')
|
password_form = browser.find_element('id', 'password').send_keys('shadowfax')
|
||||||
submit_button = browser.find_element_by_id('submit').click()
|
submit_button = browser.find_element('id', 'submit').click()
|
||||||
|
|
||||||
assert 'Tür öffnen' in browser.page_source
|
assert 'Tür öffnen' in browser.page_source
|
||||||
|
|
||||||
|
|
||||||
def extract_csrf_token(response):
|
def extract_csrf_token(response):
|
||||||
soup = BeautifulSoup(response.data)
|
soup = BeautifulSoup(response.data, 'html.parser')
|
||||||
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
||||||
return csrf_token
|
return csrf_token
|
||||||
|
|
||||||
|
@ -39,7 +44,7 @@ def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
|
||||||
|
|
||||||
def test_login_headless(client):
|
def test_login_headless(client):
|
||||||
response = headless_login(client)
|
response = headless_login(client)
|
||||||
soup = BeautifulSoup(response.data)
|
soup = BeautifulSoup(response.data, 'html.parser')
|
||||||
|
|
||||||
# make sure login succeeded -> Tür öffnen button will appear
|
# make sure login succeeded -> Tür öffnen button will appear
|
||||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||||
|
@ -53,24 +58,30 @@ def client_authenticated(client):
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
def test_open_door_button(client_authenticated, mocker):
|
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.open_door')
|
def test_access_door_button(client_authenticated, mocker, url, function):
|
||||||
|
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||||
|
|
||||||
# visit route for open
|
# visit route for open
|
||||||
client_authenticated.get('/open')
|
client_authenticated.get(url)
|
||||||
|
|
||||||
# make sure the open method was called
|
# make sure the open method was called
|
||||||
DoorHandle.open_door.assert_called_once_with(user='gandalf')
|
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
|
||||||
|
|
||||||
|
|
||||||
def test_close_door_button(client_authenticated, mocker):
|
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.close_door')
|
def test_access_door_unauthenticated(client, mocker, url, function):
|
||||||
|
# test for trying to visit opening/closing door while not logged in
|
||||||
|
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||||
|
|
||||||
# visit route for open
|
# visit route for open
|
||||||
client_authenticated.get('/close')
|
response = client.get(url, follow_redirects=True)
|
||||||
|
|
||||||
# make sure the open method was called
|
# we should get redirected to login page
|
||||||
DoorHandle.close_door.assert_called_once_with(user='gandalf')
|
assert 'login' in response.request.url
|
||||||
|
|
||||||
|
# the open door function should not be called
|
||||||
|
getattr(DoorHandle, function).assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_manage_admins(client_authenticated):
|
def test_manage_admins(client_authenticated):
|
||||||
|
@ -82,14 +93,14 @@ def test_manage_admins(client_authenticated):
|
||||||
assert "gandalf@shire.me" in response.data.decode()
|
assert "gandalf@shire.me" in response.data.decode()
|
||||||
|
|
||||||
|
|
||||||
def test_create_admin(client_authenticated):
|
def create_user(client_authenticated, username, email):
|
||||||
# visit admin management page
|
# visit admin management page
|
||||||
response = client_authenticated.get('/manage_admins')
|
response = client_authenticated.get('/manage_admins')
|
||||||
csrf_token = extract_csrf_token(response)
|
csrf_token = extract_csrf_token(response)
|
||||||
|
|
||||||
# post data for creating a new admin
|
# post data for creating a new admin
|
||||||
payload = {'name': 'bilbo',
|
payload = {'name': username,
|
||||||
'email': 'bilbo@shire.me',
|
'email': email,
|
||||||
'csrf_token': csrf_token}
|
'csrf_token': csrf_token}
|
||||||
response = client_authenticated.post('/manage_admins', data=payload,
|
response = client_authenticated.post('/manage_admins', data=payload,
|
||||||
follow_redirects=True)
|
follow_redirects=True)
|
||||||
|
@ -102,18 +113,226 @@ def test_create_admin(client_authenticated):
|
||||||
assert match is not None
|
assert match is not None
|
||||||
extracted_password = match['password']
|
extracted_password = match['password']
|
||||||
|
|
||||||
|
return extracted_password
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_user(client_authenticated):
|
||||||
|
"""Fixture for creating a temporary user for testing"""
|
||||||
|
username = secrets.token_hex(4)
|
||||||
|
email = username + '@example.com'
|
||||||
|
|
||||||
|
# create user for test
|
||||||
|
password = create_user(client_authenticated, username, email)
|
||||||
|
|
||||||
|
return {'username': username,
|
||||||
|
'email': email,
|
||||||
|
'password': password}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_admin(client_authenticated):
|
||||||
|
"""Fixture for creating a temporary admin user for testing"""
|
||||||
|
username = secrets.token_hex(4)
|
||||||
|
email = username + '@example.com'
|
||||||
|
|
||||||
|
# create user for test
|
||||||
|
password = create_user(client_authenticated, username, email)
|
||||||
|
|
||||||
|
response = client_authenticated.get(
|
||||||
|
f"/promote_admin/{username}",
|
||||||
|
follow_redirects=True)
|
||||||
|
user = find_user(username)
|
||||||
|
assert user.has_role('admin')
|
||||||
|
|
||||||
|
return {'username': username,
|
||||||
|
'email': email,
|
||||||
|
'password': password}
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_admin(client_authenticated):
|
||||||
|
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
|
||||||
|
|
||||||
# log out current user
|
# log out current user
|
||||||
response = client_authenticated.get('/logout')
|
response = client_authenticated.get('/logout')
|
||||||
|
|
||||||
# try to log in new user using the extracted password
|
# try to log in new user using the extracted password
|
||||||
response = headless_login(client_authenticated, user='bilbo',
|
response = headless_login(client_authenticated, user='bilbo',
|
||||||
password=extracted_password)
|
password=password)
|
||||||
# - see if it works
|
# - see if it works
|
||||||
soup = BeautifulSoup(response.data)
|
soup = BeautifulSoup(response.data, 'html.parser')
|
||||||
|
|
||||||
# make sure login succeeded
|
# make sure login succeeded
|
||||||
# -> username should be displayed
|
# -> username should be displayed
|
||||||
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
||||||
# -> Tür öffnen button will appear
|
# -> Tür öffnen button will appear
|
||||||
assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
assert any(['Tür öffnen' in link.contents[0] for link in
|
||||||
|
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||||
|
|
||||||
|
|
||||||
|
def test_activate_deactivate_user(temp_user, client_authenticated):
|
||||||
|
response = client_authenticated.get('/admin_toggle_active/nosuchuser',
|
||||||
|
follow_redirects=True)
|
||||||
|
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||||
|
|
||||||
|
# deactivate the user
|
||||||
|
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
# make sure the user is now inactive
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is not None
|
||||||
|
assert not user.active
|
||||||
|
|
||||||
|
# activate the user again
|
||||||
|
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
# now the user should be active again
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is not None
|
||||||
|
assert user.active
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_admin(temp_user, client_authenticated):
|
||||||
|
# first we test deleting a non-existing user
|
||||||
|
response = client_authenticated.post('/delete_admins/nosuchuser',
|
||||||
|
follow_redirects=True)
|
||||||
|
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||||
|
|
||||||
|
# next, we create a temporary user and try to delete that one
|
||||||
|
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
# we need to deactivate the user first
|
||||||
|
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
|
||||||
|
# make sure the user still exists
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is not None
|
||||||
|
|
||||||
|
# deactivate the user and try deleting it again
|
||||||
|
response = client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
# try deleting it without filling in the confirmation form
|
||||||
|
response = client_authenticated.post(f"/delete_admins/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
assert 'Der eingegebene Nutzername stimmt nicht überein' in response.data.decode()
|
||||||
|
# make sure the user still exists
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is not None
|
||||||
|
|
||||||
|
# now we send the confirmation data with the request
|
||||||
|
response = client_authenticated.get(f"/delete_admins/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
csrf_token = extract_csrf_token(response)
|
||||||
|
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
|
||||||
|
response = client_authenticated.post(
|
||||||
|
f"/delete_admins/{temp_user['username']}",
|
||||||
|
data=payload,
|
||||||
|
follow_redirects=True)
|
||||||
|
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
|
||||||
|
|
||||||
|
# make sure the user now is gone
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_promote_user(temp_user, client_authenticated):
|
||||||
|
# first we test with a non-existing user
|
||||||
|
response = client_authenticated.get('/promote_admin/nosuchuser',
|
||||||
|
follow_redirects=True)
|
||||||
|
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||||
|
|
||||||
|
user = find_user(temp_user['username'])
|
||||||
|
assert user is not None
|
||||||
|
assert not user.has_role('admin')
|
||||||
|
# grant admin permissions to test user
|
||||||
|
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
assert user.has_role('admin')
|
||||||
|
|
||||||
|
# try granting admin permissions again
|
||||||
|
response = client_authenticated.get(f"/promote_admin/{temp_user['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
|
||||||
|
assert user.has_role('admin')
|
||||||
|
|
||||||
|
|
||||||
|
def test_demote_user(temp_admin, client_authenticated):
|
||||||
|
# first we test with a non-existing user
|
||||||
|
response = client_authenticated.get('/demote_admin/nosuchuser',
|
||||||
|
follow_redirects=True)
|
||||||
|
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||||
|
|
||||||
|
user = find_user(temp_admin['username'])
|
||||||
|
assert user.has_role('admin')
|
||||||
|
# try removing admin permissions
|
||||||
|
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
assert not user.has_role('admin')
|
||||||
|
|
||||||
|
# try removing admin permissions
|
||||||
|
response = client_authenticated.get(f"/demote_admin/{temp_admin['username']}",
|
||||||
|
follow_redirects=True)
|
||||||
|
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!"
|
||||||
|
assert not user.has_role('admin')
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_tokens(client_authenticated):
|
||||||
|
response = client_authenticated.get(f"/tokens", follow_redirects=True)
|
||||||
|
|
||||||
|
# make sure the names for the test tokens are displayed
|
||||||
|
assert all([user in response.data.decode()
|
||||||
|
for user in ['Gandalf', 'Bilbo', 'Gimli']])
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_log(client_authenticated):
|
||||||
|
response = client_authenticated.get(f"/token-log", follow_redirects=True)
|
||||||
|
|
||||||
|
page_src = response.data.decode()
|
||||||
|
|
||||||
|
# perform some checks for displayed log data
|
||||||
|
assert all([msg in page_src for msg in
|
||||||
|
['INFO', 'DEBUG', 'ERROR']])
|
||||||
|
assert "Valid token 04538cfa186280 of Gandalf" in page_src
|
||||||
|
assert "2021-04-17 13:09:06,207" in page_src
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_token(client_authenticated, mocker):
|
||||||
|
# test to make sure message is displayed when no tokens were recently scanned
|
||||||
|
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
||||||
|
page_src = response.data.decode()
|
||||||
|
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
|
||||||
|
|
||||||
|
# mockup scanned token
|
||||||
|
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
|
||||||
|
lambda x: {'timestamp': datetime.datetime.now(),
|
||||||
|
'token': '042979fa181280'})
|
||||||
|
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
||||||
|
csrf_token = extract_csrf_token(response)
|
||||||
|
page_src = response.data.decode()
|
||||||
|
assert 'Unregistrierter Token gelesen' in page_src
|
||||||
|
assert '042979fa181280' in page_src
|
||||||
|
|
||||||
|
# try registering with incomplete data
|
||||||
|
response = client_authenticated.post(f"/register-token", data={},
|
||||||
|
follow_redirects=True)
|
||||||
|
page_src = response.data.decode()
|
||||||
|
assert 'Token konnte nicht registiert werden' in page_src
|
||||||
|
|
||||||
|
# register the mocked token
|
||||||
|
payload = {'name': 'Legolas',
|
||||||
|
'organization': 'Elves',
|
||||||
|
'email': 'legolas@mirkwood.me',
|
||||||
|
'dsgvo': True,
|
||||||
|
'csrf_token': csrf_token}
|
||||||
|
response = client_authenticated.post(f"/register-token", data=payload,
|
||||||
|
follow_redirects=True)
|
||||||
|
page_src = response.data.decode()
|
||||||
|
|
||||||
|
# make sure the user info for the new token is displayed
|
||||||
|
assert '042979fa181280' in page_src
|
||||||
|
assert 'Legolas' in page_src
|
||||||
|
assert 'Elves' in page_src
|
||||||
|
assert 'legolas@mirkwood.me' in page_src
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user