Compare commits
58 Commits
master
...
blueprint_
Author | SHA1 | Date | |
---|---|---|---|
ca1de7f82d | |||
786df6b552 | |||
7ec0ba251d | |||
a13e7b3e29 | |||
8d29fbe9cd | |||
74c6661f0e | |||
48e076eb16 | |||
d9a078a114 | |||
a88da152c1 | |||
118f492881 | |||
19d330b6d8 | |||
d53334efd4 | |||
7c29f847bc | |||
3a6ea5926b | |||
5fb652c1d2 | |||
6d9e90631a | |||
f66283c328 | |||
f0be983a5a | |||
e8c1effd15 | |||
4470d2fb82 | |||
856a72b0cc | |||
a77dcd1878 | |||
fa7c878cab | |||
2f40732d0b | |||
47f378fe6e | |||
1320fc55ca | |||
33779e31b4 | |||
2855163948 | |||
0c7821cbe5 | |||
e0bbe02bc1 | |||
d0f093ac16 | |||
95f0ff3563 | |||
f945b7f79e | |||
3bbf60b42f | |||
8d02e669f8 | |||
c941d383c1 | |||
a621b6bb78 | |||
ec1e843f57 | |||
3caf17c861 | |||
f1eaf8af4e | |||
4b3aed25d2 | |||
080ea0f3b0 | |||
ee6ee0e111 | |||
2879a69445 | |||
97957e389c | |||
8f8bdb8cc3 | |||
b64b0c7bb6 | |||
b3c585bd27 | |||
42345273dd | |||
ace5868571 | |||
a104a3d00f | |||
ff9d21bcd5 | |||
38164aca4b | |||
ba9379449a | |||
bb022fd1ce | |||
e0a22f770d | |||
03e7425b2a | |||
b4f9e4525b |
80
README.md
80
README.md
|
@ -1 +1,79 @@
|
|||
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
||||
Flask-based web interface for user token administration of our hackerspace's door lock.
|
||||
|
||||
## Installation
|
||||
Clone the repo
|
||||
```shell
|
||||
git clone <path_to_repo>
|
||||
```
|
||||
Install using pip
|
||||
```shell
|
||||
pip install .
|
||||
```
|
||||
|
||||
## Running the app
|
||||
```shell
|
||||
export FLASK_APP=imaginaerraum_door_admin
|
||||
flask run
|
||||
```
|
||||
|
||||
## Configuration
|
||||
You can set custom configuration options by defining an environment variable
|
||||
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
|
||||
|
||||
For example, consider the following configuration file ``app_config.py``:
|
||||
```
|
||||
# door app configuration
|
||||
SECRET_KEY = 'mysupersecretkey'
|
||||
SECURITY_PASSWORD_SALT = 'saltycaramel'
|
||||
|
||||
TESTING = False
|
||||
DEBUG = False
|
||||
|
||||
TOKEN_FILE = 'door_tokens.txt'
|
||||
ADMIN_FILE = 'admins.txt'
|
||||
|
||||
NFC_SOCKET = "/tmp/nfc.sock"
|
||||
NFC_LOG = "nfc.log"
|
||||
```
|
||||
To instruct the flask app to use this configuration, use the following commands:
|
||||
```shell
|
||||
export APPLICATION_SETTINGS=app_config.py
|
||||
flask run
|
||||
```
|
||||
|
||||
Below, you can find a list of configuration options.
|
||||
|
||||
### Flask app configuration
|
||||
You can override common Flask configuration variables. In particular, you
|
||||
definitely should set custom values for the ``SECRET_KEY`` and
|
||||
``SECURITY_PASSWORD_SALT``.
|
||||
|
||||
### Token file
|
||||
The token file is an ASCII file which lists the IDs of RFID tokens that can be
|
||||
used to unlock the door. You can specify the path to the token file using the
|
||||
``TOKEN_FILE`` variable in the configuration file.
|
||||
Here's an example of a token file (lines starting with ``#`` represent inactive
|
||||
tokens):
|
||||
```
|
||||
# token | name | organization | email | valid_thru
|
||||
#042979fa186280||||
|
||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
|
||||
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
|
||||
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
|
||||
```
|
||||
|
||||
### Admin file
|
||||
``ADMIN_FILE`` -> file to create new super admins
|
||||
|
||||
### User database
|
||||
|
||||
### NFC files
|
||||
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
|
||||
``NFC_LOG = "nfc.log"`` -> log file of door events
|
||||
|
||||
## Development
|
||||
```shell
|
||||
cd tests
|
||||
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
|
||||
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
|
||||
```
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
from imaginaerraum_door_admin.webapp import create_application
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--key_file", default='/root/flask_keys', help="Path to file with Flask SECRET_KEY and SECURITY_PASSWORD_SALT")
|
||||
parser.add_argument("--token_file", default="/etc/door_tokens", help="path to the file with door tokens and users")
|
||||
parser.add_argument("--nfc_socket", default="/tmp/nfc.sock", help="socket for handling NFC reader commands")
|
||||
parser.add_argument("--template_folder", default="templates", help="path to Flask templates folder")
|
||||
parser.add_argument("--static_folder", default="static", help="path to Flask static folder")
|
||||
parser.add_argument("--admin_file", default="/etc/admins.conf", help="Path to file for creating super admin users")
|
||||
parser.add_argument("--log_file", default="/var/log/webinterface.log", help="Path to flask log file")
|
||||
parser.add_argument("--nfc_log", default="/var/log/nfc.log", help="Path to nfc log file")
|
||||
parser.add_argument("--ldap_url", default="ldaps://ldap.imaginaerraum.de",
|
||||
help="URL for LDAP server for alternative user authorization")
|
||||
parser.add_argument("--mqtt_host", default="10.10.21.2", help="IP address of MQTT broker")
|
||||
parser.add_argument("--flask_port", default=80, help="Port for running the Flask server")
|
||||
parser.add_argument("--mail_server", default="smtp.googlemail.com", help="email server for sending security messages")
|
||||
parser.add_argument("--mail_port", default=465, help="port for security email server")
|
||||
parser.add_argument("--mail_use_tls", default=False, help="use TLS for security emails")
|
||||
parser.add_argument("--mail_use_ssl", default=True, help="use SSL for security emails")
|
||||
parser.add_argument("--mail_username", default="admin@example.com", help="email account for sending security messages")
|
||||
parser.add_argument("--mail_password", default="password", help="Password for email account")
|
||||
config = parser.parse_args()
|
||||
|
||||
app = create_application(config)
|
||||
app.run(host='0.0.0.0', port=config.flask_port)
|
|
@ -0,0 +1,117 @@
|
|||
import logging
|
||||
from flask import Flask
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
||||
from email_validator import validate_email
|
||||
from pathlib import Path
|
||||
|
||||
from .door_handle import DoorHandle
|
||||
|
||||
security = Security()
|
||||
db = SQLAlchemy()
|
||||
|
||||
|
||||
# create admin users (only if they don't exist already)
|
||||
def create_super_admins(app, user_datastore):
|
||||
admin_file = Path(app.config.get('ADMIN_FILE'))
|
||||
|
||||
# setup user database when starting the app
|
||||
new_admin_data = []
|
||||
if not admin_file.exists():
|
||||
app.logger.warning(
|
||||
f"Admin user creation file not found at path "
|
||||
f"{admin_file.absolute()}."
|
||||
f"No super admins have been created in the datastore."
|
||||
)
|
||||
else:
|
||||
# store data for new admins in memory s.t. the file can be deleted
|
||||
# afterwards
|
||||
admin_data = admin_file.read_text().split('\n')
|
||||
for i, line in enumerate(admin_data):
|
||||
if len(line.strip()) > 0 and not line.strip().startswith('#'):
|
||||
try:
|
||||
user, email, pw = line.split()
|
||||
validate_email(email)
|
||||
new_admin_data.append(
|
||||
{'username': user, 'email': email,
|
||||
'password': pw})
|
||||
except Exception as e:
|
||||
app.logger.error(
|
||||
f"Error while parsing line {i} in admin config file. "
|
||||
f"Config file should contain lines of <username> "
|
||||
f"<email> <password>\\n'\n "
|
||||
f"Exception: {e}\nAdmin account could not be created."
|
||||
)
|
||||
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
super_admin_role = user_datastore.find_or_create_role(
|
||||
'super_admin') # root admin = can create other admins
|
||||
admin_role = user_datastore.find_or_create_role(
|
||||
'admin') # 'normal' admin
|
||||
local_role = user_datastore.find_or_create_role(
|
||||
'local') # LDAP user or local user
|
||||
|
||||
for d in new_admin_data:
|
||||
if user_datastore.find_user(email=d['email'],
|
||||
username=d['username']) is None:
|
||||
roles = [super_admin_role, admin_role]
|
||||
if not d['password'] == 'LDAP':
|
||||
roles.append(local_role)
|
||||
|
||||
# create new admin (only if admin does not already exist)
|
||||
new_admin = user_datastore.create_user(
|
||||
email=d['email'], username=d['username'],
|
||||
password=hash_password(d['password']), roles=roles
|
||||
)
|
||||
app.logger.info(
|
||||
f"New super admin user created with username "
|
||||
f"'{new_admin.username}' and email '{new_admin.email}'"
|
||||
f", roles = {[r.name for r in new_admin.roles]}"
|
||||
)
|
||||
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.config.from_object(
|
||||
'imaginaerraum_door_admin.default_app_config.DefaultConfig'
|
||||
)
|
||||
app.config.from_envvar('APPLICATION_SETTINGS')
|
||||
|
||||
logging.basicConfig(filename=app.config['LOG_FILE'], level=logging.INFO)
|
||||
|
||||
token_file = Path(app.config.get('TOKEN_FILE'))
|
||||
if not token_file.exists():
|
||||
app.logger.warning(
|
||||
f"Token file not found at {token_file.absolute()}. "
|
||||
"An empty token file will be created."
|
||||
)
|
||||
token_file.touch()
|
||||
|
||||
# create door objects which provides access to the token file and current
|
||||
# door state via MQTT
|
||||
app.door = DoorHandle(
|
||||
token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
|
||||
nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
|
||||
)
|
||||
|
||||
# Mail Config
|
||||
#mail = Mail(app)
|
||||
|
||||
# Create database connection object
|
||||
db.init_app(app)
|
||||
|
||||
from . webapp import door_app
|
||||
app.register_blueprint(door_app)
|
||||
|
||||
# Setup Flask-Security
|
||||
from .auth import ExtendedLoginForm, User, Role
|
||||
|
||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
||||
|
||||
create_super_admins(app, user_datastore)
|
||||
|
||||
return app
|
163
imaginaerraum_door_admin/auth.py
Normal file
163
imaginaerraum_door_admin/auth.py
Normal file
|
@ -0,0 +1,163 @@
|
|||
from wtforms.fields import StringField, BooleanField
|
||||
from flask import current_app
|
||||
from flask_security import hash_password
|
||||
from flask_security.forms import LoginForm, Required, PasswordField
|
||||
from flask_security.utils import lookup_identity
|
||||
from flask_security.models import fsqla_v2 as fsqla
|
||||
|
||||
import ldap3
|
||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
||||
|
||||
from imaginaerraum_door_admin import db, security
|
||||
|
||||
# Define models
|
||||
fsqla.FsModels.set_db_info(db)
|
||||
|
||||
|
||||
class Role(db.Model, fsqla.FsRoleMixin):
|
||||
pass
|
||||
|
||||
|
||||
class User(db.Model, fsqla.FsUserMixin):
|
||||
pass
|
||||
|
||||
|
||||
class ExtendedLoginForm(LoginForm):
|
||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||
password = PasswordField('Passwort', [Required()])
|
||||
remember = BooleanField('Login merken?')
|
||||
|
||||
def validate(self):
|
||||
# search for user in the current database
|
||||
user = lookup_identity(self.email.data)
|
||||
if user is not None:
|
||||
# if a user is found we check if it is associated with LDAP or with
|
||||
# the local database
|
||||
if user.has_role('local'):
|
||||
# try authorizing locally using Flask security user datastore
|
||||
authorized = super(ExtendedLoginForm, self).validate()
|
||||
|
||||
if authorized:
|
||||
current_app.logger.info(
|
||||
f"User with credentials '{self.email.data}' authorized "
|
||||
f"through local database"
|
||||
)
|
||||
else:
|
||||
# run LDAP authorization
|
||||
# if the authorization succeeds we also get the new_user_data
|
||||
# dict which contains information about
|
||||
# the user's permissions etc.
|
||||
authorized, new_user_data = self.validate_ldap()
|
||||
|
||||
if authorized:
|
||||
current_app.logger.info(
|
||||
f"User with credentials '{self.email.data}' authorized "
|
||||
f"through LDAP")
|
||||
# update permissions and password/email to stay up to date
|
||||
# for login with no network connection
|
||||
user.email = new_user_data['email']
|
||||
user.password = new_user_data['password']
|
||||
for role in new_user_data['roles']:
|
||||
security.datastore.add_role_to_user(user, role)
|
||||
security.datastore.commit()
|
||||
self.user = user
|
||||
else:
|
||||
self.password.errors = ['Invalid password']
|
||||
else:
|
||||
# this means there is no user with that email in the database
|
||||
# we assume that the username was entered instead of an email and
|
||||
# use that for authentication with LDAP
|
||||
username = self.email.data
|
||||
# try LDAP authorization and create a new user if it succeeds
|
||||
authorized, new_user_data = self.validate_ldap()
|
||||
|
||||
if authorized:
|
||||
# if there was no user in the database before we create a new
|
||||
# user
|
||||
self.user = security.datastore.create_user(
|
||||
username=new_user_data['username'],
|
||||
email=new_user_data['email'],
|
||||
password=new_user_data['password'],
|
||||
roles=new_user_data['roles']
|
||||
)
|
||||
security.datastore.commit()
|
||||
current_app.logger.info(
|
||||
f"New admin user '{new_user_data['username']} "
|
||||
f"<{new_user_data['email']}>' created after successful "
|
||||
f"LDAP authorization"
|
||||
)
|
||||
|
||||
# if any of the authorization methods is successful we authorize
|
||||
# the user
|
||||
return authorized
|
||||
|
||||
def validate_ldap(self):
|
||||
"""Validate the user and password through an LDAP server.
|
||||
|
||||
If the connection completes successfully the given user and password
|
||||
is authorized. Then the permissions and additional information of the
|
||||
user are obtained through an LDAP search.
|
||||
The data is stored in a dict which will be used later to create/update
|
||||
the entry for the user in the local database.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool : result of the authorization process (True = success,
|
||||
False = failure)
|
||||
dict : dictionary with information about an authorized user
|
||||
(contains username, email, hashed password, roles)
|
||||
"""
|
||||
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
|
||||
ldap_user_group = current_app.config['LDAP_USER_GROUP']
|
||||
ldap_domain = current_app.config['LDAP_DOMAIN']
|
||||
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
|
||||
|
||||
username = self.email.data
|
||||
password = self.password.data
|
||||
|
||||
try:
|
||||
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
|
||||
f"dc={ldap_domain_ext}"
|
||||
con = ldap3.Connection(
|
||||
ldap_server,
|
||||
user=user,
|
||||
password=password,
|
||||
auto_bind=True
|
||||
)
|
||||
except ldap3.core.exceptions.LDAPBindError as e:
|
||||
# server reachable but user unauthorized -> fail
|
||||
return False, None
|
||||
except LDAPSocketOpenError as e:
|
||||
# server not reachable -> fail (but will try authorization from
|
||||
# local database later)
|
||||
return False, None
|
||||
except Exception as e:
|
||||
# for other Exceptions we just fail
|
||||
return False, None
|
||||
|
||||
# get user data and permissions from LDAP server
|
||||
new_user_data = {}
|
||||
new_user_data['username'] = username
|
||||
new_user_data['password'] = hash_password(password)
|
||||
new_user_data['roles'] = []
|
||||
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
|
||||
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
|
||||
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
|
||||
lock_permission = con.search(
|
||||
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
|
||||
)
|
||||
|
||||
if lock_permission:
|
||||
new_user_data['email'] = con.entries[0].mail.value
|
||||
else:
|
||||
return False, None
|
||||
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
|
||||
f'dc={ldap_domain},dc={ldap_domain_ext}))'
|
||||
token_granting_permission = con.search(search_base, search_filter)
|
||||
if token_granting_permission:
|
||||
new_user_data['roles'].append('admin')
|
||||
|
||||
return True, new_user_data
|
11
imaginaerraum_door_admin/bin/launch_webadmin.py
Executable file
11
imaginaerraum_door_admin/bin/launch_webadmin.py
Executable file
|
@ -0,0 +1,11 @@
|
|||
#!/usr/bin/env python3
|
||||
from imaginaerraum_door_admin import create_app
|
||||
|
||||
|
||||
def main():
|
||||
app = create_app()
|
||||
app.run(host='0.0.0.0', port=app.config.get('PORT', 80))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
61
imaginaerraum_door_admin/default_app_config.py
Normal file
61
imaginaerraum_door_admin/default_app_config.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
import bleach
|
||||
from flask_security import uia_email_mapper
|
||||
print("loading default config")
|
||||
|
||||
|
||||
def uia_username_mapper(identity):
|
||||
# we allow pretty much anything - but we bleach it.
|
||||
return bleach.clean(identity, strip=True)
|
||||
|
||||
|
||||
class DefaultConfig(object):
|
||||
DEBUG = False
|
||||
PORT = 80
|
||||
|
||||
SECRET_KEY = 'supersecret'
|
||||
SECURITY_PASSWORD_SALT = 'salty'
|
||||
|
||||
SECURITY_REGISTERABLE = False
|
||||
SECURITY_CHANGEABLE = True
|
||||
SECURITY_RECOVERABLE = True
|
||||
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
|
||||
|
||||
SECURITY_POST_LOGIN_VIEW = '/'
|
||||
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
|
||||
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
|
||||
SECURITY_PASSWORD_LENGTH_MIN = 10
|
||||
|
||||
SECURITY_USER_IDENTITY_ATTRIBUTES = [
|
||||
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
|
||||
{"username": {"mapper": uia_username_mapper}}
|
||||
]
|
||||
|
||||
# mail configuration
|
||||
MAIL_SERVER = ''
|
||||
MAIL_PORT = 465
|
||||
MAIL_USE_SSL = True
|
||||
MAIL_USERNAME = ''
|
||||
MAIL_PASSWORD = ''
|
||||
MAIL_DEFAULT_SENDER = ''
|
||||
|
||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
|
||||
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
||||
# underlying engine. This option makes sure that DB connections from the
|
||||
# pool are still valid. Important for entire application since
|
||||
# many DBaaS options automatically close idle connections.
|
||||
SQLALCHEMY_ENGINE_OPTIONS = {
|
||||
"pool_pre_ping": True,
|
||||
}
|
||||
|
||||
TOKEN_FILE = "door_tokens"
|
||||
ADMIN_FILE = 'admins'
|
||||
|
||||
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
||||
LDAP_USER_GROUP = 'Users'
|
||||
LDAP_DOMAIN = 'imaginaerraum'
|
||||
LDAP_DOMAIN_EXT = 'de'
|
||||
|
||||
NFC_SOCKET = "/tmp/nfc.sock"
|
||||
LOG_FILE = "webinterface.log"
|
||||
NFC_LOG = "nfc.log"
|
||||
MQTT_HOST = '10.10.21.2'
|
|
@ -4,23 +4,22 @@ from pathlib import Path
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class DoorHandle:
|
||||
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
|
||||
def __init__(self, token_file, mqtt_host, mqtt_port=1883,
|
||||
nfc_socket='/tmp/nfc.sock', logger=None):
|
||||
self.state = None
|
||||
self.encoder_position = None
|
||||
|
||||
if not Path(token_file).exists():
|
||||
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
|
||||
raise FileNotFoundError(
|
||||
"File with door tokens could not be found at "
|
||||
f"{Path(token_file).absolute()}"
|
||||
)
|
||||
|
||||
self.token_file = token_file
|
||||
self.last_invalid = {}
|
||||
|
||||
self.mqtt_client = mqtt.Client()
|
||||
self.mqtt_client.on_connect = self.on_connect
|
||||
self.mqtt_client.on_message = self.on_message
|
||||
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
||||
self.mqtt_client.loop_start()
|
||||
|
||||
if logger:
|
||||
self.logger = logger
|
||||
else:
|
||||
|
@ -31,10 +30,18 @@ class DoorHandle:
|
|||
self.nfc_sock.connect(nfc_socket)
|
||||
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
|
||||
except Exception as e:
|
||||
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
|
||||
self.logger.error(f"Could not connect to NFC socket at {nfc_socket}. "
|
||||
f"Exception: {e}")
|
||||
self.nfc_sock = None
|
||||
#raise
|
||||
|
||||
self.mqtt_client = mqtt.Client()
|
||||
self.mqtt_client.on_connect = self.on_connect
|
||||
self.mqtt_client.on_message = self.on_message
|
||||
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
||||
self.mqtt_client.loop_start()
|
||||
self.logger.info(f"Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
|
||||
|
||||
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
|
||||
|
||||
# The callback for when the client receives a CONNACK response from the server.
|
||||
|
@ -43,7 +50,7 @@ class DoorHandle:
|
|||
|
||||
# Subscribing in on_connect() means that if we lose the connection and
|
||||
# reconnect then subscriptions will be renewed.
|
||||
client.subscribe("#")
|
||||
client.subscribe("door/#")
|
||||
|
||||
# The callback for when a PUBLISH message is received from the server.
|
||||
def on_message(self, client, userdata, msg):
|
||||
|
@ -70,11 +77,13 @@ class DoorHandle:
|
|||
email = data[3].strip() if len(data) > 3 else None
|
||||
valid_thru = data[4].strip() if len(data) > 4 else None
|
||||
|
||||
tokens[token] = {'name': name,
|
||||
tokens[token] = {
|
||||
'name': name,
|
||||
'organization': organization,
|
||||
'email': email,
|
||||
'valid_thru': valid_thru,
|
||||
'inactive': inactive}
|
||||
'inactive': inactive
|
||||
}
|
||||
return tokens
|
||||
|
||||
def store_tokens(self, tokens):
|
||||
|
@ -98,13 +107,15 @@ class DoorHandle:
|
|||
if self.nfc_sock is not None:
|
||||
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
|
||||
else:
|
||||
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||
self.logger.error("No connection to NFC socket. Cannot open door!")
|
||||
raise RuntimeError("No connection to NFC socket. Cannot open door!")
|
||||
|
||||
def close_door(self, user=''):
|
||||
if self.nfc_sock is not None:
|
||||
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
|
||||
else:
|
||||
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||
self.logger.error("No connection to NFC socket. Cannot close door!")
|
||||
raise RuntimeError("No connection to NFC socket. Cannot close door!")
|
||||
|
||||
def get_most_recent_token(self):
|
||||
# read last invalid token from logfile
|
||||
|
|
46
imaginaerraum_door_admin/forms.py
Normal file
46
imaginaerraum_door_admin/forms.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
from flask import flash
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms.fields import DateField, EmailField
|
||||
from wtforms.fields import StringField, BooleanField
|
||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
||||
from datetime import date
|
||||
|
||||
|
||||
def validate_valid_thru_date(form, field):
|
||||
if form.limit_validity.data:
|
||||
# only check date format if limited validity of token is set
|
||||
try:
|
||||
if not field.data >= date.today():
|
||||
raise ValueError
|
||||
except ValueError as e:
|
||||
flash("Ungültiges Datum")
|
||||
raise ValidationError
|
||||
return True
|
||||
|
||||
|
||||
class TokenForm(FlaskForm):
|
||||
name = StringField("Name", validators=[DataRequired()])
|
||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
||||
organization = StringField("Organization", validators=[DataRequired()])
|
||||
limit_validity = BooleanField("Gültigkeit begrenzen?")
|
||||
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
|
||||
active = BooleanField("Aktiv?")
|
||||
dsgvo = BooleanField(
|
||||
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
|
||||
)
|
||||
|
||||
|
||||
class ConfirmDeleteForm(FlaskForm):
|
||||
name = StringField(
|
||||
"Name",
|
||||
validators=[
|
||||
DataRequired(),
|
||||
EqualTo("name_confirm", "Name stimmt nicht überein"),
|
||||
],
|
||||
)
|
||||
name_confirm = StringField("Name confirm")
|
||||
|
||||
|
||||
class AdminCreationForm(FlaskForm):
|
||||
name = StringField("Name", validators=[DataRequired()])
|
||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
@ -27,15 +27,15 @@
|
|||
{% endfor %}
|
||||
<td>
|
||||
{% if not data['super_admin'] %}
|
||||
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
||||
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
||||
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
{% endif %}
|
||||
{% if data['admin'] %}
|
||||
{% if not data['super_admin'] %}
|
||||
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
||||
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
||||
{% endif %}
|
||||
{% else %}
|
||||
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
||||
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
||||
{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
|
@ -68,14 +68,14 @@
|
|||
</div>
|
||||
<div class="p-2 bg-light border">
|
||||
<h3>Nutzerdaten sichern:</h3>
|
||||
<form action="{{ url_for('backup_user_datastore') }}" method="get">
|
||||
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
|
||||
<input type="submit" value="Download">
|
||||
</form>
|
||||
</div>
|
||||
|
||||
<div class="p-2 bg-light border">
|
||||
<h3>Nutzerdaten wiederherstellen:</h3>
|
||||
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
||||
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
||||
<input type=file name=file>
|
||||
<input type=submit value="Abschicken">
|
||||
</form>
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
<body>
|
||||
<nav class="navbar navbar-expand-lg navbar-light bg-light">
|
||||
<div class="container-fluid">
|
||||
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
||||
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
||||
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
|
||||
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
|
||||
<span class="navbar-toggler-icon"></span>
|
||||
|
@ -29,10 +29,10 @@
|
|||
Tokens
|
||||
</a>
|
||||
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
|
||||
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
|
||||
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
|
||||
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
|
||||
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
|
||||
{% if current_user.has_role('super_admin') %}
|
||||
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
|
||||
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
|
||||
{% endif %}
|
||||
</div>
|
||||
</li>
|
||||
|
@ -40,7 +40,7 @@
|
|||
|
||||
{% if current_user.has_role('super_admin') %}
|
||||
<li class="nav-item">
|
||||
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
|
||||
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
|
||||
</li>
|
||||
{% endif %}
|
||||
|
||||
|
|
|
@ -24,11 +24,11 @@
|
|||
{% if current_user.is_authenticated %}
|
||||
<div class="row">
|
||||
<div class="col-3">
|
||||
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
||||
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
||||
</div>
|
||||
<div class="col-1"></div>
|
||||
<div class="col-3">
|
||||
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
||||
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
||||
</div>
|
||||
<div class="col-5"></div>
|
||||
</div>
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
und diesen zustimmt.
|
||||
</li>
|
||||
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
|
||||
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
||||
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
||||
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
|
||||
</li>
|
||||
<li>Jetzt kann der Token verwendet werden.</li>
|
||||
|
|
|
@ -29,9 +29,9 @@
|
|||
<td>{{ data[field] if data[field] }}</td>
|
||||
{% endfor %}
|
||||
<td>
|
||||
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
||||
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
|
@ -47,14 +47,14 @@
|
|||
<td>{{ data[field] if data[field] }}</td>
|
||||
{% endfor %}
|
||||
<td>
|
||||
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
<form action="{{ url_for('backup_tokens') }}" method="get">
|
||||
<form action="{{ url_for('door_app.backup_tokens') }}" method="get">
|
||||
<input type="submit" value="Token Daten sichern">
|
||||
</form>
|
||||
{% endblock %}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,5 +0,0 @@
|
|||
flask~=1.1.2
|
||||
Flask-Security-Too~=4.0.0
|
||||
WTForms~=2.3.3
|
||||
paho-mqtt~=1.5.1
|
||||
bleach~=3.3.0
|
35
setup.cfg
35
setup.cfg
|
@ -15,3 +15,38 @@ classifiers =
|
|||
Programming Language :: Python :: 3
|
||||
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
|
||||
Operating System :: OS Independent
|
||||
|
||||
[options]
|
||||
python_requires = >=3.8
|
||||
install_requires =
|
||||
bleach
|
||||
Flask
|
||||
Flask-Mail
|
||||
Flask-Security-Too>=5.0.1
|
||||
Flask-SQLAlchemy
|
||||
Flask-WTF
|
||||
email_validator
|
||||
paho-mqtt
|
||||
ldap3
|
||||
wtforms
|
||||
|
||||
include_package_data = True
|
||||
packages = find:
|
||||
setup_requires =
|
||||
wheel
|
||||
tests_require = pytest>=3
|
||||
zip_safe = False
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
launch_webinterface = imaginaerraum_door_admin.bin.launch_webadmin:main
|
||||
|
||||
[options.extras_require]
|
||||
dev =
|
||||
pytest
|
||||
pytest-cov
|
||||
pytest-flask
|
||||
pytest-mock
|
||||
flake8
|
||||
selenium
|
||||
beautifulsoup4
|
16
setup.py
16
setup.py
|
@ -1,17 +1,3 @@
|
|||
from setuptools import setup
|
||||
|
||||
setup(install_requires=[
|
||||
'bleach',
|
||||
'Flask',
|
||||
'Flask-Mail',
|
||||
'Flask-Security-Too',
|
||||
'Flask-SQLAlchemy',
|
||||
'Flask-WTF',
|
||||
'email_validator',
|
||||
'paho-mqtt',
|
||||
'ldap3',
|
||||
],
|
||||
include_package_data=True,
|
||||
scripts=['bin/launch_webadmin'],
|
||||
packages=['imaginaerraum_door_admin'],
|
||||
zip_safe=False)
|
||||
setup()
|
||||
|
|
24
tests/conftest.py
Normal file
24
tests/conftest.py
Normal file
|
@ -0,0 +1,24 @@
|
|||
import pytest
|
||||
import os
|
||||
|
||||
from selenium.webdriver import Chrome
|
||||
from imaginaerraum_door_admin import create_app
|
||||
|
||||
os.environ['APPLICATION_SETTINGS'] = os.getcwd() + '/debug_app_config.py'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def app():
|
||||
"""Fixture to launch the webapp"""
|
||||
app = create_app()
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def browser():
|
||||
"""Fixture for a selenium browser to access the webapp."""
|
||||
driver = Chrome()
|
||||
driver.implicitly_wait(10)
|
||||
yield driver
|
||||
driver.quit()
|
56
tests/debug_app_config.py
Normal file
56
tests/debug_app_config.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
TESTING = True
|
||||
DEBUG = True
|
||||
LOG_FILE = 'webapp.log'
|
||||
|
||||
token_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(token_file).write_text(
|
||||
"""# token | name | organization | email | valid_thru
|
||||
#042979fa186280||||
|
||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
||||
"""
|
||||
)
|
||||
TOKEN_FILE = str(token_file)
|
||||
|
||||
SECRET_KEY = 'supersecret'
|
||||
SECURITY_PASSWORD_SALT = 'salty'
|
||||
|
||||
admin_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(admin_file).write_text(
|
||||
"""# create new super-admin by putting the following data in this file:
|
||||
# username email ldap/password
|
||||
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
|
||||
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
|
||||
# examples:
|
||||
gandalf gandalf@shire.me shadowfax
|
||||
"""
|
||||
)
|
||||
ADMIN_FILE = str(admin_file)
|
||||
|
||||
db_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
|
||||
|
||||
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
|
||||
Path(token_log_file).write_text(
|
||||
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
|
||||
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
|
||||
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
|
||||
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
|
||||
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
|
||||
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
|
||||
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
|
||||
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
|
||||
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
|
||||
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
|
||||
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
||||
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
|
||||
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
|
||||
"""
|
||||
)
|
||||
NFC_LOG = str(token_log_file)
|
154
tests/test_door_handle.py
Normal file
154
tests/test_door_handle.py
Normal file
|
@ -0,0 +1,154 @@
|
|||
import datetime
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nfc_socket(tmp_path):
|
||||
# mock up a Unix socket server for testing
|
||||
import threading
|
||||
import socketserver
|
||||
|
||||
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
|
||||
def handle(self):
|
||||
data = str(self.request.recv(1024), 'ascii')
|
||||
cur_thread = threading.current_thread()
|
||||
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
|
||||
try:
|
||||
self.request.sendall(response)
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
|
||||
class TreadedUnixServer(socketserver.ThreadingMixIn,
|
||||
socketserver.UnixStreamServer):
|
||||
pass
|
||||
|
||||
nfc_socket_file = tmp_path / 'nfc.sock'
|
||||
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
|
||||
with server:
|
||||
nfc_socket = server.server_address
|
||||
|
||||
# Start a thread with the server -- that thread will then start one
|
||||
# more thread for each request
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
# Exit the server thread when the main thread terminates
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
print("Server loop running in thread:", server_thread.name)
|
||||
|
||||
yield nfc_socket
|
||||
|
||||
server.shutdown()
|
||||
|
||||
|
||||
def test_create_door_handle(nfc_socket, tmp_path):
|
||||
# test with invalid token file
|
||||
with pytest.raises(FileNotFoundError):
|
||||
door_handle = DoorHandle(token_file='no_such_file',
|
||||
mqtt_host='test.mosquitto.org')
|
||||
|
||||
token_file = tmp_path / 'door_tokens'
|
||||
token_file.write_text('')
|
||||
door_handle = DoorHandle(
|
||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||||
nfc_socket=nfc_socket
|
||||
)
|
||||
door_handle.nfc_sock.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def door_handle(nfc_socket, tmp_path):
|
||||
token_file = tmp_path / 'door_tokens'
|
||||
token_file.write_text(
|
||||
"""# token | name | organization | email | valid_thru
|
||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
||||
"""
|
||||
)
|
||||
door_handle = DoorHandle(
|
||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||||
nfc_socket=nfc_socket
|
||||
)
|
||||
yield door_handle
|
||||
door_handle.nfc_sock.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def door_handle_no_nfc(tmp_path):
|
||||
token_file = tmp_path / 'door_tokens'
|
||||
token_file.write_text('')
|
||||
door_handle = DoorHandle(
|
||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
||||
)
|
||||
yield door_handle
|
||||
|
||||
|
||||
def test_store_tokens(door_handle):
|
||||
new_tokens = {
|
||||
'042979fa186280': {
|
||||
'name': 'Pippin',
|
||||
'organization': 'Hobbits',
|
||||
'email': 'pippin@shire.me',
|
||||
'valid_thru': None,
|
||||
'inactive': False
|
||||
}
|
||||
}
|
||||
|
||||
door_handle.store_tokens(new_tokens)
|
||||
|
||||
token_file = Path(door_handle.token_file)
|
||||
assert token_file.exists()
|
||||
assert '042979fa186280' in token_file.read_text()
|
||||
|
||||
|
||||
def test_mqtt_messages(door_handle):
|
||||
# test sending messages to the door_handle via MQTT and see if the state
|
||||
# gets updated accordingly
|
||||
client = mqtt.Client()
|
||||
|
||||
con = client.connect('test.mosquitto.org', 1883)
|
||||
client.publish('door/position/value', 10).wait_for_publish(1)
|
||||
time.sleep(1)
|
||||
assert door_handle.encoder_position == 10
|
||||
|
||||
client.publish('door/state/value', 'open').wait_for_publish(1)
|
||||
time.sleep(1)
|
||||
assert door_handle.state == 'open'
|
||||
|
||||
timestamp = datetime.datetime.now().replace(microsecond=0)
|
||||
token = '042979fa186280'
|
||||
client.publish(
|
||||
'door/token/last_invalid',
|
||||
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
|
||||
).wait_for_publish(1)
|
||||
time.sleep(1)
|
||||
most_recent_token = door_handle.get_most_recent_token()
|
||||
assert most_recent_token['timestamp'] == timestamp
|
||||
assert most_recent_token['token'] == token
|
||||
|
||||
client.disconnect()
|
||||
|
||||
|
||||
def test_open_door(door_handle):
|
||||
door_handle.open_door()
|
||||
|
||||
|
||||
def test_open_door_broken_socket(door_handle_no_nfc):
|
||||
# test broken nfc_socket connection
|
||||
with pytest.raises(RuntimeError):
|
||||
door_handle_no_nfc.open_door()
|
||||
|
||||
|
||||
def test_close_door(door_handle):
|
||||
door_handle.close_door()
|
||||
|
||||
|
||||
def test_close_door_broken_socket(door_handle_no_nfc):
|
||||
# test broken nfc_socket connection
|
||||
with pytest.raises(RuntimeError):
|
||||
door_handle_no_nfc.close_door()
|
622
tests/test_webinterface.py
Normal file
622
tests/test_webinterface.py
Normal file
|
@ -0,0 +1,622 @@
|
|||
import datetime
|
||||
|
||||
import pytest
|
||||
from bs4 import BeautifulSoup
|
||||
from flask_security.utils import lookup_identity
|
||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
||||
from imaginaerraum_door_admin.auth import ExtendedLoginForm
|
||||
import re
|
||||
import secrets
|
||||
import pathlib
|
||||
import json
|
||||
|
||||
|
||||
def test_login(browser, live_server):
|
||||
response = browser.get(f'http://localhost:{live_server.port}')
|
||||
|
||||
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
|
||||
|
||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
||||
|
||||
email_form = browser.find_element_by_xpath('//input[@id="email"]')
|
||||
email_form.send_keys('gandalf@shire.me')
|
||||
password_form = browser.find_element_by_xpath('//input[@id="password"]')
|
||||
password_form.send_keys('shadowfax')
|
||||
submit_button = browser.find_element_by_xpath('//input[@id="submit"]')
|
||||
submit_button.click()
|
||||
|
||||
assert 'Tür öffnen' in browser.page_source
|
||||
|
||||
|
||||
def extract_csrf_token(response):
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
||||
return csrf_token
|
||||
|
||||
|
||||
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
|
||||
# extract csrf token from the login page source
|
||||
response = client.get('/login', follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
|
||||
# send login information
|
||||
payload = {
|
||||
'csrf_token': csrf_token,
|
||||
'email': user,
|
||||
'password': password
|
||||
}
|
||||
return client.post('/login', data=payload, follow_redirects=True)
|
||||
|
||||
|
||||
def test_login_headless(client):
|
||||
response = headless_login(client)
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
|
||||
# make sure login succeeded -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0]
|
||||
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||
|
||||
|
||||
def test_validate_ldap(client, mocker):
|
||||
# mock ldap Connection object to simulate successful authentication
|
||||
import ldap3
|
||||
def init_success(self, *args, **kwargs):
|
||||
pass
|
||||
def init_LDAPBindError(self, *args, **kwargs):
|
||||
raise ldap3.core.exceptions.LDAPBindError()
|
||||
def init_LDAPSocketOpenError(self, *args, **kwargs):
|
||||
raise ldap3.core.exceptions.LDAPSocketOpenError()
|
||||
def init_Exception(self, *args, **kwargs):
|
||||
raise Exception()
|
||||
def search_success(self, *args, **kwargs):
|
||||
return True
|
||||
def search_failure(self, *args, **kwargs):
|
||||
return False
|
||||
|
||||
mocker.patch.object(ldap3.Connection, '__init__', init_success)
|
||||
mocker.patch.object(ldap3.Connection, 'search', search_success)
|
||||
mock_entries = mocker.MagicMock()
|
||||
mock_entries[0].mail.value = 'user@example.com'
|
||||
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
|
||||
|
||||
with client.application.app_context():
|
||||
# test successful login
|
||||
form = ExtendedLoginForm()
|
||||
form.email.data = 'user'
|
||||
form.password.data = 'password'
|
||||
result = form.validate_ldap()
|
||||
assert result[0]
|
||||
assert result[1]['username'] == 'user'
|
||||
assert result[1]['email'] == 'user@example.com'
|
||||
assert result[1]['roles'] == ['admin']
|
||||
|
||||
# test failing ldap search
|
||||
mocker.patch.object(ldap3.Connection, 'search', search_failure)
|
||||
result = form.validate_ldap()
|
||||
|
||||
assert not result[0]
|
||||
assert result[1] is None
|
||||
|
||||
# test some errors in ldap Connection and authentication
|
||||
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
|
||||
result = form.validate_ldap()
|
||||
assert not result[0]
|
||||
assert result[1] is None
|
||||
|
||||
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
|
||||
result = form.validate_ldap()
|
||||
assert not result[0]
|
||||
assert result[1] is None
|
||||
|
||||
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
|
||||
result = form.validate_ldap()
|
||||
assert not result[0]
|
||||
assert result[1] is None
|
||||
|
||||
|
||||
def test_login_ldap(client, temp_user, mocker):
|
||||
# mock ldap validation for admin user
|
||||
def mock_validate(self):
|
||||
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
|
||||
user_data = {'username': temp_user['username'],
|
||||
'email': temp_user['email'],
|
||||
'roles': ['admin'],
|
||||
'password': temp_user['password']}
|
||||
return auth, user_data
|
||||
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
||||
|
||||
user = lookup_identity(temp_user['username'])
|
||||
# remove local role so that ldap authentication is the default
|
||||
user.roles.pop(0)
|
||||
|
||||
# log out admin user
|
||||
client.get('/logout')
|
||||
|
||||
# log in temp user using ldap
|
||||
response = headless_login(client, user=temp_user['username'],
|
||||
password=temp_user['password'])
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
|
||||
# make sure login succeeded -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0]
|
||||
for link in soup.findAll('a', attrs={'class': ['btn'],
|
||||
'role': 'button'})])
|
||||
|
||||
|
||||
def test_login_ldap_new_user(client, mocker):
|
||||
# mock ldap validation for admin user
|
||||
def mock_validate(self):
|
||||
auth = True
|
||||
user_data = {'username': 'Balrog',
|
||||
'email': 'balrog@moria.me',
|
||||
'roles': ['admin'],
|
||||
'password': 'youshallnotpass'}
|
||||
return auth, user_data
|
||||
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
||||
|
||||
# initially, the Balrog user should not exist
|
||||
user = lookup_identity('Balrog')
|
||||
assert user is None
|
||||
|
||||
# log in temp user using ldap -> this will succeed and create a local user
|
||||
response = headless_login(client, user='Balrog',
|
||||
password='youshallnotpass')
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
|
||||
# make sure user is now created locally
|
||||
user = lookup_identity('Balrog')
|
||||
assert user is not None
|
||||
|
||||
# make sure login succeeded -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0]
|
||||
for link in soup.findAll('a', attrs={'class': ['btn'],
|
||||
'role': 'button'})])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_authenticated(client):
|
||||
# log in using admin account for testing
|
||||
headless_login(client)
|
||||
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'),
|
||||
('/close', 'close_door')])
|
||||
def test_access_door_button(client_authenticated, mocker, url, function):
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||
|
||||
# visit route for open
|
||||
client_authenticated.get(url)
|
||||
|
||||
# make sure the open method was called
|
||||
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
||||
def test_access_door_unauthenticated(client, mocker, url, function):
|
||||
# test for trying to visit opening/closing door while not logged in
|
||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
||||
|
||||
# visit route for open
|
||||
response = client.get(url, follow_redirects=True)
|
||||
|
||||
# we should get redirected to login page
|
||||
assert 'login' in response.request.url
|
||||
|
||||
# the open door function should not be called
|
||||
getattr(DoorHandle, function).assert_not_called()
|
||||
|
||||
|
||||
def test_manage_admins(client_authenticated):
|
||||
# visit admin management page
|
||||
response = client_authenticated.get('/manage_admins')
|
||||
|
||||
assert "Nutzer Übersicht" in response.data.decode()
|
||||
assert "gandalf" in response.data.decode()
|
||||
assert "gandalf@shire.me" in response.data.decode()
|
||||
|
||||
|
||||
def create_user(client_authenticated, username, email):
|
||||
# visit admin management page
|
||||
response = client_authenticated.get('/manage_admins')
|
||||
csrf_token = extract_csrf_token(response)
|
||||
|
||||
# post data for creating a new admin
|
||||
payload = {'name': username,
|
||||
'email': email,
|
||||
'csrf_token': csrf_token}
|
||||
response = client_authenticated.post('/manage_admins', data=payload,
|
||||
follow_redirects=True)
|
||||
|
||||
# after the new admin user is created, we should have been redirected to the
|
||||
# /manage_admin page. there, the password for login is displayed
|
||||
# we test if the newly created user can log in with that password
|
||||
# extract password displayed on the page
|
||||
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
|
||||
assert match is not None
|
||||
extracted_password = match['password']
|
||||
|
||||
return extracted_password
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_user(client_authenticated):
|
||||
"""Fixture for creating a temporary user for testing"""
|
||||
username = secrets.token_hex(4)
|
||||
email = username + '@example.com'
|
||||
|
||||
# create user for test
|
||||
password = create_user(client_authenticated, username, email)
|
||||
|
||||
return {'username': username,
|
||||
'email': email,
|
||||
'password': password}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_admin(client_authenticated):
|
||||
"""Fixture for creating a temporary admin user for testing"""
|
||||
username = secrets.token_hex(4)
|
||||
email = username + '@example.com'
|
||||
|
||||
# create user for test
|
||||
password = create_user(client_authenticated, username, email)
|
||||
|
||||
response = client_authenticated.get(
|
||||
f"/promote_admin/{username}",
|
||||
follow_redirects=True)
|
||||
user = lookup_identity(username)
|
||||
assert user.has_role('admin')
|
||||
|
||||
return {'username': username,
|
||||
'email': email,
|
||||
'password': password}
|
||||
|
||||
|
||||
def test_backup_users(client_authenticated, temp_user):
|
||||
# test with invalid token
|
||||
response = client_authenticated.get("/backup_user_datastore",
|
||||
follow_redirects=True)
|
||||
user_data = json.loads(response.data)
|
||||
users = [d['username'] for d in user_data]
|
||||
emails = [d['email'] for d in user_data]
|
||||
|
||||
assert temp_user['username'] in users
|
||||
assert temp_user['email'] in emails
|
||||
|
||||
|
||||
def test_create_admin(client_authenticated):
|
||||
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
|
||||
|
||||
# log out current user
|
||||
response = client_authenticated.get('/logout')
|
||||
|
||||
# try to log in new user using the extracted password
|
||||
response = headless_login(client_authenticated, user='bilbo@shire.me',
|
||||
password=password)
|
||||
# - see if it works
|
||||
soup = BeautifulSoup(response.data, 'html.parser')
|
||||
|
||||
# make sure login succeeded
|
||||
# -> username should be displayed
|
||||
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
||||
# -> Tür öffnen button will appear
|
||||
assert any(['Tür öffnen' in link.contents[0] for link in
|
||||
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
||||
|
||||
|
||||
def test_activate_deactivate_user(temp_user, client_authenticated):
|
||||
response = client_authenticated.get(
|
||||
'/admin_toggle_active/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
# deactivate the user
|
||||
response = client_authenticated.get(
|
||||
f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
# make sure the user is now inactive
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is not None
|
||||
assert not user.active
|
||||
|
||||
# activate the user again
|
||||
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# now the user should be active again
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is not None
|
||||
assert user.active
|
||||
|
||||
# test deactivating super admin
|
||||
response = client_authenticated.get(f"/admin_toggle_active/gandalf",
|
||||
follow_redirects=True)
|
||||
assert 'Super-Admins können nicht deaktiviert werden!' \
|
||||
in response.data.decode()
|
||||
|
||||
|
||||
def test_delete_admin(temp_user, client_authenticated):
|
||||
# first we test deleting a non-existing user
|
||||
response = client_authenticated.post(
|
||||
'/delete_admins/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
# next, we create a temporary user and try to delete that one
|
||||
response = client_authenticated.post(
|
||||
f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# we need to deactivate the user first
|
||||
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
|
||||
# make sure the user still exists
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is not None
|
||||
|
||||
# deactivate the user and try deleting it again
|
||||
response = client_authenticated.get(
|
||||
f"/admin_toggle_active/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
|
||||
# try deleting it without filling in the confirmation form
|
||||
response = client_authenticated.post(
|
||||
f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert 'Der eingegebene Nutzername stimmt nicht überein' \
|
||||
in response.data.decode()
|
||||
# make sure the user still exists
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is not None
|
||||
|
||||
# now we send the confirmation data with the request
|
||||
response = client_authenticated.get(
|
||||
f"/delete_admins/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
|
||||
response = client_authenticated.post(
|
||||
f"/delete_admins/{temp_user['username']}",
|
||||
data=payload,
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
|
||||
|
||||
# make sure the user now is gone
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is None
|
||||
|
||||
|
||||
def test_promote_user(temp_user, client_authenticated):
|
||||
# first we test with a non-existing user
|
||||
response = client_authenticated.get(
|
||||
'/promote_admin/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
user = lookup_identity(temp_user['username'])
|
||||
assert user is not None
|
||||
assert not user.has_role('admin')
|
||||
# grant admin permissions to test user
|
||||
response = client_authenticated.get(
|
||||
f"/promote_admin/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert user.has_role('admin')
|
||||
|
||||
# try granting admin permissions again
|
||||
response = client_authenticated.get(
|
||||
f"/promote_admin/{temp_user['username']}",
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
|
||||
assert user.has_role('admin')
|
||||
|
||||
|
||||
def test_demote_user(temp_admin, client_authenticated):
|
||||
# first we test with a non-existing user
|
||||
response = client_authenticated.get(
|
||||
'/demote_admin/nosuchuser',
|
||||
follow_redirects=True)
|
||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
||||
|
||||
user = lookup_identity(temp_admin['username'])
|
||||
assert user.has_role('admin')
|
||||
# try removing admin permissions
|
||||
response = client_authenticated.get(
|
||||
f"/demote_admin/{temp_admin['username']}",
|
||||
follow_redirects=True)
|
||||
assert not user.has_role('admin')
|
||||
|
||||
# try removing admin permissions
|
||||
response = client_authenticated.get(
|
||||
f"/demote_admin/{temp_admin['username']}",
|
||||
follow_redirects=True)
|
||||
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \
|
||||
in response.data.decode()
|
||||
assert not user.has_role('admin')
|
||||
|
||||
# try removing admin permissions from superadmin
|
||||
response = client_authenticated.get(
|
||||
f"/demote_admin/gandalf",
|
||||
follow_redirects=True)
|
||||
|
||||
assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \
|
||||
in response.data.decode()
|
||||
|
||||
|
||||
def test_list_tokens(client_authenticated):
|
||||
response = client_authenticated.get(f"/tokens", follow_redirects=True)
|
||||
|
||||
# make sure the names for the test tokens are displayed
|
||||
assert all([user in response.data.decode()
|
||||
for user in ['Gandalf', 'Bilbo', 'Gimli']])
|
||||
|
||||
|
||||
def test_token_log(client_authenticated):
|
||||
response = client_authenticated.get(f"/token-log", follow_redirects=True)
|
||||
|
||||
page_src = response.data.decode()
|
||||
|
||||
# perform some checks for displayed log data
|
||||
assert all([msg in page_src for msg in
|
||||
['INFO', 'DEBUG', 'ERROR']])
|
||||
assert "Valid token 04538cfa186280 of Gandalf" in page_src
|
||||
assert "2021-04-17 13:09:06,207" in page_src
|
||||
|
||||
|
||||
def test_backup_tokens(client_authenticated):
|
||||
# test with invalid token
|
||||
response = client_authenticated.get(f"/backup_tokens",
|
||||
follow_redirects=True)
|
||||
token_data = json.loads(response.data)
|
||||
|
||||
assert {'04387cfa186280', '043a81fa186280', '04538cfa186280',
|
||||
'042979fa186280'}.issubset(token_data.keys())
|
||||
|
||||
|
||||
def test_register_token(client_authenticated, mocker):
|
||||
# test to make sure message is displayed when no tokens were recently
|
||||
# scanned
|
||||
response = client_authenticated.get(f"/register-token",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
|
||||
|
||||
# mockup scanned token
|
||||
mocker.patch(
|
||||
'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
|
||||
lambda x: {'timestamp': datetime.datetime.now(),
|
||||
'token': '042979fa181280'})
|
||||
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
page_src = response.data.decode()
|
||||
assert 'Unregistrierter Token gelesen' in page_src
|
||||
assert '042979fa181280' in page_src
|
||||
|
||||
# try registering with incomplete data
|
||||
response = client_authenticated.post(f"/register-token", data={},
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Token konnte nicht registiert werden' in page_src
|
||||
|
||||
# register the mocked token
|
||||
payload = {'name': 'Legolas',
|
||||
'organization': 'Elves',
|
||||
'email': 'legolas@mirkwood.me',
|
||||
'dsgvo': True,
|
||||
'csrf_token': csrf_token}
|
||||
response = client_authenticated.post(f"/register-token", data=payload,
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
|
||||
# make sure the user info for the new token is displayed
|
||||
assert '042979fa181280' in page_src
|
||||
assert 'Legolas' in page_src
|
||||
assert 'Elves' in page_src
|
||||
assert 'legolas@mirkwood.me' in page_src
|
||||
|
||||
# check that the token is created in the token file
|
||||
token_data = pathlib.Path(
|
||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '042979fa181280' in token_data
|
||||
assert 'Legolas' in token_data
|
||||
|
||||
|
||||
def test_edit_token(client_authenticated):
|
||||
# test with invalid token
|
||||
response = client_authenticated.get(f"/edit-token/nosuchtoken",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Ungültiger Token' in page_src
|
||||
|
||||
response = client_authenticated.post(f"/edit-token/nosuchtoken",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Token konnte nicht editiert werden' in page_src
|
||||
|
||||
# test using a valid token from the token file
|
||||
response = client_authenticated.get(f"/edit-token/04538cfa186280",
|
||||
follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
|
||||
payload = {
|
||||
'name': 'Balin',
|
||||
'organization': 'Dwarves',
|
||||
'email': 'balin@erebor.me',
|
||||
'active': True,
|
||||
'limit_validity': False,
|
||||
'valid_thru': datetime.date.today(),
|
||||
'csrf_token': csrf_token
|
||||
}
|
||||
response = client_authenticated.post(f"/edit-token/04538cfa186280",
|
||||
data=payload,
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
# make sure the new user info for the token is displayed
|
||||
assert '04538cfa186280' in page_src
|
||||
assert 'Balin' in page_src
|
||||
assert 'Dwarves' in page_src
|
||||
assert 'balin@erebor.me' in page_src
|
||||
|
||||
# check that the token is changed in the token file
|
||||
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '04538cfa186280' in token_data
|
||||
assert 'Balin' in token_data
|
||||
|
||||
|
||||
def test_delete_token(client_authenticated):
|
||||
token_data = pathlib.Path(
|
||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '04538cfa186280' in token_data
|
||||
|
||||
# test with invalid token
|
||||
response = client_authenticated.get(f"/delete-token/nosuchtoken",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Ungültiger Token' in page_src
|
||||
|
||||
# test using a valid token from the token file
|
||||
response = client_authenticated.get(f"/delete-token/043a81fa186280",
|
||||
follow_redirects=True)
|
||||
csrf_token = extract_csrf_token(response)
|
||||
|
||||
# try deleting without form data
|
||||
response = client_authenticated.post(f"/delete-token/043a81fa186280",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert "wurde nicht gelöscht" in page_src
|
||||
|
||||
payload = {
|
||||
'name': 'Bilbo',
|
||||
'csrf_token': csrf_token
|
||||
}
|
||||
response = client_authenticated.post(f"/delete-token/043a81fa186280",
|
||||
data=payload,
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
print(page_src)
|
||||
assert "wurde gelöscht" in page_src
|
||||
|
||||
# check that the token is now gone from the token file
|
||||
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '043a81fa186280' not in token_data
|
||||
|
||||
|
||||
def test_deactivate_token(client_authenticated):
|
||||
token_data = pathlib.Path(
|
||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '04387cfa186280' in token_data
|
||||
|
||||
# test with invalid token
|
||||
response = client_authenticated.get(f"/deactivate-token/nosuchtoken",
|
||||
follow_redirects=True)
|
||||
page_src = response.data.decode()
|
||||
assert 'Ungültiger Token' in page_src
|
||||
|
||||
# deactivate token
|
||||
response = client_authenticated.get(f"/deactivate-token/04387cfa186280",
|
||||
follow_redirects=True)
|
||||
|
||||
# check that the token is now gone from the token file
|
||||
token_data = pathlib.Path(
|
||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
||||
assert '#04387cfa186280' in token_data
|
Loading…
Reference in New Issue
Block a user