Compare commits

...

58 Commits

Author SHA1 Message Date
Simon Pirkelmann ca1de7f82d logging configuration 2022-12-28 17:47:14 +01:00
Simon Pirkelmann 786df6b552 skip empty lines in admins file 2022-12-28 17:10:06 +01:00
Simon Pirkelmann 7ec0ba251d moved back to pure setup.cfg based config, added console script 2022-09-17 22:25:32 +02:00
Simon Pirkelmann a13e7b3e29 changed encoding 2022-09-16 21:56:20 +02:00
Simon Pirkelmann 8d29fbe9cd require APPLICATION_SETTINGS to be set 2022-09-16 21:55:48 +02:00
Simon Pirkelmann 74c6661f0e updated Flask-Security-Too to most recent version 2022-09-16 21:53:10 +02:00
Simon Pirkelmann 48e076eb16 moved back to setup.py because buildroot does not seem to work with pure setup.cfg 2022-09-12 23:30:54 +02:00
Simon Pirkelmann d9a078a114 fixed typos 2022-09-12 22:24:21 +02:00
Simon Pirkelmann a88da152c1 removed KEY_FILE option (no longer used) 2022-09-12 22:23:56 +02:00
Simon Pirkelmann 118f492881 use config file instead of program arguments, added config option for port 2022-09-12 22:22:49 +02:00
Simon Pirkelmann 19d330b6d8 added log output 2022-09-12 22:21:56 +02:00
Simon Pirkelmann d53334efd4 fixed formatting 2022-09-12 22:21:28 +02:00
Simon Pirkelmann 7c29f847bc fixed formatting 2022-09-12 22:21:21 +02:00
Simon Pirkelmann 3a6ea5926b fixed formatting 2022-09-12 22:20:40 +02:00
Simon Pirkelmann 5fb652c1d2 fixed formatting 2022-09-12 22:20:06 +02:00
Simon Pirkelmann 6d9e90631a worked on docs 2022-02-04 23:08:52 +01:00
Simon Pirkelmann f66283c328 fixed incorrect indentation 2022-02-04 22:45:44 +01:00
Simon Pirkelmann f0be983a5a make setting APPLICATION_SETTINGS file via environment variable optional 2022-02-04 22:41:55 +01:00
Simon Pirkelmann e8c1effd15 simplified super admin creation 2022-02-04 22:40:56 +01:00
Simon Pirkelmann 4470d2fb82 use logger from flask 2022-02-04 22:40:14 +01:00
Simon Pirkelmann 856a72b0cc added security salt in default configuration 2022-02-04 22:38:26 +01:00
Simon Pirkelmann a77dcd1878 simplified app configuration process 2022-02-04 22:13:15 +01:00
Simon Pirkelmann fa7c878cab reformatting 2022-02-04 21:28:37 +01:00
Simon Pirkelmann 2f40732d0b fixed issue with non-typeable key 2022-02-02 21:40:17 +01:00
Simon Pirkelmann 47f378fe6e added tests for DoorHandle class 2022-02-02 21:39:52 +01:00
Simon Pirkelmann 1320fc55ca added test for ldap validation 2022-02-01 23:03:35 +01:00
Simon Pirkelmann 33779e31b4 generalized ldap authentication 2022-02-01 23:03:06 +01:00
Simon Pirkelmann 2855163948 added test for ldap authentication 2022-02-01 00:53:14 +01:00
Simon Pirkelmann 0c7821cbe5 test for deactivating super admin 2022-01-31 23:25:47 +01:00
Simon Pirkelmann e0bbe02bc1 reformatting 2022-01-31 23:06:49 +01:00
Simon Pirkelmann d0f093ac16 test for demoting super admin 2022-01-31 23:06:38 +01:00
Simon Pirkelmann 95f0ff3563 added test for backing up user datastore 2022-01-31 22:59:44 +01:00
Simon Pirkelmann f945b7f79e test for downloading token data + some formatting fixes 2022-01-31 22:40:43 +01:00
Simon Pirkelmann 3bbf60b42f tests for deleting and deactivating tokens 2022-01-31 22:27:38 +01:00
Simon Pirkelmann 8d02e669f8 changed error message 2022-01-31 08:03:47 +01:00
Simon Pirkelmann c941d383c1 added test for editing tokens 2022-01-31 08:03:31 +01:00
Simon Pirkelmann a621b6bb78 added dummy token log data for testing 2022-01-30 23:09:20 +01:00
Simon Pirkelmann ec1e843f57 added tests for token log and registration 2022-01-30 23:08:51 +01:00
Simon Pirkelmann 3caf17c861 removed use of session cookie for token creation and modification 2022-01-30 23:08:18 +01:00
Simon Pirkelmann f1eaf8af4e added configuration data for testing 2022-01-30 22:08:29 +01:00
Simon Pirkelmann 4b3aed25d2 moved models to auth module 2022-01-30 21:56:11 +01:00
Simon Pirkelmann 080ea0f3b0 added tests for modifying user permissions 2022-01-30 21:54:35 +01:00
Simon Pirkelmann ee6ee0e111 streamlined tests and added test for trying to access the door without authentication 2022-01-30 18:49:09 +01:00
Simon Pirkelmann 2879a69445 make database and security objects global so we can access them in routes 2022-01-29 23:48:58 +01:00
Simon Pirkelmann 97957e389c added tests for admin management operations, streamlined testing a bit 2022-01-29 23:46:20 +01:00
Simon Pirkelmann 8f8bdb8cc3 added instructions for running tests 2022-01-29 23:45:31 +01:00
Simon Pirkelmann b64b0c7bb6 access config from current_app object 2022-01-27 23:57:06 +01:00
Simon Pirkelmann b3c585bd27 updated routes for blueprint based app 2022-01-27 23:56:48 +01:00
Simon Pirkelmann 42345273dd removed requirements.txt now that we have setup.cfg 2022-01-27 23:49:32 +01:00
Simon Pirkelmann ace5868571 added test configuration fixtures 2022-01-27 23:48:31 +01:00
Simon Pirkelmann a104a3d00f attach door object to flask application and use application's logger 2022-01-27 23:46:45 +01:00
Simon Pirkelmann ff9d21bcd5 moved creation of initial admin user to separate function 2022-01-27 23:45:17 +01:00
Simon Pirkelmann 38164aca4b started refactoring:
- use blueprint
- read configuration from file (default_app_config.py) and additional file specified by APPLICATION_SETTINGS environment variable
2022-01-25 21:42:35 +01:00
Simon Pirkelmann ba9379449a added tests for opening and closing the door via button 2022-01-25 19:38:19 +01:00
Simon Pirkelmann bb022fd1ce added login tests (headless and with selenium) 2022-01-25 00:09:14 +01:00
Simon Pirkelmann e0a22f770d added testing dependencies 2022-01-25 00:08:25 +01:00
Simon Pirkelmann 03e7425b2a newer versions of wtforms use html5 by default 2022-01-23 12:33:57 +01:00
Simon Pirkelmann b4f9e4525b use setup.cfg based package setup script 2022-01-23 12:33:20 +01:00
21 changed files with 1965 additions and 780 deletions

View File

@ -1 +1,79 @@
Flask-based web interface for user token adminstration of our hackerspace's door lock.
Flask-based web interface for user token administration of our hackerspace's door lock.
## Installation
Clone the repo
```shell
git clone <path_to_repo>
```
Install using pip
```shell
pip install .
```
## Running the app
```shell
export FLASK_APP=imaginaerraum_door_admin
flask run
```
## Configuration
You can set custom configuration options by defining an environment variable
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
For example, consider the following configuration file ``app_config.py``:
```
# door app configuration
SECRET_KEY = 'mysupersecretkey'
SECURITY_PASSWORD_SALT = 'saltycaramel'
TESTING = False
DEBUG = False
TOKEN_FILE = 'door_tokens.txt'
ADMIN_FILE = 'admins.txt'
NFC_SOCKET = "/tmp/nfc.sock"
NFC_LOG = "nfc.log"
```
To instruct the flask app to use this configuration, use the following commands:
```shell
export APPLICATION_SETTINGS=app_config.py
flask run
```
Below, you can find a list of configuration options.
### Flask app configuration
You can override common Flask configuration variables. In particular, you
definitely should set custom values for the ``SECRET_KEY`` and
``SECURITY_PASSWORD_SALT``.
### Token file
The token file is an ASCII file which lists the IDs of RFID tokens that can be
used to unlock the door. You can specify the path to the token file using the
``TOKEN_FILE`` variable in the configuration file.
Here's an example of a token file (lines starting with ``#`` represent inactive
tokens):
```
# token | name | organization | email | valid_thru
#042979fa186280||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
```
### Admin file
``ADMIN_FILE`` -> file to create new super admins
### User database
### NFC files
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
``NFC_LOG = "nfc.log"`` -> log file of door events
## Development
```shell
cd tests
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
```

View File

@ -1,28 +0,0 @@
#!/usr/bin/env python3
import argparse
from imaginaerraum_door_admin.webapp import create_application
parser = argparse.ArgumentParser()
parser.add_argument("--key_file", default='/root/flask_keys', help="Path to file with Flask SECRET_KEY and SECURITY_PASSWORD_SALT")
parser.add_argument("--token_file", default="/etc/door_tokens", help="path to the file with door tokens and users")
parser.add_argument("--nfc_socket", default="/tmp/nfc.sock", help="socket for handling NFC reader commands")
parser.add_argument("--template_folder", default="templates", help="path to Flask templates folder")
parser.add_argument("--static_folder", default="static", help="path to Flask static folder")
parser.add_argument("--admin_file", default="/etc/admins.conf", help="Path to file for creating super admin users")
parser.add_argument("--log_file", default="/var/log/webinterface.log", help="Path to flask log file")
parser.add_argument("--nfc_log", default="/var/log/nfc.log", help="Path to nfc log file")
parser.add_argument("--ldap_url", default="ldaps://ldap.imaginaerraum.de",
help="URL for LDAP server for alternative user authorization")
parser.add_argument("--mqtt_host", default="10.10.21.2", help="IP address of MQTT broker")
parser.add_argument("--flask_port", default=80, help="Port for running the Flask server")
parser.add_argument("--mail_server", default="smtp.googlemail.com", help="email server for sending security messages")
parser.add_argument("--mail_port", default=465, help="port for security email server")
parser.add_argument("--mail_use_tls", default=False, help="use TLS for security emails")
parser.add_argument("--mail_use_ssl", default=True, help="use SSL for security emails")
parser.add_argument("--mail_username", default="admin@example.com", help="email account for sending security messages")
parser.add_argument("--mail_password", default="password", help="Password for email account")
config = parser.parse_args()
app = create_application(config)
app.run(host='0.0.0.0', port=config.flask_port)

View File

@ -0,0 +1,117 @@
import logging
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
from email_validator import validate_email
from pathlib import Path
from .door_handle import DoorHandle
security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exist already)
def create_super_admins(app, user_datastore):
admin_file = Path(app.config.get('ADMIN_FILE'))
# setup user database when starting the app
new_admin_data = []
if not admin_file.exists():
app.logger.warning(
f"Admin user creation file not found at path "
f"{admin_file.absolute()}."
f"No super admins have been created in the datastore."
)
else:
# store data for new admins in memory s.t. the file can be deleted
# afterwards
admin_data = admin_file.read_text().split('\n')
for i, line in enumerate(admin_data):
if len(line.strip()) > 0 and not line.strip().startswith('#'):
try:
user, email, pw = line.split()
validate_email(email)
new_admin_data.append(
{'username': user, 'email': email,
'password': pw})
except Exception as e:
app.logger.error(
f"Error while parsing line {i} in admin config file. "
f"Config file should contain lines of <username> "
f"<email> <password>\\n'\n "
f"Exception: {e}\nAdmin account could not be created."
)
with app.app_context():
db.create_all()
super_admin_role = user_datastore.find_or_create_role(
'super_admin') # root admin = can create other admins
admin_role = user_datastore.find_or_create_role(
'admin') # 'normal' admin
local_role = user_datastore.find_or_create_role(
'local') # LDAP user or local user
for d in new_admin_data:
if user_datastore.find_user(email=d['email'],
username=d['username']) is None:
roles = [super_admin_role, admin_role]
if not d['password'] == 'LDAP':
roles.append(local_role)
# create new admin (only if admin does not already exist)
new_admin = user_datastore.create_user(
email=d['email'], username=d['username'],
password=hash_password(d['password']), roles=roles
)
app.logger.info(
f"New super admin user created with username "
f"'{new_admin.username}' and email '{new_admin.email}'"
f", roles = {[r.name for r in new_admin.roles]}"
)
db.session.commit()
def create_app():
app = Flask(__name__)
app.config.from_object(
'imaginaerraum_door_admin.default_app_config.DefaultConfig'
)
app.config.from_envvar('APPLICATION_SETTINGS')
logging.basicConfig(filename=app.config['LOG_FILE'], level=logging.INFO)
token_file = Path(app.config.get('TOKEN_FILE'))
if not token_file.exists():
app.logger.warning(
f"Token file not found at {token_file.absolute()}. "
"An empty token file will be created."
)
token_file.touch()
# create door objects which provides access to the token file and current
# door state via MQTT
app.door = DoorHandle(
token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
)
# Mail Config
#mail = Mail(app)
# Create database connection object
db.init_app(app)
from . webapp import door_app
app.register_blueprint(door_app)
# Setup Flask-Security
from .auth import ExtendedLoginForm, User, Role
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
create_super_admins(app, user_datastore)
return app

View File

@ -0,0 +1,163 @@
from wtforms.fields import StringField, BooleanField
from flask import current_app
from flask_security import hash_password
from flask_security.forms import LoginForm, Required, PasswordField
from flask_security.utils import lookup_identity
from flask_security.models import fsqla_v2 as fsqla
import ldap3
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
from imaginaerraum_door_admin import db, security
# Define models
fsqla.FsModels.set_db_info(db)
class Role(db.Model, fsqla.FsRoleMixin):
pass
class User(db.Model, fsqla.FsUserMixin):
pass
class ExtendedLoginForm(LoginForm):
email = StringField('Benutzername oder E-Mail', [Required()])
password = PasswordField('Passwort', [Required()])
remember = BooleanField('Login merken?')
def validate(self):
# search for user in the current database
user = lookup_identity(self.email.data)
if user is not None:
# if a user is found we check if it is associated with LDAP or with
# the local database
if user.has_role('local'):
# try authorizing locally using Flask security user datastore
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through local database"
)
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data
# dict which contains information about
# the user's permissions etc.
authorized, new_user_data = self.validate_ldap()
if authorized:
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through LDAP")
# update permissions and password/email to stay up to date
# for login with no network connection
user.email = new_user_data['email']
user.password = new_user_data['password']
for role in new_user_data['roles']:
security.datastore.add_role_to_user(user, role)
security.datastore.commit()
self.user = user
else:
self.password.errors = ['Invalid password']
else:
# this means there is no user with that email in the database
# we assume that the username was entered instead of an email and
# use that for authentication with LDAP
username = self.email.data
# try LDAP authorization and create a new user if it succeeds
authorized, new_user_data = self.validate_ldap()
if authorized:
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(
username=new_user_data['username'],
email=new_user_data['email'],
password=new_user_data['password'],
roles=new_user_data['roles']
)
security.datastore.commit()
current_app.logger.info(
f"New admin user '{new_user_data['username']} "
f"<{new_user_data['email']}>' created after successful "
f"LDAP authorization"
)
# if any of the authorization methods is successful we authorize
# the user
return authorized
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password
is authorized. Then the permissions and additional information of the
user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update
the entry for the user in the local database.
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success,
False = failure)
dict : dictionary with information about an authorized user
(contains username, email, hashed password, roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
ldap_domain = current_app.config['LDAP_DOMAIN']
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
username = self.email.data
password = self.password.data
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
f"dc={ldap_domain_ext}"
con = ldap3.Connection(
ldap_server,
user=user,
password=password,
auto_bind=True
)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from
# local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
return False, None
# get user data and permissions from LDAP server
new_user_data = {}
new_user_data['username'] = username
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
f'dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')
return True, new_user_data

View File

@ -0,0 +1,11 @@
#!/usr/bin/env python3
from imaginaerraum_door_admin import create_app
def main():
app = create_app()
app.run(host='0.0.0.0', port=app.config.get('PORT', 80))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,61 @@
import bleach
from flask_security import uia_email_mapper
print("loading default config")
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
PORT = 80
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
SECURITY_REGISTERABLE = False
SECURITY_CHANGEABLE = True
SECURITY_RECOVERABLE = True
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
SECURITY_POST_LOGIN_VIEW = '/'
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
SECURITY_PASSWORD_LENGTH_MIN = 10
SECURITY_USER_IDENTITY_ATTRIBUTES = [
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
{"username": {"mapper": uia_username_mapper}}
]
# mail configuration
MAIL_SERVER = ''
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ''
MAIL_PASSWORD = ''
MAIL_DEFAULT_SENDER = ''
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
# underlying engine. This option makes sure that DB connections from the
# pool are still valid. Important for entire application since
# many DBaaS options automatically close idle connections.
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_pre_ping": True,
}
TOKEN_FILE = "door_tokens"
ADMIN_FILE = 'admins'
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
LDAP_USER_GROUP = 'Users'
LDAP_DOMAIN = 'imaginaerraum'
LDAP_DOMAIN_EXT = 'de'
NFC_SOCKET = "/tmp/nfc.sock"
LOG_FILE = "webinterface.log"
NFC_LOG = "nfc.log"
MQTT_HOST = '10.10.21.2'

View File

@ -4,23 +4,22 @@ from pathlib import Path
import logging
from datetime import datetime
class DoorHandle:
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
def __init__(self, token_file, mqtt_host, mqtt_port=1883,
nfc_socket='/tmp/nfc.sock', logger=None):
self.state = None
self.encoder_position = None
if not Path(token_file).exists():
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
raise FileNotFoundError(
"File with door tokens could not be found at "
f"{Path(token_file).absolute()}"
)
self.token_file = token_file
self.last_invalid = {}
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
if logger:
self.logger = logger
else:
@ -31,10 +30,18 @@ class DoorHandle:
self.nfc_sock.connect(nfc_socket)
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
except Exception as e:
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
self.logger.error(f"Could not connect to NFC socket at {nfc_socket}. "
f"Exception: {e}")
self.nfc_sock = None
#raise
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
self.logger.info(f"Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
# The callback for when the client receives a CONNACK response from the server.
@ -43,7 +50,7 @@ class DoorHandle:
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("#")
client.subscribe("door/#")
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
@ -70,11 +77,13 @@ class DoorHandle:
email = data[3].strip() if len(data) > 3 else None
valid_thru = data[4].strip() if len(data) > 4 else None
tokens[token] = {'name': name,
'organization': organization,
'email': email,
'valid_thru': valid_thru,
'inactive': inactive}
tokens[token] = {
'name': name,
'organization': organization,
'email': email,
'valid_thru': valid_thru,
'inactive': inactive
}
return tokens
def store_tokens(self, tokens):
@ -98,13 +107,15 @@ class DoorHandle:
if self.nfc_sock is not None:
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
else:
raise Exception("No connection to NFC socket. Cannot close door!")
self.logger.error("No connection to NFC socket. Cannot open door!")
raise RuntimeError("No connection to NFC socket. Cannot open door!")
def close_door(self, user=''):
if self.nfc_sock is not None:
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
else:
raise Exception("No connection to NFC socket. Cannot close door!")
self.logger.error("No connection to NFC socket. Cannot close door!")
raise RuntimeError("No connection to NFC socket. Cannot close door!")
def get_most_recent_token(self):
# read last invalid token from logfile

View File

@ -0,0 +1,46 @@
from flask import flash
from flask_wtf import FlaskForm
from wtforms.fields import DateField, EmailField
from wtforms.fields import StringField, BooleanField
from wtforms.validators import DataRequired, ValidationError, EqualTo
from datetime import date
def validate_valid_thru_date(form, field):
if form.limit_validity.data:
# only check date format if limited validity of token is set
try:
if not field.data >= date.today():
raise ValueError
except ValueError as e:
flash("Ungültiges Datum")
raise ValidationError
return True
class TokenForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])
organization = StringField("Organization", validators=[DataRequired()])
limit_validity = BooleanField("Gültigkeit begrenzen?")
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
active = BooleanField("Aktiv?")
dsgvo = BooleanField(
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
)
class ConfirmDeleteForm(FlaskForm):
name = StringField(
"Name",
validators=[
DataRequired(),
EqualTo("name_confirm", "Name stimmt nicht überein"),
],
)
name_confirm = StringField("Name confirm")
class AdminCreationForm(FlaskForm):
name = StringField("Name", validators=[DataRequired()])
email = EmailField("E-Mail", validators=[DataRequired()])

View File

@ -27,15 +27,15 @@
{% endfor %}
<td>
{% if not data['super_admin'] %}
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
{% endif %}
{% if data['admin'] %}
{% if not data['super_admin'] %}
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
{% endif %}
{% else %}
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
{% endif %}
</td>
</tr>
@ -68,14 +68,14 @@
</div>
<div class="p-2 bg-light border">
<h3>Nutzerdaten sichern:</h3>
<form action="{{ url_for('backup_user_datastore') }}" method="get">
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
<input type="submit" value="Download">
</form>
</div>
<div class="p-2 bg-light border">
<h3>Nutzerdaten wiederherstellen:</h3>
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value="Abschicken">
</form>

View File

@ -13,7 +13,7 @@
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light">
<div class="container-fluid">
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
@ -29,10 +29,10 @@
Tokens
</a>
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
{% if current_user.has_role('super_admin') %}
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
{% endif %}
</div>
</li>
@ -40,7 +40,7 @@
{% if current_user.has_role('super_admin') %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
</li>
{% endif %}

View File

@ -24,11 +24,11 @@
{% if current_user.is_authenticated %}
<div class="row">
<div class="col-3">
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
</div>
<div class="col-1"></div>
<div class="col-3">
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
</div>
<div class="col-5"></div>
</div>

View File

@ -23,7 +23,7 @@
und diesen zustimmt.
</li>
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
</li>
<li>Jetzt kann der Token verwendet werden.</li>

View File

@ -29,9 +29,9 @@
<td>{{ data[field] if data[field] }}</td>
{% endfor %}
<td>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td>
</tr>
{% endfor %}
@ -47,14 +47,14 @@
<td>{{ data[field] if data[field] }}</td>
{% endfor %}
<td>
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<form action="{{ url_for('backup_tokens') }}" method="get">
<form action="{{ url_for('door_app.backup_tokens') }}" method="get">
<input type="submit" value="Token Daten sichern">
</form>
{% endblock %}

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +0,0 @@
flask~=1.1.2
Flask-Security-Too~=4.0.0
WTForms~=2.3.3
paho-mqtt~=1.5.1
bleach~=3.3.0

View File

@ -15,3 +15,38 @@ classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Operating System :: OS Independent
[options]
python_requires = >=3.8
install_requires =
bleach
Flask
Flask-Mail
Flask-Security-Too>=5.0.1
Flask-SQLAlchemy
Flask-WTF
email_validator
paho-mqtt
ldap3
wtforms
include_package_data = True
packages = find:
setup_requires =
wheel
tests_require = pytest>=3
zip_safe = False
[options.entry_points]
console_scripts =
launch_webinterface = imaginaerraum_door_admin.bin.launch_webadmin:main
[options.extras_require]
dev =
pytest
pytest-cov
pytest-flask
pytest-mock
flake8
selenium
beautifulsoup4

View File

@ -1,17 +1,3 @@
from setuptools import setup
setup(install_requires=[
'bleach',
'Flask',
'Flask-Mail',
'Flask-Security-Too',
'Flask-SQLAlchemy',
'Flask-WTF',
'email_validator',
'paho-mqtt',
'ldap3',
],
include_package_data=True,
scripts=['bin/launch_webadmin'],
packages=['imaginaerraum_door_admin'],
zip_safe=False)
setup()

24
tests/conftest.py Normal file
View File

@ -0,0 +1,24 @@
import pytest
import os
from selenium.webdriver import Chrome
from imaginaerraum_door_admin import create_app
os.environ['APPLICATION_SETTINGS'] = os.getcwd() + '/debug_app_config.py'
@pytest.fixture(scope='session')
def app():
"""Fixture to launch the webapp"""
app = create_app()
return app
@pytest.fixture
def browser():
"""Fixture for a selenium browser to access the webapp."""
driver = Chrome()
driver.implicitly_wait(10)
yield driver
driver.quit()

56
tests/debug_app_config.py Normal file
View File

@ -0,0 +1,56 @@
import tempfile
from pathlib import Path
TESTING = True
DEBUG = True
LOG_FILE = 'webapp.log'
token_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_file).write_text(
"""# token | name | organization | email | valid_thru
#042979fa186280||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
TOKEN_FILE = str(token_file)
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
admin_file = tempfile.NamedTemporaryFile(delete=False).name
Path(admin_file).write_text(
"""# create new super-admin by putting the following data in this file:
# username email ldap/password
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
# examples:
gandalf gandalf@shire.me shadowfax
"""
)
ADMIN_FILE = str(admin_file)
db_file = tempfile.NamedTemporaryFile(delete=False).name
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
Path(token_log_file).write_text(
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
"""
)
NFC_LOG = str(token_log_file)

154
tests/test_door_handle.py Normal file
View File

@ -0,0 +1,154 @@
import datetime
import time
import pytest
from pathlib import Path
from imaginaerraum_door_admin.door_handle import DoorHandle
import paho.mqtt.client as mqtt
@pytest.fixture
def nfc_socket(tmp_path):
# mock up a Unix socket server for testing
import threading
import socketserver
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = str(self.request.recv(1024), 'ascii')
cur_thread = threading.current_thread()
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
try:
self.request.sendall(response)
except BrokenPipeError:
pass
class TreadedUnixServer(socketserver.ThreadingMixIn,
socketserver.UnixStreamServer):
pass
nfc_socket_file = tmp_path / 'nfc.sock'
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
with server:
nfc_socket = server.server_address
# Start a thread with the server -- that thread will then start one
# more thread for each request
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
print("Server loop running in thread:", server_thread.name)
yield nfc_socket
server.shutdown()
def test_create_door_handle(nfc_socket, tmp_path):
# test with invalid token file
with pytest.raises(FileNotFoundError):
door_handle = DoorHandle(token_file='no_such_file',
mqtt_host='test.mosquitto.org')
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle(nfc_socket, tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text(
"""# token | name | organization | email | valid_thru
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
"""
)
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
nfc_socket=nfc_socket
)
yield door_handle
door_handle.nfc_sock.close()
@pytest.fixture
def door_handle_no_nfc(tmp_path):
token_file = tmp_path / 'door_tokens'
token_file.write_text('')
door_handle = DoorHandle(
token_file=token_file, mqtt_host='test.mosquitto.org',
)
yield door_handle
def test_store_tokens(door_handle):
new_tokens = {
'042979fa186280': {
'name': 'Pippin',
'organization': 'Hobbits',
'email': 'pippin@shire.me',
'valid_thru': None,
'inactive': False
}
}
door_handle.store_tokens(new_tokens)
token_file = Path(door_handle.token_file)
assert token_file.exists()
assert '042979fa186280' in token_file.read_text()
def test_mqtt_messages(door_handle):
# test sending messages to the door_handle via MQTT and see if the state
# gets updated accordingly
client = mqtt.Client()
con = client.connect('test.mosquitto.org', 1883)
client.publish('door/position/value', 10).wait_for_publish(1)
time.sleep(1)
assert door_handle.encoder_position == 10
client.publish('door/state/value', 'open').wait_for_publish(1)
time.sleep(1)
assert door_handle.state == 'open'
timestamp = datetime.datetime.now().replace(microsecond=0)
token = '042979fa186280'
client.publish(
'door/token/last_invalid',
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
).wait_for_publish(1)
time.sleep(1)
most_recent_token = door_handle.get_most_recent_token()
assert most_recent_token['timestamp'] == timestamp
assert most_recent_token['token'] == token
client.disconnect()
def test_open_door(door_handle):
door_handle.open_door()
def test_open_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.open_door()
def test_close_door(door_handle):
door_handle.close_door()
def test_close_door_broken_socket(door_handle_no_nfc):
# test broken nfc_socket connection
with pytest.raises(RuntimeError):
door_handle_no_nfc.close_door()

622
tests/test_webinterface.py Normal file
View File

@ -0,0 +1,622 @@
import datetime
import pytest
from bs4 import BeautifulSoup
from flask_security.utils import lookup_identity
from imaginaerraum_door_admin.door_handle import DoorHandle
from imaginaerraum_door_admin.auth import ExtendedLoginForm
import re
import secrets
import pathlib
import json
def test_login(browser, live_server):
response = browser.get(f'http://localhost:{live_server.port}')
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
response = browser.get(f'http://localhost:{live_server.port}/login')
email_form = browser.find_element_by_xpath('//input[@id="email"]')
email_form.send_keys('gandalf@shire.me')
password_form = browser.find_element_by_xpath('//input[@id="password"]')
password_form.send_keys('shadowfax')
submit_button = browser.find_element_by_xpath('//input[@id="submit"]')
submit_button.click()
assert 'Tür öffnen' in browser.page_source
def extract_csrf_token(response):
soup = BeautifulSoup(response.data, 'html.parser')
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
return csrf_token
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
# extract csrf token from the login page source
response = client.get('/login', follow_redirects=True)
csrf_token = extract_csrf_token(response)
# send login information
payload = {
'csrf_token': csrf_token,
'email': user,
'password': password
}
return client.post('/login', data=payload, follow_redirects=True)
def test_login_headless(client):
response = headless_login(client)
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_validate_ldap(client, mocker):
# mock ldap Connection object to simulate successful authentication
import ldap3
def init_success(self, *args, **kwargs):
pass
def init_LDAPBindError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPBindError()
def init_LDAPSocketOpenError(self, *args, **kwargs):
raise ldap3.core.exceptions.LDAPSocketOpenError()
def init_Exception(self, *args, **kwargs):
raise Exception()
def search_success(self, *args, **kwargs):
return True
def search_failure(self, *args, **kwargs):
return False
mocker.patch.object(ldap3.Connection, '__init__', init_success)
mocker.patch.object(ldap3.Connection, 'search', search_success)
mock_entries = mocker.MagicMock()
mock_entries[0].mail.value = 'user@example.com'
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
with client.application.app_context():
# test successful login
form = ExtendedLoginForm()
form.email.data = 'user'
form.password.data = 'password'
result = form.validate_ldap()
assert result[0]
assert result[1]['username'] == 'user'
assert result[1]['email'] == 'user@example.com'
assert result[1]['roles'] == ['admin']
# test failing ldap search
mocker.patch.object(ldap3.Connection, 'search', search_failure)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
# test some errors in ldap Connection and authentication
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
result = form.validate_ldap()
assert not result[0]
assert result[1] is None
def test_login_ldap(client, temp_user, mocker):
# mock ldap validation for admin user
def mock_validate(self):
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
user_data = {'username': temp_user['username'],
'email': temp_user['email'],
'roles': ['admin'],
'password': temp_user['password']}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
user = lookup_identity(temp_user['username'])
# remove local role so that ldap authentication is the default
user.roles.pop(0)
# log out admin user
client.get('/logout')
# log in temp user using ldap
response = headless_login(client, user=temp_user['username'],
password=temp_user['password'])
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'],
'role': 'button'})])
def test_login_ldap_new_user(client, mocker):
# mock ldap validation for admin user
def mock_validate(self):
auth = True
user_data = {'username': 'Balrog',
'email': 'balrog@moria.me',
'roles': ['admin'],
'password': 'youshallnotpass'}
return auth, user_data
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
# initially, the Balrog user should not exist
user = lookup_identity('Balrog')
assert user is None
# log in temp user using ldap -> this will succeed and create a local user
response = headless_login(client, user='Balrog',
password='youshallnotpass')
soup = BeautifulSoup(response.data, 'html.parser')
# make sure user is now created locally
user = lookup_identity('Balrog')
assert user is not None
# make sure login succeeded -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0]
for link in soup.findAll('a', attrs={'class': ['btn'],
'role': 'button'})])
@pytest.fixture
def client_authenticated(client):
# log in using admin account for testing
headless_login(client)
yield client
@pytest.mark.parametrize("url,function", [('/open', 'open_door'),
('/close', 'close_door')])
def test_access_door_button(client_authenticated, mocker, url, function):
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open
client_authenticated.get(url)
# make sure the open method was called
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
def test_access_door_unauthenticated(client, mocker, url, function):
# test for trying to visit opening/closing door while not logged in
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
# visit route for open
response = client.get(url, follow_redirects=True)
# we should get redirected to login page
assert 'login' in response.request.url
# the open door function should not be called
getattr(DoorHandle, function).assert_not_called()
def test_manage_admins(client_authenticated):
# visit admin management page
response = client_authenticated.get('/manage_admins')
assert "Nutzer Übersicht" in response.data.decode()
assert "gandalf" in response.data.decode()
assert "gandalf@shire.me" in response.data.decode()
def create_user(client_authenticated, username, email):
# visit admin management page
response = client_authenticated.get('/manage_admins')
csrf_token = extract_csrf_token(response)
# post data for creating a new admin
payload = {'name': username,
'email': email,
'csrf_token': csrf_token}
response = client_authenticated.post('/manage_admins', data=payload,
follow_redirects=True)
# after the new admin user is created, we should have been redirected to the
# /manage_admin page. there, the password for login is displayed
# we test if the newly created user can log in with that password
# extract password displayed on the page
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
assert match is not None
extracted_password = match['password']
return extracted_password
@pytest.fixture
def temp_user(client_authenticated):
"""Fixture for creating a temporary user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
return {'username': username,
'email': email,
'password': password}
@pytest.fixture
def temp_admin(client_authenticated):
"""Fixture for creating a temporary admin user for testing"""
username = secrets.token_hex(4)
email = username + '@example.com'
# create user for test
password = create_user(client_authenticated, username, email)
response = client_authenticated.get(
f"/promote_admin/{username}",
follow_redirects=True)
user = lookup_identity(username)
assert user.has_role('admin')
return {'username': username,
'email': email,
'password': password}
def test_backup_users(client_authenticated, temp_user):
# test with invalid token
response = client_authenticated.get("/backup_user_datastore",
follow_redirects=True)
user_data = json.loads(response.data)
users = [d['username'] for d in user_data]
emails = [d['email'] for d in user_data]
assert temp_user['username'] in users
assert temp_user['email'] in emails
def test_create_admin(client_authenticated):
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
# log out current user
response = client_authenticated.get('/logout')
# try to log in new user using the extracted password
response = headless_login(client_authenticated, user='bilbo@shire.me',
password=password)
# - see if it works
soup = BeautifulSoup(response.data, 'html.parser')
# make sure login succeeded
# -> username should be displayed
assert 'Benutzer <span>bilbo</span>' in soup.decode()
# -> Tür öffnen button will appear
assert any(['Tür öffnen' in link.contents[0] for link in
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
def test_activate_deactivate_user(temp_user, client_authenticated):
response = client_authenticated.get(
'/admin_toggle_active/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# deactivate the user
response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# make sure the user is now inactive
user = lookup_identity(temp_user['username'])
assert user is not None
assert not user.active
# activate the user again
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# now the user should be active again
user = lookup_identity(temp_user['username'])
assert user is not None
assert user.active
# test deactivating super admin
response = client_authenticated.get(f"/admin_toggle_active/gandalf",
follow_redirects=True)
assert 'Super-Admins können nicht deaktiviert werden!' \
in response.data.decode()
def test_delete_admin(temp_user, client_authenticated):
# first we test deleting a non-existing user
response = client_authenticated.post(
'/delete_admins/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
# next, we create a temporary user and try to delete that one
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
# we need to deactivate the user first
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
# make sure the user still exists
user = lookup_identity(temp_user['username'])
assert user is not None
# deactivate the user and try deleting it again
response = client_authenticated.get(
f"/admin_toggle_active/{temp_user['username']}",
follow_redirects=True)
# try deleting it without filling in the confirmation form
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
assert 'Der eingegebene Nutzername stimmt nicht überein' \
in response.data.decode()
# make sure the user still exists
user = lookup_identity(temp_user['username'])
assert user is not None
# now we send the confirmation data with the request
response = client_authenticated.get(
f"/delete_admins/{temp_user['username']}",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
response = client_authenticated.post(
f"/delete_admins/{temp_user['username']}",
data=payload,
follow_redirects=True)
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
# make sure the user now is gone
user = lookup_identity(temp_user['username'])
assert user is None
def test_promote_user(temp_user, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get(
'/promote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = lookup_identity(temp_user['username'])
assert user is not None
assert not user.has_role('admin')
# grant admin permissions to test user
response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert user.has_role('admin')
# try granting admin permissions again
response = client_authenticated.get(
f"/promote_admin/{temp_user['username']}",
follow_redirects=True)
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
assert user.has_role('admin')
def test_demote_user(temp_admin, client_authenticated):
# first we test with a non-existing user
response = client_authenticated.get(
'/demote_admin/nosuchuser',
follow_redirects=True)
assert 'Ungültiger Nutzer' in response.data.decode()
user = lookup_identity(temp_admin['username'])
assert user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert not user.has_role('admin')
# try removing admin permissions
response = client_authenticated.get(
f"/demote_admin/{temp_admin['username']}",
follow_redirects=True)
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \
in response.data.decode()
assert not user.has_role('admin')
# try removing admin permissions from superadmin
response = client_authenticated.get(
f"/demote_admin/gandalf",
follow_redirects=True)
assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \
in response.data.decode()
def test_list_tokens(client_authenticated):
response = client_authenticated.get(f"/tokens", follow_redirects=True)
# make sure the names for the test tokens are displayed
assert all([user in response.data.decode()
for user in ['Gandalf', 'Bilbo', 'Gimli']])
def test_token_log(client_authenticated):
response = client_authenticated.get(f"/token-log", follow_redirects=True)
page_src = response.data.decode()
# perform some checks for displayed log data
assert all([msg in page_src for msg in
['INFO', 'DEBUG', 'ERROR']])
assert "Valid token 04538cfa186280 of Gandalf" in page_src
assert "2021-04-17 13:09:06,207" in page_src
def test_backup_tokens(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/backup_tokens",
follow_redirects=True)
token_data = json.loads(response.data)
assert {'04387cfa186280', '043a81fa186280', '04538cfa186280',
'042979fa186280'}.issubset(token_data.keys())
def test_register_token(client_authenticated, mocker):
# test to make sure message is displayed when no tokens were recently
# scanned
response = client_authenticated.get(f"/register-token",
follow_redirects=True)
page_src = response.data.decode()
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
# mockup scanned token
mocker.patch(
'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
lambda x: {'timestamp': datetime.datetime.now(),
'token': '042979fa181280'})
response = client_authenticated.get(f"/register-token", follow_redirects=True)
csrf_token = extract_csrf_token(response)
page_src = response.data.decode()
assert 'Unregistrierter Token gelesen' in page_src
assert '042979fa181280' in page_src
# try registering with incomplete data
response = client_authenticated.post(f"/register-token", data={},
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht registiert werden' in page_src
# register the mocked token
payload = {'name': 'Legolas',
'organization': 'Elves',
'email': 'legolas@mirkwood.me',
'dsgvo': True,
'csrf_token': csrf_token}
response = client_authenticated.post(f"/register-token", data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the user info for the new token is displayed
assert '042979fa181280' in page_src
assert 'Legolas' in page_src
assert 'Elves' in page_src
assert 'legolas@mirkwood.me' in page_src
# check that the token is created in the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '042979fa181280' in token_data
assert 'Legolas' in token_data
def test_edit_token(client_authenticated):
# test with invalid token
response = client_authenticated.get(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
response = client_authenticated.post(f"/edit-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Token konnte nicht editiert werden' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/edit-token/04538cfa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
payload = {
'name': 'Balin',
'organization': 'Dwarves',
'email': 'balin@erebor.me',
'active': True,
'limit_validity': False,
'valid_thru': datetime.date.today(),
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/edit-token/04538cfa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
# make sure the new user info for the token is displayed
assert '04538cfa186280' in page_src
assert 'Balin' in page_src
assert 'Dwarves' in page_src
assert 'balin@erebor.me' in page_src
# check that the token is changed in the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
assert 'Balin' in token_data
def test_delete_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04538cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/delete-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# test using a valid token from the token file
response = client_authenticated.get(f"/delete-token/043a81fa186280",
follow_redirects=True)
csrf_token = extract_csrf_token(response)
# try deleting without form data
response = client_authenticated.post(f"/delete-token/043a81fa186280",
follow_redirects=True)
page_src = response.data.decode()
assert "wurde nicht gelöscht" in page_src
payload = {
'name': 'Bilbo',
'csrf_token': csrf_token
}
response = client_authenticated.post(f"/delete-token/043a81fa186280",
data=payload,
follow_redirects=True)
page_src = response.data.decode()
print(page_src)
assert "wurde gelöscht" in page_src
# check that the token is now gone from the token file
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '043a81fa186280' not in token_data
def test_deactivate_token(client_authenticated):
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '04387cfa186280' in token_data
# test with invalid token
response = client_authenticated.get(f"/deactivate-token/nosuchtoken",
follow_redirects=True)
page_src = response.data.decode()
assert 'Ungültiger Token' in page_src
# deactivate token
response = client_authenticated.get(f"/deactivate-token/04387cfa186280",
follow_redirects=True)
# check that the token is now gone from the token file
token_data = pathlib.Path(
client_authenticated.application.config['TOKEN_FILE']).read_text()
assert '#04387cfa186280' in token_data