Compare commits
No commits in common. "blueprint_refactoring" and "master" have entirely different histories.
blueprint_
...
master
80
README.md
80
README.md
|
@ -1,79 +1 @@
|
||||||
Flask-based web interface for user token administration of our hackerspace's door lock.
|
Flask-based web interface for user token adminstration of our hackerspace's door lock.
|
||||||
|
|
||||||
## Installation
|
|
||||||
Clone the repo
|
|
||||||
```shell
|
|
||||||
git clone <path_to_repo>
|
|
||||||
```
|
|
||||||
Install using pip
|
|
||||||
```shell
|
|
||||||
pip install .
|
|
||||||
```
|
|
||||||
|
|
||||||
## Running the app
|
|
||||||
```shell
|
|
||||||
export FLASK_APP=imaginaerraum_door_admin
|
|
||||||
flask run
|
|
||||||
```
|
|
||||||
|
|
||||||
## Configuration
|
|
||||||
You can set custom configuration options by defining an environment variable
|
|
||||||
``APPLICATION_SETTINGS`` pointing to a file with configuration options.
|
|
||||||
|
|
||||||
For example, consider the following configuration file ``app_config.py``:
|
|
||||||
```
|
|
||||||
# door app configuration
|
|
||||||
SECRET_KEY = 'mysupersecretkey'
|
|
||||||
SECURITY_PASSWORD_SALT = 'saltycaramel'
|
|
||||||
|
|
||||||
TESTING = False
|
|
||||||
DEBUG = False
|
|
||||||
|
|
||||||
TOKEN_FILE = 'door_tokens.txt'
|
|
||||||
ADMIN_FILE = 'admins.txt'
|
|
||||||
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
|
||||||
NFC_LOG = "nfc.log"
|
|
||||||
```
|
|
||||||
To instruct the flask app to use this configuration, use the following commands:
|
|
||||||
```shell
|
|
||||||
export APPLICATION_SETTINGS=app_config.py
|
|
||||||
flask run
|
|
||||||
```
|
|
||||||
|
|
||||||
Below, you can find a list of configuration options.
|
|
||||||
|
|
||||||
### Flask app configuration
|
|
||||||
You can override common Flask configuration variables. In particular, you
|
|
||||||
definitely should set custom values for the ``SECRET_KEY`` and
|
|
||||||
``SECURITY_PASSWORD_SALT``.
|
|
||||||
|
|
||||||
### Token file
|
|
||||||
The token file is an ASCII file which lists the IDs of RFID tokens that can be
|
|
||||||
used to unlock the door. You can specify the path to the token file using the
|
|
||||||
``TOKEN_FILE`` variable in the configuration file.
|
|
||||||
Here's an example of a token file (lines starting with ``#`` represent inactive
|
|
||||||
tokens):
|
|
||||||
```
|
|
||||||
# token | name | organization | email | valid_thru
|
|
||||||
#042979fa186280||||
|
|
||||||
04487cfa176280|Frodo|Hobbits|frodo@shire.me|
|
|
||||||
043a85fa1a6280|Gandalf|Wizards|gandalf@middleearth.me|
|
|
||||||
#04206e2aef6880|Boromir|Humans|boromir@gondor.me|
|
|
||||||
```
|
|
||||||
|
|
||||||
### Admin file
|
|
||||||
``ADMIN_FILE`` -> file to create new super admins
|
|
||||||
|
|
||||||
### User database
|
|
||||||
|
|
||||||
### NFC files
|
|
||||||
``NFC_SOCKET = "/tmp/nfc.sock"`` -> unix socket to interact with the door
|
|
||||||
``NFC_LOG = "nfc.log"`` -> log file of door events
|
|
||||||
|
|
||||||
## Development
|
|
||||||
```shell
|
|
||||||
cd tests
|
|
||||||
export APPLICATION_SETTINGS=/home/simon/imaginaerraum/door-lock/webinterface/tests/debug_app_config.py
|
|
||||||
pytest --cov=../imaginaerraum_door_admin --cov-report=html --cov-report=term
|
|
||||||
```
|
|
||||||
|
|
28
bin/launch_webadmin
Executable file
28
bin/launch_webadmin
Executable file
|
@ -0,0 +1,28 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from imaginaerraum_door_admin.webapp import create_application
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("--key_file", default='/root/flask_keys', help="Path to file with Flask SECRET_KEY and SECURITY_PASSWORD_SALT")
|
||||||
|
parser.add_argument("--token_file", default="/etc/door_tokens", help="path to the file with door tokens and users")
|
||||||
|
parser.add_argument("--nfc_socket", default="/tmp/nfc.sock", help="socket for handling NFC reader commands")
|
||||||
|
parser.add_argument("--template_folder", default="templates", help="path to Flask templates folder")
|
||||||
|
parser.add_argument("--static_folder", default="static", help="path to Flask static folder")
|
||||||
|
parser.add_argument("--admin_file", default="/etc/admins.conf", help="Path to file for creating super admin users")
|
||||||
|
parser.add_argument("--log_file", default="/var/log/webinterface.log", help="Path to flask log file")
|
||||||
|
parser.add_argument("--nfc_log", default="/var/log/nfc.log", help="Path to nfc log file")
|
||||||
|
parser.add_argument("--ldap_url", default="ldaps://ldap.imaginaerraum.de",
|
||||||
|
help="URL for LDAP server for alternative user authorization")
|
||||||
|
parser.add_argument("--mqtt_host", default="10.10.21.2", help="IP address of MQTT broker")
|
||||||
|
parser.add_argument("--flask_port", default=80, help="Port for running the Flask server")
|
||||||
|
parser.add_argument("--mail_server", default="smtp.googlemail.com", help="email server for sending security messages")
|
||||||
|
parser.add_argument("--mail_port", default=465, help="port for security email server")
|
||||||
|
parser.add_argument("--mail_use_tls", default=False, help="use TLS for security emails")
|
||||||
|
parser.add_argument("--mail_use_ssl", default=True, help="use SSL for security emails")
|
||||||
|
parser.add_argument("--mail_username", default="admin@example.com", help="email account for sending security messages")
|
||||||
|
parser.add_argument("--mail_password", default="password", help="Password for email account")
|
||||||
|
config = parser.parse_args()
|
||||||
|
|
||||||
|
app = create_application(config)
|
||||||
|
app.run(host='0.0.0.0', port=config.flask_port)
|
|
@ -1,117 +0,0 @@
|
||||||
import logging
|
|
||||||
from flask import Flask
|
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore, hash_password
|
|
||||||
from email_validator import validate_email
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from .door_handle import DoorHandle
|
|
||||||
|
|
||||||
security = Security()
|
|
||||||
db = SQLAlchemy()
|
|
||||||
|
|
||||||
|
|
||||||
# create admin users (only if they don't exist already)
|
|
||||||
def create_super_admins(app, user_datastore):
|
|
||||||
admin_file = Path(app.config.get('ADMIN_FILE'))
|
|
||||||
|
|
||||||
# setup user database when starting the app
|
|
||||||
new_admin_data = []
|
|
||||||
if not admin_file.exists():
|
|
||||||
app.logger.warning(
|
|
||||||
f"Admin user creation file not found at path "
|
|
||||||
f"{admin_file.absolute()}."
|
|
||||||
f"No super admins have been created in the datastore."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# store data for new admins in memory s.t. the file can be deleted
|
|
||||||
# afterwards
|
|
||||||
admin_data = admin_file.read_text().split('\n')
|
|
||||||
for i, line in enumerate(admin_data):
|
|
||||||
if len(line.strip()) > 0 and not line.strip().startswith('#'):
|
|
||||||
try:
|
|
||||||
user, email, pw = line.split()
|
|
||||||
validate_email(email)
|
|
||||||
new_admin_data.append(
|
|
||||||
{'username': user, 'email': email,
|
|
||||||
'password': pw})
|
|
||||||
except Exception as e:
|
|
||||||
app.logger.error(
|
|
||||||
f"Error while parsing line {i} in admin config file. "
|
|
||||||
f"Config file should contain lines of <username> "
|
|
||||||
f"<email> <password>\\n'\n "
|
|
||||||
f"Exception: {e}\nAdmin account could not be created."
|
|
||||||
)
|
|
||||||
|
|
||||||
with app.app_context():
|
|
||||||
db.create_all()
|
|
||||||
super_admin_role = user_datastore.find_or_create_role(
|
|
||||||
'super_admin') # root admin = can create other admins
|
|
||||||
admin_role = user_datastore.find_or_create_role(
|
|
||||||
'admin') # 'normal' admin
|
|
||||||
local_role = user_datastore.find_or_create_role(
|
|
||||||
'local') # LDAP user or local user
|
|
||||||
|
|
||||||
for d in new_admin_data:
|
|
||||||
if user_datastore.find_user(email=d['email'],
|
|
||||||
username=d['username']) is None:
|
|
||||||
roles = [super_admin_role, admin_role]
|
|
||||||
if not d['password'] == 'LDAP':
|
|
||||||
roles.append(local_role)
|
|
||||||
|
|
||||||
# create new admin (only if admin does not already exist)
|
|
||||||
new_admin = user_datastore.create_user(
|
|
||||||
email=d['email'], username=d['username'],
|
|
||||||
password=hash_password(d['password']), roles=roles
|
|
||||||
)
|
|
||||||
app.logger.info(
|
|
||||||
f"New super admin user created with username "
|
|
||||||
f"'{new_admin.username}' and email '{new_admin.email}'"
|
|
||||||
f", roles = {[r.name for r in new_admin.roles]}"
|
|
||||||
)
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
|
|
||||||
def create_app():
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.config.from_object(
|
|
||||||
'imaginaerraum_door_admin.default_app_config.DefaultConfig'
|
|
||||||
)
|
|
||||||
app.config.from_envvar('APPLICATION_SETTINGS')
|
|
||||||
|
|
||||||
logging.basicConfig(filename=app.config['LOG_FILE'], level=logging.INFO)
|
|
||||||
|
|
||||||
token_file = Path(app.config.get('TOKEN_FILE'))
|
|
||||||
if not token_file.exists():
|
|
||||||
app.logger.warning(
|
|
||||||
f"Token file not found at {token_file.absolute()}. "
|
|
||||||
"An empty token file will be created."
|
|
||||||
)
|
|
||||||
token_file.touch()
|
|
||||||
|
|
||||||
# create door objects which provides access to the token file and current
|
|
||||||
# door state via MQTT
|
|
||||||
app.door = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host=app.config['MQTT_HOST'],
|
|
||||||
nfc_socket=app.config['NFC_SOCKET'], logger=app.logger
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mail Config
|
|
||||||
#mail = Mail(app)
|
|
||||||
|
|
||||||
# Create database connection object
|
|
||||||
db.init_app(app)
|
|
||||||
|
|
||||||
from . webapp import door_app
|
|
||||||
app.register_blueprint(door_app)
|
|
||||||
|
|
||||||
# Setup Flask-Security
|
|
||||||
from .auth import ExtendedLoginForm, User, Role
|
|
||||||
|
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
|
|
||||||
security.init_app(app, user_datastore, login_form=ExtendedLoginForm)
|
|
||||||
|
|
||||||
create_super_admins(app, user_datastore)
|
|
||||||
|
|
||||||
return app
|
|
|
@ -1,163 +0,0 @@
|
||||||
from wtforms.fields import StringField, BooleanField
|
|
||||||
from flask import current_app
|
|
||||||
from flask_security import hash_password
|
|
||||||
from flask_security.forms import LoginForm, Required, PasswordField
|
|
||||||
from flask_security.utils import lookup_identity
|
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
|
||||||
|
|
||||||
import ldap3
|
|
||||||
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError
|
|
||||||
|
|
||||||
from imaginaerraum_door_admin import db, security
|
|
||||||
|
|
||||||
# Define models
|
|
||||||
fsqla.FsModels.set_db_info(db)
|
|
||||||
|
|
||||||
|
|
||||||
class Role(db.Model, fsqla.FsRoleMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class User(db.Model, fsqla.FsUserMixin):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ExtendedLoginForm(LoginForm):
|
|
||||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
|
||||||
password = PasswordField('Passwort', [Required()])
|
|
||||||
remember = BooleanField('Login merken?')
|
|
||||||
|
|
||||||
def validate(self):
|
|
||||||
# search for user in the current database
|
|
||||||
user = lookup_identity(self.email.data)
|
|
||||||
if user is not None:
|
|
||||||
# if a user is found we check if it is associated with LDAP or with
|
|
||||||
# the local database
|
|
||||||
if user.has_role('local'):
|
|
||||||
# try authorizing locally using Flask security user datastore
|
|
||||||
authorized = super(ExtendedLoginForm, self).validate()
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
current_app.logger.info(
|
|
||||||
f"User with credentials '{self.email.data}' authorized "
|
|
||||||
f"through local database"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# run LDAP authorization
|
|
||||||
# if the authorization succeeds we also get the new_user_data
|
|
||||||
# dict which contains information about
|
|
||||||
# the user's permissions etc.
|
|
||||||
authorized, new_user_data = self.validate_ldap()
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
current_app.logger.info(
|
|
||||||
f"User with credentials '{self.email.data}' authorized "
|
|
||||||
f"through LDAP")
|
|
||||||
# update permissions and password/email to stay up to date
|
|
||||||
# for login with no network connection
|
|
||||||
user.email = new_user_data['email']
|
|
||||||
user.password = new_user_data['password']
|
|
||||||
for role in new_user_data['roles']:
|
|
||||||
security.datastore.add_role_to_user(user, role)
|
|
||||||
security.datastore.commit()
|
|
||||||
self.user = user
|
|
||||||
else:
|
|
||||||
self.password.errors = ['Invalid password']
|
|
||||||
else:
|
|
||||||
# this means there is no user with that email in the database
|
|
||||||
# we assume that the username was entered instead of an email and
|
|
||||||
# use that for authentication with LDAP
|
|
||||||
username = self.email.data
|
|
||||||
# try LDAP authorization and create a new user if it succeeds
|
|
||||||
authorized, new_user_data = self.validate_ldap()
|
|
||||||
|
|
||||||
if authorized:
|
|
||||||
# if there was no user in the database before we create a new
|
|
||||||
# user
|
|
||||||
self.user = security.datastore.create_user(
|
|
||||||
username=new_user_data['username'],
|
|
||||||
email=new_user_data['email'],
|
|
||||||
password=new_user_data['password'],
|
|
||||||
roles=new_user_data['roles']
|
|
||||||
)
|
|
||||||
security.datastore.commit()
|
|
||||||
current_app.logger.info(
|
|
||||||
f"New admin user '{new_user_data['username']} "
|
|
||||||
f"<{new_user_data['email']}>' created after successful "
|
|
||||||
f"LDAP authorization"
|
|
||||||
)
|
|
||||||
|
|
||||||
# if any of the authorization methods is successful we authorize
|
|
||||||
# the user
|
|
||||||
return authorized
|
|
||||||
|
|
||||||
def validate_ldap(self):
|
|
||||||
"""Validate the user and password through an LDAP server.
|
|
||||||
|
|
||||||
If the connection completes successfully the given user and password
|
|
||||||
is authorized. Then the permissions and additional information of the
|
|
||||||
user are obtained through an LDAP search.
|
|
||||||
The data is stored in a dict which will be used later to create/update
|
|
||||||
the entry for the user in the local database.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
bool : result of the authorization process (True = success,
|
|
||||||
False = failure)
|
|
||||||
dict : dictionary with information about an authorized user
|
|
||||||
(contains username, email, hashed password, roles)
|
|
||||||
"""
|
|
||||||
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
|
|
||||||
ldap_user_group = current_app.config['LDAP_USER_GROUP']
|
|
||||||
ldap_domain = current_app.config['LDAP_DOMAIN']
|
|
||||||
ldap_domain_ext = current_app.config['LDAP_DOMAIN_EXT']
|
|
||||||
|
|
||||||
username = self.email.data
|
|
||||||
password = self.password.data
|
|
||||||
|
|
||||||
try:
|
|
||||||
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
|
|
||||||
f"dc={ldap_domain_ext}"
|
|
||||||
con = ldap3.Connection(
|
|
||||||
ldap_server,
|
|
||||||
user=user,
|
|
||||||
password=password,
|
|
||||||
auto_bind=True
|
|
||||||
)
|
|
||||||
except ldap3.core.exceptions.LDAPBindError as e:
|
|
||||||
# server reachable but user unauthorized -> fail
|
|
||||||
return False, None
|
|
||||||
except LDAPSocketOpenError as e:
|
|
||||||
# server not reachable -> fail (but will try authorization from
|
|
||||||
# local database later)
|
|
||||||
return False, None
|
|
||||||
except Exception as e:
|
|
||||||
# for other Exceptions we just fail
|
|
||||||
return False, None
|
|
||||||
|
|
||||||
# get user data and permissions from LDAP server
|
|
||||||
new_user_data = {}
|
|
||||||
new_user_data['username'] = username
|
|
||||||
new_user_data['password'] = hash_password(password)
|
|
||||||
new_user_data['roles'] = []
|
|
||||||
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
|
|
||||||
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
|
|
||||||
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
|
|
||||||
lock_permission = con.search(
|
|
||||||
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
|
|
||||||
)
|
|
||||||
|
|
||||||
if lock_permission:
|
|
||||||
new_user_data['email'] = con.entries[0].mail.value
|
|
||||||
else:
|
|
||||||
return False, None
|
|
||||||
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
|
|
||||||
f'dc={ldap_domain},dc={ldap_domain_ext}))'
|
|
||||||
token_granting_permission = con.search(search_base, search_filter)
|
|
||||||
if token_granting_permission:
|
|
||||||
new_user_data['roles'].append('admin')
|
|
||||||
|
|
||||||
return True, new_user_data
|
|
|
@ -1,11 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
from imaginaerraum_door_admin import create_app
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
app = create_app()
|
|
||||||
app.run(host='0.0.0.0', port=app.config.get('PORT', 80))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -1,61 +0,0 @@
|
||||||
import bleach
|
|
||||||
from flask_security import uia_email_mapper
|
|
||||||
print("loading default config")
|
|
||||||
|
|
||||||
|
|
||||||
def uia_username_mapper(identity):
|
|
||||||
# we allow pretty much anything - but we bleach it.
|
|
||||||
return bleach.clean(identity, strip=True)
|
|
||||||
|
|
||||||
|
|
||||||
class DefaultConfig(object):
|
|
||||||
DEBUG = False
|
|
||||||
PORT = 80
|
|
||||||
|
|
||||||
SECRET_KEY = 'supersecret'
|
|
||||||
SECURITY_PASSWORD_SALT = 'salty'
|
|
||||||
|
|
||||||
SECURITY_REGISTERABLE = False
|
|
||||||
SECURITY_CHANGEABLE = True
|
|
||||||
SECURITY_RECOVERABLE = True
|
|
||||||
SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
|
|
||||||
|
|
||||||
SECURITY_POST_LOGIN_VIEW = '/'
|
|
||||||
SECURITY_EMAIL_SUBJECT_PASSWORD_RESET = 'Passwort zurücksetzen'
|
|
||||||
SECURITY_EMAIL_SUBJECT_PASSWORD_NOTICE = 'Passwort wurde zurückgesetzt'
|
|
||||||
SECURITY_PASSWORD_LENGTH_MIN = 10
|
|
||||||
|
|
||||||
SECURITY_USER_IDENTITY_ATTRIBUTES = [
|
|
||||||
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
|
|
||||||
{"username": {"mapper": uia_username_mapper}}
|
|
||||||
]
|
|
||||||
|
|
||||||
# mail configuration
|
|
||||||
MAIL_SERVER = ''
|
|
||||||
MAIL_PORT = 465
|
|
||||||
MAIL_USE_SSL = True
|
|
||||||
MAIL_USERNAME = ''
|
|
||||||
MAIL_PASSWORD = ''
|
|
||||||
MAIL_DEFAULT_SENDER = ''
|
|
||||||
|
|
||||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///admin.db'
|
|
||||||
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
|
||||||
# underlying engine. This option makes sure that DB connections from the
|
|
||||||
# pool are still valid. Important for entire application since
|
|
||||||
# many DBaaS options automatically close idle connections.
|
|
||||||
SQLALCHEMY_ENGINE_OPTIONS = {
|
|
||||||
"pool_pre_ping": True,
|
|
||||||
}
|
|
||||||
|
|
||||||
TOKEN_FILE = "door_tokens"
|
|
||||||
ADMIN_FILE = 'admins'
|
|
||||||
|
|
||||||
LDAP_URL = "ldaps://ldap.imaginaerraum.de"
|
|
||||||
LDAP_USER_GROUP = 'Users'
|
|
||||||
LDAP_DOMAIN = 'imaginaerraum'
|
|
||||||
LDAP_DOMAIN_EXT = 'de'
|
|
||||||
|
|
||||||
NFC_SOCKET = "/tmp/nfc.sock"
|
|
||||||
LOG_FILE = "webinterface.log"
|
|
||||||
NFC_LOG = "nfc.log"
|
|
||||||
MQTT_HOST = '10.10.21.2'
|
|
|
@ -4,22 +4,23 @@ from pathlib import Path
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
class DoorHandle:
|
class DoorHandle:
|
||||||
def __init__(self, token_file, mqtt_host, mqtt_port=1883,
|
def __init__(self, token_file, mqtt_host, mqtt_port=1883, nfc_socket='/tmp/nfc.sock', logger=None):
|
||||||
nfc_socket='/tmp/nfc.sock', logger=None):
|
|
||||||
self.state = None
|
self.state = None
|
||||||
self.encoder_position = None
|
self.encoder_position = None
|
||||||
|
|
||||||
if not Path(token_file).exists():
|
if not Path(token_file).exists():
|
||||||
raise FileNotFoundError(
|
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
|
||||||
"File with door tokens could not be found at "
|
|
||||||
f"{Path(token_file).absolute()}"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.token_file = token_file
|
self.token_file = token_file
|
||||||
self.last_invalid = {}
|
self.last_invalid = {}
|
||||||
|
|
||||||
|
self.mqtt_client = mqtt.Client()
|
||||||
|
self.mqtt_client.on_connect = self.on_connect
|
||||||
|
self.mqtt_client.on_message = self.on_message
|
||||||
|
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
||||||
|
self.mqtt_client.loop_start()
|
||||||
|
|
||||||
if logger:
|
if logger:
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
else:
|
else:
|
||||||
|
@ -30,18 +31,10 @@ class DoorHandle:
|
||||||
self.nfc_sock.connect(nfc_socket)
|
self.nfc_sock.connect(nfc_socket)
|
||||||
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
|
self.logger.info(f"Connected to NFC socket at {nfc_socket}.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Could not connect to NFC socket at {nfc_socket}. "
|
print(f"Could not connect to NFC socket at {nfc_socket}. Exception: {e}")
|
||||||
f"Exception: {e}")
|
|
||||||
self.nfc_sock = None
|
self.nfc_sock = None
|
||||||
#raise
|
#raise
|
||||||
|
|
||||||
self.mqtt_client = mqtt.Client()
|
|
||||||
self.mqtt_client.on_connect = self.on_connect
|
|
||||||
self.mqtt_client.on_message = self.on_message
|
|
||||||
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
|
|
||||||
self.mqtt_client.loop_start()
|
|
||||||
self.logger.info(f"Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
|
|
||||||
|
|
||||||
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
|
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
|
||||||
|
|
||||||
# The callback for when the client receives a CONNACK response from the server.
|
# The callback for when the client receives a CONNACK response from the server.
|
||||||
|
@ -50,7 +43,7 @@ class DoorHandle:
|
||||||
|
|
||||||
# Subscribing in on_connect() means that if we lose the connection and
|
# Subscribing in on_connect() means that if we lose the connection and
|
||||||
# reconnect then subscriptions will be renewed.
|
# reconnect then subscriptions will be renewed.
|
||||||
client.subscribe("door/#")
|
client.subscribe("#")
|
||||||
|
|
||||||
# The callback for when a PUBLISH message is received from the server.
|
# The callback for when a PUBLISH message is received from the server.
|
||||||
def on_message(self, client, userdata, msg):
|
def on_message(self, client, userdata, msg):
|
||||||
|
@ -77,13 +70,11 @@ class DoorHandle:
|
||||||
email = data[3].strip() if len(data) > 3 else None
|
email = data[3].strip() if len(data) > 3 else None
|
||||||
valid_thru = data[4].strip() if len(data) > 4 else None
|
valid_thru = data[4].strip() if len(data) > 4 else None
|
||||||
|
|
||||||
tokens[token] = {
|
tokens[token] = {'name': name,
|
||||||
'name': name,
|
'organization': organization,
|
||||||
'organization': organization,
|
'email': email,
|
||||||
'email': email,
|
'valid_thru': valid_thru,
|
||||||
'valid_thru': valid_thru,
|
'inactive': inactive}
|
||||||
'inactive': inactive
|
|
||||||
}
|
|
||||||
return tokens
|
return tokens
|
||||||
|
|
||||||
def store_tokens(self, tokens):
|
def store_tokens(self, tokens):
|
||||||
|
@ -107,15 +98,13 @@ class DoorHandle:
|
||||||
if self.nfc_sock is not None:
|
if self.nfc_sock is not None:
|
||||||
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
|
self.nfc_sock.send(b'open ' + user.encode() + b'\n')
|
||||||
else:
|
else:
|
||||||
self.logger.error("No connection to NFC socket. Cannot open door!")
|
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||||
raise RuntimeError("No connection to NFC socket. Cannot open door!")
|
|
||||||
|
|
||||||
def close_door(self, user=''):
|
def close_door(self, user=''):
|
||||||
if self.nfc_sock is not None:
|
if self.nfc_sock is not None:
|
||||||
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
|
self.nfc_sock.send(b'close ' + user.encode() + b'\n')
|
||||||
else:
|
else:
|
||||||
self.logger.error("No connection to NFC socket. Cannot close door!")
|
raise Exception("No connection to NFC socket. Cannot close door!")
|
||||||
raise RuntimeError("No connection to NFC socket. Cannot close door!")
|
|
||||||
|
|
||||||
def get_most_recent_token(self):
|
def get_most_recent_token(self):
|
||||||
# read last invalid token from logfile
|
# read last invalid token from logfile
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
from flask import flash
|
|
||||||
from flask_wtf import FlaskForm
|
|
||||||
from wtforms.fields import DateField, EmailField
|
|
||||||
from wtforms.fields import StringField, BooleanField
|
|
||||||
from wtforms.validators import DataRequired, ValidationError, EqualTo
|
|
||||||
from datetime import date
|
|
||||||
|
|
||||||
|
|
||||||
def validate_valid_thru_date(form, field):
|
|
||||||
if form.limit_validity.data:
|
|
||||||
# only check date format if limited validity of token is set
|
|
||||||
try:
|
|
||||||
if not field.data >= date.today():
|
|
||||||
raise ValueError
|
|
||||||
except ValueError as e:
|
|
||||||
flash("Ungültiges Datum")
|
|
||||||
raise ValidationError
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class TokenForm(FlaskForm):
|
|
||||||
name = StringField("Name", validators=[DataRequired()])
|
|
||||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
||||||
organization = StringField("Organization", validators=[DataRequired()])
|
|
||||||
limit_validity = BooleanField("Gültigkeit begrenzen?")
|
|
||||||
valid_thru = DateField("Gültig bis", validators=[validate_valid_thru_date])
|
|
||||||
active = BooleanField("Aktiv?")
|
|
||||||
dsgvo = BooleanField(
|
|
||||||
"Einwilligung Nutzungsbedingungen erfragt?", validators=[DataRequired()]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ConfirmDeleteForm(FlaskForm):
|
|
||||||
name = StringField(
|
|
||||||
"Name",
|
|
||||||
validators=[
|
|
||||||
DataRequired(),
|
|
||||||
EqualTo("name_confirm", "Name stimmt nicht überein"),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
name_confirm = StringField("Name confirm")
|
|
||||||
|
|
||||||
|
|
||||||
class AdminCreationForm(FlaskForm):
|
|
||||||
name = StringField("Name", validators=[DataRequired()])
|
|
||||||
email = EmailField("E-Mail", validators=[DataRequired()])
|
|
|
@ -27,15 +27,15 @@
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
{% if not data['super_admin'] %}
|
{% if not data['super_admin'] %}
|
||||||
<a href="{{ url_for('door_app.admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
<a href="{{ url_for('admin_toggle_active', username=data['username']) }}"><img src="static/stop.png" title="Aktivieren/Deaktivieren" alt="Toggle active"></a>
|
||||||
<a href="{{ url_for('door_app.delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_admins', username=data['username']) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% if data['admin'] %}
|
{% if data['admin'] %}
|
||||||
{% if not data['super_admin'] %}
|
{% if not data['super_admin'] %}
|
||||||
<a href="{{ url_for('door_app.demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
<a href="{{ url_for('demote_admin', username=data['username']) }}"><img src="static/demote.png" title="Admin-Rechte widerrufen" alt="Demote"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% else %}
|
{% else %}
|
||||||
<a href="{{ url_for('door_app.promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
<a href="{{ url_for('promote_admin', username=data['username']) }}"><img src="static/promote.png" title="Zu Admin machen" alt="Promote"></a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
@ -68,14 +68,14 @@
|
||||||
</div>
|
</div>
|
||||||
<div class="p-2 bg-light border">
|
<div class="p-2 bg-light border">
|
||||||
<h3>Nutzerdaten sichern:</h3>
|
<h3>Nutzerdaten sichern:</h3>
|
||||||
<form action="{{ url_for('door_app.backup_user_datastore') }}" method="get">
|
<form action="{{ url_for('backup_user_datastore') }}" method="get">
|
||||||
<input type="submit" value="Download">
|
<input type="submit" value="Download">
|
||||||
</form>
|
</form>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="p-2 bg-light border">
|
<div class="p-2 bg-light border">
|
||||||
<h3>Nutzerdaten wiederherstellen:</h3>
|
<h3>Nutzerdaten wiederherstellen:</h3>
|
||||||
<form action="{{ url_for('door_app.restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
<form action="{{ url_for('restore_user_datastore') }}" method=post enctype=multipart/form-data>
|
||||||
<input type=file name=file>
|
<input type=file name=file>
|
||||||
<input type=submit value="Abschicken">
|
<input type=submit value="Abschicken">
|
||||||
</form>
|
</form>
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
<body>
|
<body>
|
||||||
<nav class="navbar navbar-expand-lg navbar-light bg-light">
|
<nav class="navbar navbar-expand-lg navbar-light bg-light">
|
||||||
<div class="container-fluid">
|
<div class="container-fluid">
|
||||||
<a class="navbar-brand" href="{{ url_for('door_app.door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
<a class="navbar-brand" href="{{ url_for('door_lock') }}"><img src="{{ url_for('static', filename='iR.svg') }}" alt="iR Logo"></a>
|
||||||
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
|
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent"
|
||||||
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
|
aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
|
||||||
<span class="navbar-toggler-icon"></span>
|
<span class="navbar-toggler-icon"></span>
|
||||||
|
@ -29,10 +29,10 @@
|
||||||
Tokens
|
Tokens
|
||||||
</a>
|
</a>
|
||||||
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
|
<div class="dropdown-menu" aria-labelledby="navbarDropdown">
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.register') }}">Token Registrierung</a>
|
<a class="dropdown-item" href="{{ url_for('register') }}">Token Registrierung</a>
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.list_tokens') }}">Token Übersicht</a>
|
<a class="dropdown-item" href="{{ url_for('list_tokens') }}">Token Übersicht</a>
|
||||||
{% if current_user.has_role('super_admin') %}
|
{% if current_user.has_role('super_admin') %}
|
||||||
<a class="dropdown-item" href="{{ url_for('door_app.token_log') }}">Token Log</a>
|
<a class="dropdown-item" href="{{ url_for('token_log') }}">Token Log</a>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</li>
|
</li>
|
||||||
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
{% if current_user.has_role('super_admin') %}
|
{% if current_user.has_role('super_admin') %}
|
||||||
<li class="nav-item">
|
<li class="nav-item">
|
||||||
<a class="nav-link" href="{{ url_for('door_app.manage_admins') }}">Benutzer verwalten</a>
|
<a class="nav-link" href="{{ url_for('manage_admins') }}">Benutzer verwalten</a>
|
||||||
</li>
|
</li>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
|
|
|
@ -24,11 +24,11 @@
|
||||||
{% if current_user.is_authenticated %}
|
{% if current_user.is_authenticated %}
|
||||||
<div class="row">
|
<div class="row">
|
||||||
<div class="col-3">
|
<div class="col-3">
|
||||||
<a href="{{ url_for('door_app.open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
<a href="{{ url_for('open_door') }}" class="btn btn-success" role="button">Tür öffnen</a>
|
||||||
</div>
|
</div>
|
||||||
<div class="col-1"></div>
|
<div class="col-1"></div>
|
||||||
<div class="col-3">
|
<div class="col-3">
|
||||||
<a href="{{ url_for('door_app.close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
<a href="{{ url_for('close_door') }}" class="btn btn-danger" role="button">Tür schließen</a>
|
||||||
</div>
|
</div>
|
||||||
<div class="col-5"></div>
|
<div class="col-5"></div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
und diesen zustimmt.
|
und diesen zustimmt.
|
||||||
</li>
|
</li>
|
||||||
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
|
<li>Achtung: Der Token funktioniert nicht sofort, sondern muss erst explizit aktiviert werden!
|
||||||
Dazu in der <a href="{{ url_for('door_app.list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
Dazu in der <a href="{{ url_for('list_tokens') }}">Token-Übersicht</a> auf das Bearbeiten-Symbol
|
||||||
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
|
(<img src="static/edit.png" title="Editieren" alt="Edit">) klicken und den Haken bei "Aktiv?" setzen.
|
||||||
</li>
|
</li>
|
||||||
<li>Jetzt kann der Token verwendet werden.</li>
|
<li>Jetzt kann der Token verwendet werden.</li>
|
||||||
|
|
|
@ -29,9 +29,9 @@
|
||||||
<td>{{ data[field] if data[field] }}</td>
|
<td>{{ data[field] if data[field] }}</td>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||||
<a href="{{ url_for('door_app.deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
<a href="{{ url_for('deactivate_token', token=t) }}"><img src="static/stop.png" title="Deaktivieren" alt="Deactivate"></a>
|
||||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
@ -47,14 +47,14 @@
|
||||||
<td>{{ data[field] if data[field] }}</td>
|
<td>{{ data[field] if data[field] }}</td>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
<td>
|
<td>
|
||||||
<a href="{{ url_for('door_app.edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
<a href="{{ url_for('edit_token', token=t) }}"><img src="static/edit.png" title="Editieren" alt="Edit"></a>
|
||||||
<a href="{{ url_for('door_app.delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
<a href="{{ url_for('delete_token', token=t) }}"><img src="static/delete.png" title="Löschen" alt="Delete"></a>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
<form action="{{ url_for('door_app.backup_tokens') }}" method="get">
|
<form action="{{ url_for('backup_tokens') }}" method="get">
|
||||||
<input type="submit" value="Token Daten sichern">
|
<input type="submit" value="Token Daten sichern">
|
||||||
</form>
|
</form>
|
||||||
{% endblock %}
|
{% endblock %}
|
File diff suppressed because it is too large
Load Diff
5
requirements.txt
Normal file
5
requirements.txt
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
flask~=1.1.2
|
||||||
|
Flask-Security-Too~=4.0.0
|
||||||
|
WTForms~=2.3.3
|
||||||
|
paho-mqtt~=1.5.1
|
||||||
|
bleach~=3.3.0
|
35
setup.cfg
35
setup.cfg
|
@ -15,38 +15,3 @@ classifiers =
|
||||||
Programming Language :: Python :: 3
|
Programming Language :: Python :: 3
|
||||||
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
|
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
|
||||||
Operating System :: OS Independent
|
Operating System :: OS Independent
|
||||||
|
|
||||||
[options]
|
|
||||||
python_requires = >=3.8
|
|
||||||
install_requires =
|
|
||||||
bleach
|
|
||||||
Flask
|
|
||||||
Flask-Mail
|
|
||||||
Flask-Security-Too>=5.0.1
|
|
||||||
Flask-SQLAlchemy
|
|
||||||
Flask-WTF
|
|
||||||
email_validator
|
|
||||||
paho-mqtt
|
|
||||||
ldap3
|
|
||||||
wtforms
|
|
||||||
|
|
||||||
include_package_data = True
|
|
||||||
packages = find:
|
|
||||||
setup_requires =
|
|
||||||
wheel
|
|
||||||
tests_require = pytest>=3
|
|
||||||
zip_safe = False
|
|
||||||
|
|
||||||
[options.entry_points]
|
|
||||||
console_scripts =
|
|
||||||
launch_webinterface = imaginaerraum_door_admin.bin.launch_webadmin:main
|
|
||||||
|
|
||||||
[options.extras_require]
|
|
||||||
dev =
|
|
||||||
pytest
|
|
||||||
pytest-cov
|
|
||||||
pytest-flask
|
|
||||||
pytest-mock
|
|
||||||
flake8
|
|
||||||
selenium
|
|
||||||
beautifulsoup4
|
|
16
setup.py
16
setup.py
|
@ -1,3 +1,17 @@
|
||||||
from setuptools import setup
|
from setuptools import setup
|
||||||
|
|
||||||
setup()
|
setup(install_requires=[
|
||||||
|
'bleach',
|
||||||
|
'Flask',
|
||||||
|
'Flask-Mail',
|
||||||
|
'Flask-Security-Too',
|
||||||
|
'Flask-SQLAlchemy',
|
||||||
|
'Flask-WTF',
|
||||||
|
'email_validator',
|
||||||
|
'paho-mqtt',
|
||||||
|
'ldap3',
|
||||||
|
],
|
||||||
|
include_package_data=True,
|
||||||
|
scripts=['bin/launch_webadmin'],
|
||||||
|
packages=['imaginaerraum_door_admin'],
|
||||||
|
zip_safe=False)
|
||||||
|
|
|
@ -1,24 +0,0 @@
|
||||||
import pytest
|
|
||||||
import os
|
|
||||||
|
|
||||||
from selenium.webdriver import Chrome
|
|
||||||
from imaginaerraum_door_admin import create_app
|
|
||||||
|
|
||||||
os.environ['APPLICATION_SETTINGS'] = os.getcwd() + '/debug_app_config.py'
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
|
||||||
def app():
|
|
||||||
"""Fixture to launch the webapp"""
|
|
||||||
app = create_app()
|
|
||||||
|
|
||||||
return app
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def browser():
|
|
||||||
"""Fixture for a selenium browser to access the webapp."""
|
|
||||||
driver = Chrome()
|
|
||||||
driver.implicitly_wait(10)
|
|
||||||
yield driver
|
|
||||||
driver.quit()
|
|
|
@ -1,56 +0,0 @@
|
||||||
import tempfile
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
TESTING = True
|
|
||||||
DEBUG = True
|
|
||||||
LOG_FILE = 'webapp.log'
|
|
||||||
|
|
||||||
token_file = tempfile.NamedTemporaryFile(delete=False).name
|
|
||||||
Path(token_file).write_text(
|
|
||||||
"""# token | name | organization | email | valid_thru
|
|
||||||
#042979fa186280||||
|
|
||||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
|
||||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
|
||||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
TOKEN_FILE = str(token_file)
|
|
||||||
|
|
||||||
SECRET_KEY = 'supersecret'
|
|
||||||
SECURITY_PASSWORD_SALT = 'salty'
|
|
||||||
|
|
||||||
admin_file = tempfile.NamedTemporaryFile(delete=False).name
|
|
||||||
Path(admin_file).write_text(
|
|
||||||
"""# create new super-admin by putting the following data in this file:
|
|
||||||
# username email ldap/password
|
|
||||||
# the third column can either be LDAP, in which case the user will be able to authenticate through LDAP
|
|
||||||
# or if it's not LDAP then it should be the password for the new user which will be stored in the local database
|
|
||||||
# examples:
|
|
||||||
gandalf gandalf@shire.me shadowfax
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
ADMIN_FILE = str(admin_file)
|
|
||||||
|
|
||||||
db_file = tempfile.NamedTemporaryFile(delete=False).name
|
|
||||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + str(db_file)
|
|
||||||
|
|
||||||
token_log_file = tempfile.NamedTemporaryFile(delete=False).name
|
|
||||||
Path(token_log_file).write_text(
|
|
||||||
"""1970-01-01 00:00:12,947 - nfc_log - INFO - Loading tokens
|
|
||||||
1970-01-01 00:00:12,961 - nfc_log - DEBUG - Opening control socket
|
|
||||||
1970-01-01 00:00:24,259 - nfc_log - INFO - Got connection
|
|
||||||
1970-01-01 00:00:28,052 - nfc_log - INFO - Connected to mqtt host with result 0
|
|
||||||
2021-04-17 13:05:09,344 - nfc_log - DEBUG - Control socket opening door
|
|
||||||
2021-04-17 13:05:09,346 - nfc_log - INFO - Opening the door
|
|
||||||
2021-04-17 13:05:25,684 - nfc_log - DEBUG - Control socket closing door
|
|
||||||
2021-04-17 13:05:25,685 - nfc_log - INFO - Closing the door
|
|
||||||
2021-04-17 13:08:57,121 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
|
||||||
2021-04-17 13:08:57,123 - nfc_log - INFO - Closing the door
|
|
||||||
2021-04-17 13:09:06,207 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
|
||||||
2021-04-17 13:09:06,209 - nfc_log - INFO - Opening the door
|
|
||||||
2021-04-17 13:09:09,017 - nfc_log - INFO - Valid token 04538cfa186280 of Gandalf
|
|
||||||
2021-04-17 13:09:09,019 - nfc_log - ERROR - Opening the door
|
|
||||||
2021-04-17 13:09:43,272 - nfc_log - DEBUG - Reloading tokens
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
NFC_LOG = str(token_log_file)
|
|
|
@ -1,154 +0,0 @@
|
||||||
import datetime
|
|
||||||
import time
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from pathlib import Path
|
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def nfc_socket(tmp_path):
|
|
||||||
# mock up a Unix socket server for testing
|
|
||||||
import threading
|
|
||||||
import socketserver
|
|
||||||
|
|
||||||
class ThreadedRequestHandler(socketserver.BaseRequestHandler):
|
|
||||||
def handle(self):
|
|
||||||
data = str(self.request.recv(1024), 'ascii')
|
|
||||||
cur_thread = threading.current_thread()
|
|
||||||
response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
|
|
||||||
try:
|
|
||||||
self.request.sendall(response)
|
|
||||||
except BrokenPipeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
class TreadedUnixServer(socketserver.ThreadingMixIn,
|
|
||||||
socketserver.UnixStreamServer):
|
|
||||||
pass
|
|
||||||
|
|
||||||
nfc_socket_file = tmp_path / 'nfc.sock'
|
|
||||||
server = TreadedUnixServer(str(nfc_socket_file), ThreadedRequestHandler)
|
|
||||||
with server:
|
|
||||||
nfc_socket = server.server_address
|
|
||||||
|
|
||||||
# Start a thread with the server -- that thread will then start one
|
|
||||||
# more thread for each request
|
|
||||||
server_thread = threading.Thread(target=server.serve_forever)
|
|
||||||
# Exit the server thread when the main thread terminates
|
|
||||||
server_thread.daemon = True
|
|
||||||
server_thread.start()
|
|
||||||
print("Server loop running in thread:", server_thread.name)
|
|
||||||
|
|
||||||
yield nfc_socket
|
|
||||||
|
|
||||||
server.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_door_handle(nfc_socket, tmp_path):
|
|
||||||
# test with invalid token file
|
|
||||||
with pytest.raises(FileNotFoundError):
|
|
||||||
door_handle = DoorHandle(token_file='no_such_file',
|
|
||||||
mqtt_host='test.mosquitto.org')
|
|
||||||
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text('')
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
nfc_socket=nfc_socket
|
|
||||||
)
|
|
||||||
door_handle.nfc_sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def door_handle(nfc_socket, tmp_path):
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text(
|
|
||||||
"""# token | name | organization | email | valid_thru
|
|
||||||
04387cfa186280|Gandalf|Wizards|gandalf@shire.me|
|
|
||||||
043a81fa186280|Bilbo|Hobbits|bilbo@shire.me|
|
|
||||||
#04538cfa186280|Gimli|Dwarves|gimli@shire.me|
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
nfc_socket=nfc_socket
|
|
||||||
)
|
|
||||||
yield door_handle
|
|
||||||
door_handle.nfc_sock.close()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def door_handle_no_nfc(tmp_path):
|
|
||||||
token_file = tmp_path / 'door_tokens'
|
|
||||||
token_file.write_text('')
|
|
||||||
door_handle = DoorHandle(
|
|
||||||
token_file=token_file, mqtt_host='test.mosquitto.org',
|
|
||||||
)
|
|
||||||
yield door_handle
|
|
||||||
|
|
||||||
|
|
||||||
def test_store_tokens(door_handle):
|
|
||||||
new_tokens = {
|
|
||||||
'042979fa186280': {
|
|
||||||
'name': 'Pippin',
|
|
||||||
'organization': 'Hobbits',
|
|
||||||
'email': 'pippin@shire.me',
|
|
||||||
'valid_thru': None,
|
|
||||||
'inactive': False
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
door_handle.store_tokens(new_tokens)
|
|
||||||
|
|
||||||
token_file = Path(door_handle.token_file)
|
|
||||||
assert token_file.exists()
|
|
||||||
assert '042979fa186280' in token_file.read_text()
|
|
||||||
|
|
||||||
|
|
||||||
def test_mqtt_messages(door_handle):
|
|
||||||
# test sending messages to the door_handle via MQTT and see if the state
|
|
||||||
# gets updated accordingly
|
|
||||||
client = mqtt.Client()
|
|
||||||
|
|
||||||
con = client.connect('test.mosquitto.org', 1883)
|
|
||||||
client.publish('door/position/value', 10).wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
assert door_handle.encoder_position == 10
|
|
||||||
|
|
||||||
client.publish('door/state/value', 'open').wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
assert door_handle.state == 'open'
|
|
||||||
|
|
||||||
timestamp = datetime.datetime.now().replace(microsecond=0)
|
|
||||||
token = '042979fa186280'
|
|
||||||
client.publish(
|
|
||||||
'door/token/last_invalid',
|
|
||||||
f"{timestamp.strftime('%Y-%m-%d %H:%M:%S')};{token}"
|
|
||||||
).wait_for_publish(1)
|
|
||||||
time.sleep(1)
|
|
||||||
most_recent_token = door_handle.get_most_recent_token()
|
|
||||||
assert most_recent_token['timestamp'] == timestamp
|
|
||||||
assert most_recent_token['token'] == token
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
|
|
||||||
def test_open_door(door_handle):
|
|
||||||
door_handle.open_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_open_door_broken_socket(door_handle_no_nfc):
|
|
||||||
# test broken nfc_socket connection
|
|
||||||
with pytest.raises(RuntimeError):
|
|
||||||
door_handle_no_nfc.open_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_close_door(door_handle):
|
|
||||||
door_handle.close_door()
|
|
||||||
|
|
||||||
|
|
||||||
def test_close_door_broken_socket(door_handle_no_nfc):
|
|
||||||
# test broken nfc_socket connection
|
|
||||||
with pytest.raises(RuntimeError):
|
|
||||||
door_handle_no_nfc.close_door()
|
|
|
@ -1,622 +0,0 @@
|
||||||
import datetime
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from flask_security.utils import lookup_identity
|
|
||||||
from imaginaerraum_door_admin.door_handle import DoorHandle
|
|
||||||
from imaginaerraum_door_admin.auth import ExtendedLoginForm
|
|
||||||
import re
|
|
||||||
import secrets
|
|
||||||
import pathlib
|
|
||||||
import json
|
|
||||||
|
|
||||||
|
|
||||||
def test_login(browser, live_server):
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}')
|
|
||||||
|
|
||||||
assert '<h1>Space Zugangsverwaltung</h1>' in browser.page_source
|
|
||||||
|
|
||||||
response = browser.get(f'http://localhost:{live_server.port}/login')
|
|
||||||
|
|
||||||
email_form = browser.find_element_by_xpath('//input[@id="email"]')
|
|
||||||
email_form.send_keys('gandalf@shire.me')
|
|
||||||
password_form = browser.find_element_by_xpath('//input[@id="password"]')
|
|
||||||
password_form.send_keys('shadowfax')
|
|
||||||
submit_button = browser.find_element_by_xpath('//input[@id="submit"]')
|
|
||||||
submit_button.click()
|
|
||||||
|
|
||||||
assert 'Tür öffnen' in browser.page_source
|
|
||||||
|
|
||||||
|
|
||||||
def extract_csrf_token(response):
|
|
||||||
soup = BeautifulSoup(response.data, 'html.parser')
|
|
||||||
csrf_token = soup.find('input', attrs={'id': 'csrf_token'})['value']
|
|
||||||
return csrf_token
|
|
||||||
|
|
||||||
|
|
||||||
def headless_login(client, user='gandalf@shire.me', password='shadowfax'):
|
|
||||||
# extract csrf token from the login page source
|
|
||||||
response = client.get('/login', follow_redirects=True)
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
# send login information
|
|
||||||
payload = {
|
|
||||||
'csrf_token': csrf_token,
|
|
||||||
'email': user,
|
|
||||||
'password': password
|
|
||||||
}
|
|
||||||
return client.post('/login', data=payload, follow_redirects=True)
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_headless(client):
|
|
||||||
response = headless_login(client)
|
|
||||||
soup = BeautifulSoup(response.data, 'html.parser')
|
|
||||||
|
|
||||||
# make sure login succeeded -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0]
|
|
||||||
for link in soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
|
||||||
|
|
||||||
|
|
||||||
def test_validate_ldap(client, mocker):
|
|
||||||
# mock ldap Connection object to simulate successful authentication
|
|
||||||
import ldap3
|
|
||||||
def init_success(self, *args, **kwargs):
|
|
||||||
pass
|
|
||||||
def init_LDAPBindError(self, *args, **kwargs):
|
|
||||||
raise ldap3.core.exceptions.LDAPBindError()
|
|
||||||
def init_LDAPSocketOpenError(self, *args, **kwargs):
|
|
||||||
raise ldap3.core.exceptions.LDAPSocketOpenError()
|
|
||||||
def init_Exception(self, *args, **kwargs):
|
|
||||||
raise Exception()
|
|
||||||
def search_success(self, *args, **kwargs):
|
|
||||||
return True
|
|
||||||
def search_failure(self, *args, **kwargs):
|
|
||||||
return False
|
|
||||||
|
|
||||||
mocker.patch.object(ldap3.Connection, '__init__', init_success)
|
|
||||||
mocker.patch.object(ldap3.Connection, 'search', search_success)
|
|
||||||
mock_entries = mocker.MagicMock()
|
|
||||||
mock_entries[0].mail.value = 'user@example.com'
|
|
||||||
mocker.patch.object(ldap3.Connection, 'entries', mock_entries)
|
|
||||||
|
|
||||||
with client.application.app_context():
|
|
||||||
# test successful login
|
|
||||||
form = ExtendedLoginForm()
|
|
||||||
form.email.data = 'user'
|
|
||||||
form.password.data = 'password'
|
|
||||||
result = form.validate_ldap()
|
|
||||||
assert result[0]
|
|
||||||
assert result[1]['username'] == 'user'
|
|
||||||
assert result[1]['email'] == 'user@example.com'
|
|
||||||
assert result[1]['roles'] == ['admin']
|
|
||||||
|
|
||||||
# test failing ldap search
|
|
||||||
mocker.patch.object(ldap3.Connection, 'search', search_failure)
|
|
||||||
result = form.validate_ldap()
|
|
||||||
|
|
||||||
assert not result[0]
|
|
||||||
assert result[1] is None
|
|
||||||
|
|
||||||
# test some errors in ldap Connection and authentication
|
|
||||||
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPBindError)
|
|
||||||
result = form.validate_ldap()
|
|
||||||
assert not result[0]
|
|
||||||
assert result[1] is None
|
|
||||||
|
|
||||||
mocker.patch.object(ldap3.Connection, '__init__', init_LDAPSocketOpenError)
|
|
||||||
result = form.validate_ldap()
|
|
||||||
assert not result[0]
|
|
||||||
assert result[1] is None
|
|
||||||
|
|
||||||
mocker.patch.object(ldap3.Connection, '__init__', init_Exception)
|
|
||||||
result = form.validate_ldap()
|
|
||||||
assert not result[0]
|
|
||||||
assert result[1] is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_ldap(client, temp_user, mocker):
|
|
||||||
# mock ldap validation for admin user
|
|
||||||
def mock_validate(self):
|
|
||||||
auth = self.email.data == temp_user['username'] and self.password.data == temp_user['password']
|
|
||||||
user_data = {'username': temp_user['username'],
|
|
||||||
'email': temp_user['email'],
|
|
||||||
'roles': ['admin'],
|
|
||||||
'password': temp_user['password']}
|
|
||||||
return auth, user_data
|
|
||||||
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
|
||||||
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
# remove local role so that ldap authentication is the default
|
|
||||||
user.roles.pop(0)
|
|
||||||
|
|
||||||
# log out admin user
|
|
||||||
client.get('/logout')
|
|
||||||
|
|
||||||
# log in temp user using ldap
|
|
||||||
response = headless_login(client, user=temp_user['username'],
|
|
||||||
password=temp_user['password'])
|
|
||||||
soup = BeautifulSoup(response.data, 'html.parser')
|
|
||||||
|
|
||||||
# make sure login succeeded -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0]
|
|
||||||
for link in soup.findAll('a', attrs={'class': ['btn'],
|
|
||||||
'role': 'button'})])
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_ldap_new_user(client, mocker):
|
|
||||||
# mock ldap validation for admin user
|
|
||||||
def mock_validate(self):
|
|
||||||
auth = True
|
|
||||||
user_data = {'username': 'Balrog',
|
|
||||||
'email': 'balrog@moria.me',
|
|
||||||
'roles': ['admin'],
|
|
||||||
'password': 'youshallnotpass'}
|
|
||||||
return auth, user_data
|
|
||||||
mocker.patch('imaginaerraum_door_admin.auth.ExtendedLoginForm.validate_ldap', mock_validate)
|
|
||||||
|
|
||||||
# initially, the Balrog user should not exist
|
|
||||||
user = lookup_identity('Balrog')
|
|
||||||
assert user is None
|
|
||||||
|
|
||||||
# log in temp user using ldap -> this will succeed and create a local user
|
|
||||||
response = headless_login(client, user='Balrog',
|
|
||||||
password='youshallnotpass')
|
|
||||||
soup = BeautifulSoup(response.data, 'html.parser')
|
|
||||||
|
|
||||||
# make sure user is now created locally
|
|
||||||
user = lookup_identity('Balrog')
|
|
||||||
assert user is not None
|
|
||||||
|
|
||||||
# make sure login succeeded -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0]
|
|
||||||
for link in soup.findAll('a', attrs={'class': ['btn'],
|
|
||||||
'role': 'button'})])
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def client_authenticated(client):
|
|
||||||
# log in using admin account for testing
|
|
||||||
headless_login(client)
|
|
||||||
|
|
||||||
yield client
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'),
|
|
||||||
('/close', 'close_door')])
|
|
||||||
def test_access_door_button(client_authenticated, mocker, url, function):
|
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
|
||||||
|
|
||||||
# visit route for open
|
|
||||||
client_authenticated.get(url)
|
|
||||||
|
|
||||||
# make sure the open method was called
|
|
||||||
getattr(DoorHandle, function).assert_called_once_with(user='gandalf')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("url,function", [('/open', 'open_door'), ('/close', 'close_door')])
|
|
||||||
def test_access_door_unauthenticated(client, mocker, url, function):
|
|
||||||
# test for trying to visit opening/closing door while not logged in
|
|
||||||
mocker.patch('imaginaerraum_door_admin.door_handle.DoorHandle.' + function)
|
|
||||||
|
|
||||||
# visit route for open
|
|
||||||
response = client.get(url, follow_redirects=True)
|
|
||||||
|
|
||||||
# we should get redirected to login page
|
|
||||||
assert 'login' in response.request.url
|
|
||||||
|
|
||||||
# the open door function should not be called
|
|
||||||
getattr(DoorHandle, function).assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_manage_admins(client_authenticated):
|
|
||||||
# visit admin management page
|
|
||||||
response = client_authenticated.get('/manage_admins')
|
|
||||||
|
|
||||||
assert "Nutzer Übersicht" in response.data.decode()
|
|
||||||
assert "gandalf" in response.data.decode()
|
|
||||||
assert "gandalf@shire.me" in response.data.decode()
|
|
||||||
|
|
||||||
|
|
||||||
def create_user(client_authenticated, username, email):
|
|
||||||
# visit admin management page
|
|
||||||
response = client_authenticated.get('/manage_admins')
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
# post data for creating a new admin
|
|
||||||
payload = {'name': username,
|
|
||||||
'email': email,
|
|
||||||
'csrf_token': csrf_token}
|
|
||||||
response = client_authenticated.post('/manage_admins', data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# after the new admin user is created, we should have been redirected to the
|
|
||||||
# /manage_admin page. there, the password for login is displayed
|
|
||||||
# we test if the newly created user can log in with that password
|
|
||||||
# extract password displayed on the page
|
|
||||||
match = re.search('Passwort (?P<password>.*) um', response.data.decode())
|
|
||||||
assert match is not None
|
|
||||||
extracted_password = match['password']
|
|
||||||
|
|
||||||
return extracted_password
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def temp_user(client_authenticated):
|
|
||||||
"""Fixture for creating a temporary user for testing"""
|
|
||||||
username = secrets.token_hex(4)
|
|
||||||
email = username + '@example.com'
|
|
||||||
|
|
||||||
# create user for test
|
|
||||||
password = create_user(client_authenticated, username, email)
|
|
||||||
|
|
||||||
return {'username': username,
|
|
||||||
'email': email,
|
|
||||||
'password': password}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def temp_admin(client_authenticated):
|
|
||||||
"""Fixture for creating a temporary admin user for testing"""
|
|
||||||
username = secrets.token_hex(4)
|
|
||||||
email = username + '@example.com'
|
|
||||||
|
|
||||||
# create user for test
|
|
||||||
password = create_user(client_authenticated, username, email)
|
|
||||||
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/promote_admin/{username}",
|
|
||||||
follow_redirects=True)
|
|
||||||
user = lookup_identity(username)
|
|
||||||
assert user.has_role('admin')
|
|
||||||
|
|
||||||
return {'username': username,
|
|
||||||
'email': email,
|
|
||||||
'password': password}
|
|
||||||
|
|
||||||
|
|
||||||
def test_backup_users(client_authenticated, temp_user):
|
|
||||||
# test with invalid token
|
|
||||||
response = client_authenticated.get("/backup_user_datastore",
|
|
||||||
follow_redirects=True)
|
|
||||||
user_data = json.loads(response.data)
|
|
||||||
users = [d['username'] for d in user_data]
|
|
||||||
emails = [d['email'] for d in user_data]
|
|
||||||
|
|
||||||
assert temp_user['username'] in users
|
|
||||||
assert temp_user['email'] in emails
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_admin(client_authenticated):
|
|
||||||
password = create_user(client_authenticated, 'bilbo', 'bilbo@shire.me')
|
|
||||||
|
|
||||||
# log out current user
|
|
||||||
response = client_authenticated.get('/logout')
|
|
||||||
|
|
||||||
# try to log in new user using the extracted password
|
|
||||||
response = headless_login(client_authenticated, user='bilbo@shire.me',
|
|
||||||
password=password)
|
|
||||||
# - see if it works
|
|
||||||
soup = BeautifulSoup(response.data, 'html.parser')
|
|
||||||
|
|
||||||
# make sure login succeeded
|
|
||||||
# -> username should be displayed
|
|
||||||
assert 'Benutzer <span>bilbo</span>' in soup.decode()
|
|
||||||
# -> Tür öffnen button will appear
|
|
||||||
assert any(['Tür öffnen' in link.contents[0] for link in
|
|
||||||
soup.findAll('a', attrs={'class': ['btn'], 'role': 'button'})])
|
|
||||||
|
|
||||||
|
|
||||||
def test_activate_deactivate_user(temp_user, client_authenticated):
|
|
||||||
response = client_authenticated.get(
|
|
||||||
'/admin_toggle_active/nosuchuser',
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
|
||||||
|
|
||||||
# deactivate the user
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/admin_toggle_active/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
# make sure the user is now inactive
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is not None
|
|
||||||
assert not user.active
|
|
||||||
|
|
||||||
# activate the user again
|
|
||||||
client_authenticated.get(f"/admin_toggle_active/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# now the user should be active again
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is not None
|
|
||||||
assert user.active
|
|
||||||
|
|
||||||
# test deactivating super admin
|
|
||||||
response = client_authenticated.get(f"/admin_toggle_active/gandalf",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Super-Admins können nicht deaktiviert werden!' \
|
|
||||||
in response.data.decode()
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_admin(temp_user, client_authenticated):
|
|
||||||
# first we test deleting a non-existing user
|
|
||||||
response = client_authenticated.post(
|
|
||||||
'/delete_admins/nosuchuser',
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
|
||||||
|
|
||||||
# next, we create a temporary user and try to delete that one
|
|
||||||
response = client_authenticated.post(
|
|
||||||
f"/delete_admins/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# we need to deactivate the user first
|
|
||||||
assert 'Bitte den Benutzer zuerst deaktivieren.' in response.data.decode()
|
|
||||||
# make sure the user still exists
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is not None
|
|
||||||
|
|
||||||
# deactivate the user and try deleting it again
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/admin_toggle_active/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# try deleting it without filling in the confirmation form
|
|
||||||
response = client_authenticated.post(
|
|
||||||
f"/delete_admins/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Der eingegebene Nutzername stimmt nicht überein' \
|
|
||||||
in response.data.decode()
|
|
||||||
# make sure the user still exists
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is not None
|
|
||||||
|
|
||||||
# now we send the confirmation data with the request
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/delete_admins/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
payload = {'name': temp_user['username'], 'csrf_token': csrf_token}
|
|
||||||
response = client_authenticated.post(
|
|
||||||
f"/delete_admins/{temp_user['username']}",
|
|
||||||
data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
assert f"Benutzer {temp_user['username']} wurde gelöscht." in response.data.decode()
|
|
||||||
|
|
||||||
# make sure the user now is gone
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_promote_user(temp_user, client_authenticated):
|
|
||||||
# first we test with a non-existing user
|
|
||||||
response = client_authenticated.get(
|
|
||||||
'/promote_admin/nosuchuser',
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
|
||||||
|
|
||||||
user = lookup_identity(temp_user['username'])
|
|
||||||
assert user is not None
|
|
||||||
assert not user.has_role('admin')
|
|
||||||
# grant admin permissions to test user
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/promote_admin/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert user.has_role('admin')
|
|
||||||
|
|
||||||
# try granting admin permissions again
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/promote_admin/{temp_user['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert f"Benutzer {temp_user['username']} hat bereits Admin-Rechte!"
|
|
||||||
assert user.has_role('admin')
|
|
||||||
|
|
||||||
|
|
||||||
def test_demote_user(temp_admin, client_authenticated):
|
|
||||||
# first we test with a non-existing user
|
|
||||||
response = client_authenticated.get(
|
|
||||||
'/demote_admin/nosuchuser',
|
|
||||||
follow_redirects=True)
|
|
||||||
assert 'Ungültiger Nutzer' in response.data.decode()
|
|
||||||
|
|
||||||
user = lookup_identity(temp_admin['username'])
|
|
||||||
assert user.has_role('admin')
|
|
||||||
# try removing admin permissions
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/demote_admin/{temp_admin['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert not user.has_role('admin')
|
|
||||||
|
|
||||||
# try removing admin permissions
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/demote_admin/{temp_admin['username']}",
|
|
||||||
follow_redirects=True)
|
|
||||||
assert f"Benutzer {temp_admin['username']} ist bereits kein Admin!" \
|
|
||||||
in response.data.decode()
|
|
||||||
assert not user.has_role('admin')
|
|
||||||
|
|
||||||
# try removing admin permissions from superadmin
|
|
||||||
response = client_authenticated.get(
|
|
||||||
f"/demote_admin/gandalf",
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
assert "hat Super-Admin-Rechte und kann nicht verändert werden!" \
|
|
||||||
in response.data.decode()
|
|
||||||
|
|
||||||
|
|
||||||
def test_list_tokens(client_authenticated):
|
|
||||||
response = client_authenticated.get(f"/tokens", follow_redirects=True)
|
|
||||||
|
|
||||||
# make sure the names for the test tokens are displayed
|
|
||||||
assert all([user in response.data.decode()
|
|
||||||
for user in ['Gandalf', 'Bilbo', 'Gimli']])
|
|
||||||
|
|
||||||
|
|
||||||
def test_token_log(client_authenticated):
|
|
||||||
response = client_authenticated.get(f"/token-log", follow_redirects=True)
|
|
||||||
|
|
||||||
page_src = response.data.decode()
|
|
||||||
|
|
||||||
# perform some checks for displayed log data
|
|
||||||
assert all([msg in page_src for msg in
|
|
||||||
['INFO', 'DEBUG', 'ERROR']])
|
|
||||||
assert "Valid token 04538cfa186280 of Gandalf" in page_src
|
|
||||||
assert "2021-04-17 13:09:06,207" in page_src
|
|
||||||
|
|
||||||
|
|
||||||
def test_backup_tokens(client_authenticated):
|
|
||||||
# test with invalid token
|
|
||||||
response = client_authenticated.get(f"/backup_tokens",
|
|
||||||
follow_redirects=True)
|
|
||||||
token_data = json.loads(response.data)
|
|
||||||
|
|
||||||
assert {'04387cfa186280', '043a81fa186280', '04538cfa186280',
|
|
||||||
'042979fa186280'}.issubset(token_data.keys())
|
|
||||||
|
|
||||||
|
|
||||||
def test_register_token(client_authenticated, mocker):
|
|
||||||
# test to make sure message is displayed when no tokens were recently
|
|
||||||
# scanned
|
|
||||||
response = client_authenticated.get(f"/register-token",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Keine unregistrierten Tokens in MQTT Nachrichten.' in page_src
|
|
||||||
|
|
||||||
# mockup scanned token
|
|
||||||
mocker.patch(
|
|
||||||
'imaginaerraum_door_admin.door_handle.DoorHandle.get_most_recent_token',
|
|
||||||
lambda x: {'timestamp': datetime.datetime.now(),
|
|
||||||
'token': '042979fa181280'})
|
|
||||||
response = client_authenticated.get(f"/register-token", follow_redirects=True)
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Unregistrierter Token gelesen' in page_src
|
|
||||||
assert '042979fa181280' in page_src
|
|
||||||
|
|
||||||
# try registering with incomplete data
|
|
||||||
response = client_authenticated.post(f"/register-token", data={},
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Token konnte nicht registiert werden' in page_src
|
|
||||||
|
|
||||||
# register the mocked token
|
|
||||||
payload = {'name': 'Legolas',
|
|
||||||
'organization': 'Elves',
|
|
||||||
'email': 'legolas@mirkwood.me',
|
|
||||||
'dsgvo': True,
|
|
||||||
'csrf_token': csrf_token}
|
|
||||||
response = client_authenticated.post(f"/register-token", data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
|
|
||||||
# make sure the user info for the new token is displayed
|
|
||||||
assert '042979fa181280' in page_src
|
|
||||||
assert 'Legolas' in page_src
|
|
||||||
assert 'Elves' in page_src
|
|
||||||
assert 'legolas@mirkwood.me' in page_src
|
|
||||||
|
|
||||||
# check that the token is created in the token file
|
|
||||||
token_data = pathlib.Path(
|
|
||||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '042979fa181280' in token_data
|
|
||||||
assert 'Legolas' in token_data
|
|
||||||
|
|
||||||
|
|
||||||
def test_edit_token(client_authenticated):
|
|
||||||
# test with invalid token
|
|
||||||
response = client_authenticated.get(f"/edit-token/nosuchtoken",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Ungültiger Token' in page_src
|
|
||||||
|
|
||||||
response = client_authenticated.post(f"/edit-token/nosuchtoken",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Token konnte nicht editiert werden' in page_src
|
|
||||||
|
|
||||||
# test using a valid token from the token file
|
|
||||||
response = client_authenticated.get(f"/edit-token/04538cfa186280",
|
|
||||||
follow_redirects=True)
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
'name': 'Balin',
|
|
||||||
'organization': 'Dwarves',
|
|
||||||
'email': 'balin@erebor.me',
|
|
||||||
'active': True,
|
|
||||||
'limit_validity': False,
|
|
||||||
'valid_thru': datetime.date.today(),
|
|
||||||
'csrf_token': csrf_token
|
|
||||||
}
|
|
||||||
response = client_authenticated.post(f"/edit-token/04538cfa186280",
|
|
||||||
data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
# make sure the new user info for the token is displayed
|
|
||||||
assert '04538cfa186280' in page_src
|
|
||||||
assert 'Balin' in page_src
|
|
||||||
assert 'Dwarves' in page_src
|
|
||||||
assert 'balin@erebor.me' in page_src
|
|
||||||
|
|
||||||
# check that the token is changed in the token file
|
|
||||||
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '04538cfa186280' in token_data
|
|
||||||
assert 'Balin' in token_data
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_token(client_authenticated):
|
|
||||||
token_data = pathlib.Path(
|
|
||||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '04538cfa186280' in token_data
|
|
||||||
|
|
||||||
# test with invalid token
|
|
||||||
response = client_authenticated.get(f"/delete-token/nosuchtoken",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Ungültiger Token' in page_src
|
|
||||||
|
|
||||||
# test using a valid token from the token file
|
|
||||||
response = client_authenticated.get(f"/delete-token/043a81fa186280",
|
|
||||||
follow_redirects=True)
|
|
||||||
csrf_token = extract_csrf_token(response)
|
|
||||||
|
|
||||||
# try deleting without form data
|
|
||||||
response = client_authenticated.post(f"/delete-token/043a81fa186280",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert "wurde nicht gelöscht" in page_src
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
'name': 'Bilbo',
|
|
||||||
'csrf_token': csrf_token
|
|
||||||
}
|
|
||||||
response = client_authenticated.post(f"/delete-token/043a81fa186280",
|
|
||||||
data=payload,
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
print(page_src)
|
|
||||||
assert "wurde gelöscht" in page_src
|
|
||||||
|
|
||||||
# check that the token is now gone from the token file
|
|
||||||
token_data = pathlib.Path(client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '043a81fa186280' not in token_data
|
|
||||||
|
|
||||||
|
|
||||||
def test_deactivate_token(client_authenticated):
|
|
||||||
token_data = pathlib.Path(
|
|
||||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '04387cfa186280' in token_data
|
|
||||||
|
|
||||||
# test with invalid token
|
|
||||||
response = client_authenticated.get(f"/deactivate-token/nosuchtoken",
|
|
||||||
follow_redirects=True)
|
|
||||||
page_src = response.data.decode()
|
|
||||||
assert 'Ungültiger Token' in page_src
|
|
||||||
|
|
||||||
# deactivate token
|
|
||||||
response = client_authenticated.get(f"/deactivate-token/04387cfa186280",
|
|
||||||
follow_redirects=True)
|
|
||||||
|
|
||||||
# check that the token is now gone from the token file
|
|
||||||
token_data = pathlib.Path(
|
|
||||||
client_authenticated.application.config['TOKEN_FILE']).read_text()
|
|
||||||
assert '#04387cfa186280' in token_data
|
|
Loading…
Reference in New Issue
Block a user