import datetime import pytest from bs4 import BeautifulSoup from flask_security.utils import lookup_identity from imaginaerraum_door_admin.door_handle import DoorHandle from imaginaerraum_door_admin.auth import ExtendedLoginForm import re import secrets import pathlib import json def test_login(browser, live_server): response = browser.get(f'http://localhost:{live_server.port}') assert '

Space Zugangsverwaltung

' in browser.page_source response = browser.get(f'http://localhost:{live_server.port}/login') email_form = browser.find_element_by_xpath('//input[@id="email"]') email_form.send_keys('gandalf@shire.me') password_form = browser.find_element_by_xpath('//input[@id="password"]') password_form.send_keys('shadowfax') submit_button = browser.find_element_by_xpath('//input[@id="submit"]') submit_button.click() assert 'Tür öffnen' in browser.page_source def extract_csrf_token(response): soup = BeautifulSoup(response.data, 'html.parser') csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value'] return csrf_token def headless_login(client, user='gandalf@shire.me', password='shadowfax'): # extract csrf token from the login page source response = client.get('/login', follow_redirects=True) csrf_token = extract_csrf_token(response) # send login information payload = { 'csrf_token': csrf_token, 'email': user, 'password': password } return client.post('/login', data=payload, follow_redirects=True) def test_login_headless(client): response = headless_login(client) soup = BeautifulSoup(response.data, 'html.parser') # make sure login succeeded -> Tür öffnen button will appear assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) def test_validate_ldap(client, mocker): # mock ldap Connection object to simulate successful authentication import ldap3 def init_success(self, *args, **kwargs): pass def init_LDAPBindError(self, *args, **kwargs): raise ldap3.core.exceptions.LDAPBindError() def init_LDAPSocketOpenError(self, *args, **kwargs): raise ldap3.core.exceptions.LDAPSocketOpenError() def init_Exception(self, *args, **kwargs): raise Exception() def search_success(self, *args, **kwargs): return True def search_failure(self, *args, **kwargs): return False mocker.patch.object(ldap3.Connection, '__init__', init_success) mocker.patch.object(ldap3.Connection, 'search', search_success) mock_entries = mocker.MagicMock() mock_entries[0].mail.value = 'user@example.com' mocker.patch.object(ldap3.Connection, 'entries', mock_entries) with client.application.app_context(): # test successful login form = ExtendedLoginForm() form.email.data = 'user' form.password.data = 'password' result = form.validate_ldap() assert result[0] assert result[1]['username'] == 'user' assert result[1]['email'] == 'user@example.com' assert result[1]['roles'] == ['admin'] # test failing ldap search mocker.patch.object(ldap3.Connection, 'search', search_failure) result = form.validate_ldap() assert not result[0] assert result[1] is None # test some errors in ldap Connection and authentication mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError) result = form.validate_ldap() assert not result[0] assert result[1] is None mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError) result = form.validate_ldap() assert not result[0] assert result[1] is None mocker.patch.object(ldap3.Connection, '__init__', init_Exception) result = form.validate_ldap() assert not result[0] assert result[1] is None def test_login_ldap(client, temp_user, mocker): # mock ldap validation for admin user def mock_validate(self): auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password'] user_data = {'username': temp_user['username'], 'email': temp_user['email'], 'roles': ['admin'], 'password': temp_user['password']} return auth, user_data mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate) user = lookup_identity(temp_user['username']) # remove local role so that ldap authentication is the default user.roles.pop(0) # log out admin user client.get('/logout') # log in temp user using ldap response = headless_login(client, user=temp_user['username'], password=temp_user['password']) soup = BeautifulSoup(response.data, 'html.parser') # make sure login succeeded -> Tür öffnen button will appear assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) def test_login_ldap_new_user(client, mocker): # mock ldap validation for admin user def mock_validate(self): auth = True user_data = {'username': 'Balrog', 'email': 'balrog@moria.me', 'roles': ['admin'], 'password': 'youshallnotpass'} return auth, user_data mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate) # initially, the Balrog user should not exist user = lookup_identity('Balrog') assert user is None # log in temp user using ldap -> this will succeed and create a local user response = headless_login(client, user='Balrog', password='youshallnotpass') soup = BeautifulSoup(response.data, 'html.parser') # make sure user is now created locally user = lookup_identity('Balrog') assert user is not None # make sure login succeeded -> Tür öffnen button will appear assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) @pytest.fixture def client_authenticated(client): # log in using admin account for testing headless_login(client) yield client @pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')]) def test_access_door_button(client_authenticated, mocker, url, function): mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function) # visit route for open client_authenticated.get(url) # make sure the open method was called getattr(DoorHandle, function).assert_called_once_with(user='gandalf') @pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')]) def test_access_door_unauthenticated(client, mocker, url, function): # test for trying to visit opening/closing door while not logged in mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function) # visit route for open response = client.get(url, follow_redirects=True) # we should get redirected to login page assert 'login' in response.request.url # the open door function should not be called getattr(DoorHandle, function).assert_not_called() def test_manage_admins(client_authenticated): # visit admin management page response = client_authenticated.get('/manage_admins') assert "Nutzer Übersicht" in response.data.decode() assert "gandalf" in response.data.decode() assert "gandalf@shire.me" in response.data.decode() def create_user(client_authenticated, username, email): # visit admin management page response = client_authenticated.get('/manage_admins') csrf_token = extract_csrf_token(response) # post data for creating a new admin payload = {'name': username, 'email': email, 'csrf_token': csrf_token} response = client_authenticated.post('/manage_admins', data=payload, follow_redirects=True) # after the new admin user is created, we should have been redirected to the # /manage_admin page. there, the password for login is displayed # we test if the newly created user can log in with that password # extract password displayed on the page match = re.search('Passwort (?P.*) um', response.data.decode()) assert match is not None extracted_password = match['password'] return extracted_password @pytest.fixture def temp_user(client_authenticated): """Fixture for creating a temporary user for testing""" username = secrets.token_hex(4) email = username + '@example.com' # create user for test password = create_user(client_authenticated, username, email) return {'username': username, 'email': email, 'password': password} @pytest.fixture def temp_admin(client_authenticated): """Fixture for creating a temporary admin user for testing""" username = secrets.token_hex(4) email = username + '@example.com' # create user for test password = create_user(client_authenticated, username, email) response = client_authenticated.get( f"/promote_admin/{username}", follow_redirects=True) user = lookup_identity(username) assert user.has_role('admin') return {'username': username, 'email': email, 'password': password} def test_backup_users(client_authenticated, temp_user): # test with invalid token response = client_authenticated.get("/backup_user_datastore", follow_redirects=True) user_data = json.loads(response.data) users = [d['username'] for d in user_data] emails = [d['email'] for d in user_data] assert temp_user['username'] in users assert temp_user['email'] in emails def test_create_admin(client_authenticated): password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me') # log out current user response = client_authenticated.get('/logout') # try to log in new user using the extracted password response = headless_login(client_authenticated, user='bilbo@shire.me', password=password) # - see if it works soup = BeautifulSoup(response.data, 'html.parser') # make sure login succeeded # -> username should be displayed assert 'Benutzer bilbo' in soup.decode() # -> Tür öffnen button will appear assert any(['Tür öffnen' in link.contents[0] for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})]) def test_activate_deactivate_user(temp_user, client_authenticated): response = client_authenticated.get( '/admin_toggle_active/nosuchuser', follow_redirects=True) assert 'Ungültiger Nutzer' in response.data.decode() # deactivate the user response = client_authenticated.get( f"/admin_toggle_active/{temp_user['username']}", follow_redirects=True) # make sure the user is now inactive user = lookup_identity(temp_user['username']) assert user is not None assert not user.active # activate the user again client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}", follow_redirects=True) # now the user should be active again user = lookup_identity(temp_user['username']) assert user is not None assert user.active # test deactivating super admin response = client_authenticated.get(f"/admin_toggle_active/gandalf", follow_redirects=True) assert 'Super-Admins können nicht deaktiviert werden!' \ in response.data.decode() def test_delete_admin(temp_user, client_authenticated): # first we test deleting a non-existing user response = client_authenticated.post( '/delete_admins/nosuchuser', follow_redirects=True) assert 'Ungültiger Nutzer' in response.data.decode() # next, we create a temporary user and try to delete that one response = client_authenticated.post( f"/delete_admins/{temp_user['username']}", follow_redirects=True) # we need to deactivate the user first assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode() # make sure the user still exists user = lookup_identity(temp_user['username']) assert user is not None # deactivate the user and try deleting it again response = client_authenticated.get( f"/admin_toggle_active/{temp_user['username']}", follow_redirects=True) # try deleting it without filling in the confirmation form response = client_authenticated.post( f"/delete_admins/{temp_user['username']}", follow_redirects=True) assert 'Der eingegebene Nutzername stimmt nicht überein' \ in response.data.decode() # make sure the user still exists user = lookup_identity(temp_user['username']) assert user is not None # now we send the confirmation data with the request response = client_authenticated.get( f"/delete_admins/{temp_user['username']}", follow_redirects=True) csrf_token = extract_csrf_token(response) payload = {'name': temp_user['username'], 'csrf_token': csrf_token} response = client_authenticated.post( f"/delete_admins/{temp_user['username']}", data=payload, follow_redirects=True) assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode() # make sure the user now is gone user = lookup_identity(temp_user['username']) assert user is None def test_promote_user(temp_user, client_authenticated): # first we test with a non-existing user response = client_authenticated.get( '/promote_admin/nosuchuser', follow_redirects=True) assert 'Ungültiger Nutzer' in response.data.decode() user = lookup_identity(temp_user['username']) assert user is not None assert not user.has_role('admin') # grant admin permissions to test user response = client_authenticated.get( f"/promote_admin/{temp_user['username']}", follow_redirects=True) assert user.has_role('admin') # try granting admin permissions again response = client_authenticated.get( f"/promote_admin/{temp_user['username']}", follow_redirects=True) assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!" assert user.has_role('admin') def test_demote_user(temp_admin, client_authenticated): # first we test with a non-existing user response = client_authenticated.get( '/demote_admin/nosuchuser', follow_redirects=True) assert 'Ungültiger Nutzer' in response.data.decode() user = lookup_identity(temp_admin['username']) assert user.has_role('admin') # try removing admin permissions response = client_authenticated.get( f"/demote_admin/{temp_admin['username']}", follow_redirects=True) assert not user.has_role('admin') # try removing admin permissions response = client_authenticated.get( f"/demote_admin/{temp_admin['username']}", follow_redirects=True) assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \ in response.data.decode() assert not user.has_role('admin') # try removing admin permissions from superadmin response = client_authenticated.get( f"/demote_admin/gandalf", follow_redirects=True) assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \ in response.data.decode() def test_list_tokens(client_authenticated): response = client_authenticated.get(f"/tokens", follow_redirects=True) # make sure the names for the test tokens are displayed assert all([user in response.data.decode() for user in ['Gandalf', 'Bilbo', 'Gimli']]) def test_token_log(client_authenticated): response = client_authenticated.get(f"/token-log", follow_redirects=True) page_src = response.data.decode() # perform some checks for displayed log data assert all([msg in page_src for msg in ['INFO', 'DEBUG', 'ERROR']]) assert "Valid token 04538cfa186280 of Gandalf" in page_src assert "2021-04-17 13:09:06,207" in page_src def test_backup_tokens(client_authenticated): # test with invalid token response = client_authenticated.get(f"/backup_tokens", follow_redirects=True) token_data = json.loads(response.data) assert {'04387cfa186280', '043a81fa186280', '04538cfa186280', '042979fa186280'}.issubset(token_data.keys()) def test_register_token(client_authenticated, mocker): # test to make sure message is displayed when no tokens were recently # scanned response = client_authenticated.get(f"/register-token", follow_redirects=True) page_src = response.data.decode() assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src # mockup scanned token mocker.patch( 'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token', lambda x: {'timestamp': datetime.datetime.now(), 'token': '042979fa181280'}) response = client_authenticated.get(f"/register-token", follow_redirects=True) csrf_token = extract_csrf_token(response) page_src = response.data.decode() assert 'Unregistrierter Token gelesen' in page_src assert '042979fa181280' in page_src # try registering with incomplete data response = client_authenticated.post(f"/register-token", data={}, follow_redirects=True) page_src = response.data.decode() assert 'Token konnte nicht registiert werden' in page_src # register the mocked token payload = {'name': 'Legolas', 'organization': 'Elves', 'email': 'legolas@mirkwood.me', 'dsgvo': True, 'csrf_token': csrf_token} response = client_authenticated.post(f"/register-token", data=payload, follow_redirects=True) page_src = response.data.decode() # make sure the user info for the new token is displayed assert '042979fa181280' in page_src assert 'Legolas' in page_src assert 'Elves' in page_src assert 'legolas@mirkwood.me' in page_src # check that the token is created in the token file token_data = pathlib.Path( client_authenticated.application.config['TOKEN_FILE']).read_text() assert '042979fa181280' in token_data assert 'Legolas' in token_data def test_edit_token(client_authenticated): # test with invalid token response = client_authenticated.get(f"/edit-token/nosuchtoken", follow_redirects=True) page_src = response.data.decode() assert 'Ungültiger Token' in page_src response = client_authenticated.post(f"/edit-token/nosuchtoken", follow_redirects=True) page_src = response.data.decode() assert 'Token konnte nicht editiert werden' in page_src # test using a valid token from the token file response = client_authenticated.get(f"/edit-token/04538cfa186280", follow_redirects=True) csrf_token = extract_csrf_token(response) payload = { 'name': 'Balin', 'organization': 'Dwarves', 'email': 'balin@erebor.me', 'active': True, 'limit_validity': False, 'valid_thru': datetime.date.today(), 'csrf_token': csrf_token } response = client_authenticated.post(f"/edit-token/04538cfa186280", data=payload, follow_redirects=True) page_src = response.data.decode() # make sure the new user info for the token is displayed assert '04538cfa186280' in page_src assert 'Balin' in page_src assert 'Dwarves' in page_src assert 'balin@erebor.me' in page_src # check that the token is changed in the token file token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text() assert '04538cfa186280' in token_data assert 'Balin' in token_data def test_delete_token(client_authenticated): token_data = pathlib.Path( client_authenticated.application.config['TOKEN_FILE']).read_text() assert '04538cfa186280' in token_data # test with invalid token response = client_authenticated.get(f"/delete-token/nosuchtoken", follow_redirects=True) page_src = response.data.decode() assert 'Ungültiger Token' in page_src # test using a valid token from the token file response = client_authenticated.get(f"/delete-token/043a81fa186280", follow_redirects=True) csrf_token = extract_csrf_token(response) # try deleting without form data response = client_authenticated.post(f"/delete-token/043a81fa186280", follow_redirects=True) page_src = response.data.decode() assert "wurde nicht gelöscht" in page_src payload = { 'name': 'Bilbo', 'csrf_token': csrf_token } response = client_authenticated.post(f"/delete-token/043a81fa186280", data=payload, follow_redirects=True) page_src = response.data.decode() print(page_src) assert "wurde gelöscht" in page_src # check that the token is now gone from the token file token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text() assert '043a81fa186280' not in token_data def test_deactivate_token(client_authenticated): token_data = pathlib.Path( client_authenticated.application.config['TOKEN_FILE']).read_text() assert '04387cfa186280' in token_data # test with invalid token response = client_authenticated.get(f"/deactivate-token/nosuchtoken", follow_redirects=True) page_src = response.data.decode() assert 'Ungültiger Token' in page_src # deactivate token response = client_authenticated.get(f"/deactivate-token/04387cfa186280", follow_redirects=True) # check that the token is now gone from the token file token_data = pathlib.Path( client_authenticated.application.config['TOKEN_FILE']).read_text() assert '#04387cfa186280' in token_data