Compare commits

...

8 Commits

6 changed files with 65 additions and 61 deletions

View File

@ -1,4 +1,4 @@
Flask-based web interface for user token adminstration of our hackerspace's door lock.
Flask-based web interface for user token administration of our hackerspace's door lock.
## Installation
Clone the repo
@ -49,7 +49,7 @@ definitely should set custom values for the ``SECRET_KEY`` and
``SECURITY_PASSWORD_SALT``.
### Token file
The token file is an ASCII file which list the IDs of RFID tokens that can be
The token file is an ASCII file which lists the IDs of RFID tokens that can be
used to unlock the door. You can specify the path to the token file using the
``TOKEN_FILE`` variable in the configuration file.
Here's an example of a token file (lines starting with ``#`` represent inactive

View File

@ -1,28 +1,11 @@
#!/usr/bin/env python3
import argparse
from imaginaerraum_door_admin import create_app
from imaginaerraum_door_admin.webapp import create_application
parser = argparse.ArgumentParser()
parser.add_argument("--key_file", default='/root/flask_keys', help="Path to file with Flask SECRET_KEY and SECURITY_PASSWORD_SALT")
parser.add_argument("--token_file", default="/etc/door_tokens", help="path to the file with door tokens and users")
parser.add_argument("--nfc_socket", default="/tmp/nfc.sock", help="socket for handling NFC reader commands")
parser.add_argument("--template_folder", default="templates", help="path to Flask templates folder")
parser.add_argument("--static_folder", default="static", help="path to Flask static folder")
parser.add_argument("--admin_file", default="/etc/admins.conf", help="Path to file for creating super admin users")
parser.add_argument("--log_file", default="/var/log/webinterface.log", help="Path to flask log file")
parser.add_argument("--nfc_log", default="/var/log/nfc.log", help="Path to nfc log file")
parser.add_argument("--ldap_url", default="ldaps://ldap.imaginaerraum.de",
help="URL for LDAP server for alternative user authorization")
parser.add_argument("--mqtt_host", default="10.10.21.2", help="IP address of MQTT broker")
parser.add_argument("--flask_port", default=80, help="Port for running the Flask server")
parser.add_argument("--mail_server", default="smtp.googlemail.com", help="email server for sending security messages")
parser.add_argument("--mail_port", default=465, help="port for security email server")
parser.add_argument("--mail_use_tls", default=False, help="use TLS for security emails")
parser.add_argument("--mail_use_ssl", default=True, help="use SSL for security emails")
parser.add_argument("--mail_username", default="admin@example.com", help="email account for sending security messages")
parser.add_argument("--mail_password", default="password", help="Password for email account")
config = parser.parse_args()
def main():
app = create_app()
app.run(host='0.0.0.0', port=app.config.get('PORT', 80))
app = create_application(config)
app.run(host='0.0.0.0', port=config.flask_port)
if __name__ == "__main__":
main()

View File

@ -11,7 +11,7 @@ security = Security()
db = SQLAlchemy()
# create admin users (only if they don't exists already)
# create admin users (only if they don't exist already)
def create_super_admins(app, user_datastore):
admin_file = Path(app.config.get('ADMIN_FILE'))
@ -37,8 +37,10 @@ def create_super_admins(app, user_datastore):
'password': pw})
except Exception as e:
app.logger.error(
f"Error while parsing line {i} in admin config file. Config file should contain lines of "
f"'<username> <email> <password>\\n'\n Exception: {e}\nAdmin account could not be created."
f"Error while parsing line {i} in admin config file. "
f"Config file should contain lines of <username> "
f"<email> <password>\\n'\n "
f"Exception: {e}\nAdmin account could not be created."
)
with app.app_context():
@ -73,10 +75,11 @@ def create_super_admins(app, user_datastore):
def create_app():
app = Flask(__name__)
app.config.from_object('imaginaerraum_door_admin.default_app_config.DefaultConfig')
app.config.from_object(
'imaginaerraum_door_admin.default_app_config.DefaultConfig'
)
app.config.from_envvar('APPLICATION_SETTINGS', silent=True)
token_file = Path(app.config.get('TOKEN_FILE'))
if not token_file.exists():
app.logger.warning(

View File

@ -38,7 +38,10 @@ class ExtendedLoginForm(LoginForm):
authorized = super(ExtendedLoginForm, self).validate()
if authorized:
current_app.logger.info(f"User with credentials '{self.email.data}' authorized through local database")
current_app.logger.info(
f"User with credentials '{self.email.data}' authorized "
f"through local database"
)
else:
# run LDAP authorization
# if the authorization succeeds we also get the new_user_data
@ -71,32 +74,41 @@ class ExtendedLoginForm(LoginForm):
if authorized:
# if there was no user in the database before we create a new
# user
self.user = security.datastore.create_user(username=new_user_data['username'], email=new_user_data['email'],
password=new_user_data['password'], roles=new_user_data['roles'])
self.user = security.datastore.create_user(
username=new_user_data['username'],
email=new_user_data['email'],
password=new_user_data['password'],
roles=new_user_data['roles']
)
security.datastore.commit()
current_app.logger.info(f"New admin user '{new_user_data['username']} <{new_user_data['email']}>' created after"
" successful LDAP authorization")
current_app.logger.info(
f"New admin user '{new_user_data['username']} "
f"<{new_user_data['email']}>' created after successful "
f"LDAP authorization"
)
# if any of the authorization methods is successful we authorize the user
# if any of the authorization methods is successful we authorize
# the user
return authorized
def validate_ldap(self):
"""Validate the user and password through an LDAP server.
If the connection completes successfully the given user and password is authorized.
Then the permissions and additional information of the user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update the entry for the user in the local
database.
If the connection completes successfully the given user and password
is authorized. Then the permissions and additional information of the
user are obtained through an LDAP search.
The data is stored in a dict which will be used later to create/update
the entry for the user in the local database.
Parameters
----------
Returns
-------
bool : result of the authorization process (True = success, False = failure)
dict : dictionary with information about an authorized user (contains username, email, hashed password,
roles)
bool : result of the authorization process (True = success,
False = failure)
dict : dictionary with information about an authorized user
(contains username, email, hashed password, roles)
"""
ldap_server = ldap3.Server(current_app.config['LDAP_URL'])
ldap_user_group = current_app.config['LDAP_USER_GROUP']
@ -107,15 +119,20 @@ class ExtendedLoginForm(LoginForm):
password = self.password.data
try:
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
con = ldap3.Connection(ldap_server,
user=user,
password=password, auto_bind=True)
user = f"uid={username},ou={ldap_user_group},dc={ldap_domain}," \
f"dc={ldap_domain_ext}"
con = ldap3.Connection(
ldap_server,
user=user,
password=password,
auto_bind=True
)
except ldap3.core.exceptions.LDAPBindError as e:
# server reachable but user unauthorized -> fail
return False, None
except LDAPSocketOpenError as e:
# server not reachable -> fail (but will try authorization from local database later)
# server not reachable -> fail (but will try authorization from
# local database later)
return False, None
except Exception as e:
# for other Exceptions we just fail
@ -127,15 +144,18 @@ class ExtendedLoginForm(LoginForm):
new_user_data['password'] = hash_password(password)
new_user_data['roles'] = []
search_base = f"ou={ldap_user_group},dc={ldap_domain},dc={ldap_domain_ext}"
search_filter = f"(&(uid={username})(memberof=cn=Keyholders,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(search_base, search_filter,
attributes=ldap3.ALL_ATTRIBUTES)
search_filter = f"(&(uid={username})(memberof=cn=Keyholders," \
f"ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))"
lock_permission = con.search(
search_base, search_filter, attributes=ldap3.ALL_ATTRIBUTES
)
if lock_permission:
new_user_data['email'] = con.entries[0].mail.value
else:
return False, None
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,dc={ldap_domain},dc={ldap_domain_ext}))'
search_filter = f'(&(uid={username})(memberof=cn=Vorstand,ou=Groups,' \
f'dc={ldap_domain},dc={ldap_domain_ext}))'
token_granting_permission = con.search(search_base, search_filter)
if token_granting_permission:
new_user_data['roles'].append('admin')

View File

@ -1,13 +1,16 @@
print("loading default config")
import bleach
from flask_security import uia_email_mapper
print("loading default config")
def uia_username_mapper(identity):
# we allow pretty much anything - but we bleach it.
return bleach.clean(identity, strip=True)
class DefaultConfig(object):
DEBUG = False
PORT = 80
SECRET_KEY = 'supersecret'
SECURITY_PASSWORD_SALT = 'salty'
@ -44,7 +47,6 @@ class DefaultConfig(object):
"pool_pre_ping": True,
}
KEY_FILE = '/root/flask_keys'
TOKEN_FILE = "door_tokens"
ADMIN_FILE = 'admins'

View File

@ -40,12 +40,10 @@ class DoorHandle:
self.mqtt_client.on_message = self.on_message
self.mqtt_client.connect_async(host=mqtt_host, port=mqtt_port)
self.mqtt_client.loop_start()
self.logger.info(f"Connected to MQTT broker at {mqtt_host}:{mqtt_port}")
self.data_fields = ['name', 'organization', 'email', 'valid_thru']
pass
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
self.logger.info("Connected to MQTT broker with result code " + str(rc))
@ -54,8 +52,6 @@ class DoorHandle:
# reconnect then subscriptions will be renewed.
client.subscribe("door/#")
pass
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
#print(msg.topic + " " + str(msg.payload))