added command line argument parser
use MQTT to get last invalid token instead of reading it from the logfile
This commit is contained in:
parent
55c4f6ea7f
commit
0b0d7f9a93
42
app.py
42
app.py
|
@ -5,27 +5,46 @@ from wtforms.fields.html5 import DateField, EmailField
|
||||||
from wtforms.fields import StringField, BooleanField
|
from wtforms.fields import StringField, BooleanField
|
||||||
from wtforms.validators import DataRequired, ValidationError
|
from wtforms.validators import DataRequired, ValidationError
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password
|
from flask_security import Security, SQLAlchemyUserDatastore, auth_required, hash_password, uia_email_mapper
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
from flask_security.models import fsqla_v2 as fsqla
|
||||||
from flask_security.forms import LoginForm, Required, PasswordField
|
from flask_security.forms import LoginForm, Required, PasswordField
|
||||||
|
import bleach
|
||||||
|
|
||||||
from datetime import date
|
from datetime import date
|
||||||
from door_handle import DoorHandle
|
from door_handle import DoorHandle
|
||||||
|
|
||||||
MQTT_BROKER = '10.10.21.2'
|
import argparse
|
||||||
|
|
||||||
door = DoorHandle(MQTT_BROKER)
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("--token_file", default="/etc/door_tokens")
|
||||||
|
parser.add_argument("--template_folder", default="/usr/share/door_admin/templates")
|
||||||
|
parser.add_argument("--static_folder", default="/usr/share/door_admin/static")
|
||||||
|
parser.add_argument("--mqtt_host", default="10.10.21.2")
|
||||||
|
config = parser.parse_args()
|
||||||
|
|
||||||
app = Flask(__name__)
|
# create door objects which provides access to the token file and current door state via MQTT
|
||||||
|
door = DoorHandle(config.mqtt_host, token_file=config.token_file)
|
||||||
|
|
||||||
|
app = Flask(__name__, template_folder=config.template_folder, static_folder=config.static_folder)
|
||||||
|
|
||||||
# Generate a nice key using secrets.token_urlsafe()
|
# Generate a nice key using secrets.token_urlsafe()
|
||||||
app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", 'Q7PJu2fg2jabYwP-Psop6c6f2G4')
|
app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", 'Q7PJu2fg2jabYwP-Psop6c6f2G4')
|
||||||
# Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
|
# Bcrypt is set as default SECURITY_PASSWORD_HASH, which requires a salt
|
||||||
# Generate a good salt using: secrets.SystemRandom().getrandbits(128)
|
# Generate a good salt using: secrets.SystemRandom().getrandbits(128)
|
||||||
app.config['SECURITY_PASSWORD_SALT'] = os.environ.get("SECURITY_PASSWORD_SALT", '10036796768252925167749545152988277953')
|
app.config['SECURITY_PASSWORD_SALT'] = os.environ.get("SECURITY_PASSWORD_SALT",
|
||||||
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = ('username', 'email')
|
'10036796768252925167749545152988277953')
|
||||||
|
|
||||||
|
|
||||||
|
def uia_username_mapper(identity):
|
||||||
|
# we allow pretty much anything - but we bleach it.
|
||||||
|
return bleach.clean(identity, strip=True)
|
||||||
|
|
||||||
|
|
||||||
|
app.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = [
|
||||||
|
{"email": {"mapper": uia_email_mapper, "case_insensitive": True}},
|
||||||
|
{"username": {"mapper": uia_username_mapper}}
|
||||||
|
]
|
||||||
|
|
||||||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///admin.db'
|
||||||
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the
|
||||||
# underlying engine. This option makes sure that DB connections from the
|
# underlying engine. This option makes sure that DB connections from the
|
||||||
|
@ -41,12 +60,15 @@ db = SQLAlchemy(app)
|
||||||
# Define models
|
# Define models
|
||||||
fsqla.FsModels.set_db_info(db)
|
fsqla.FsModels.set_db_info(db)
|
||||||
|
|
||||||
|
|
||||||
class Role(db.Model, fsqla.FsRoleMixin):
|
class Role(db.Model, fsqla.FsRoleMixin):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class User(db.Model, fsqla.FsUserMixin):
|
class User(db.Model, fsqla.FsUserMixin):
|
||||||
username = db.Column(db.String(255))
|
username = db.Column(db.String(255))
|
||||||
|
|
||||||
|
|
||||||
class ExtendedLoginForm(LoginForm):
|
class ExtendedLoginForm(LoginForm):
|
||||||
email = StringField('Benutzername oder E-Mail', [Required()])
|
email = StringField('Benutzername oder E-Mail', [Required()])
|
||||||
password = PasswordField('Passwort', [Required()])
|
password = PasswordField('Passwort', [Required()])
|
||||||
|
@ -58,7 +80,7 @@ security = Security(app, user_datastore, login_form=ExtendedLoginForm)
|
||||||
|
|
||||||
|
|
||||||
def validate_valid_thru_date(form, field):
|
def validate_valid_thru_date(form, field):
|
||||||
if form.limit_validity.data: # only check date format if limited validity of token is set
|
if form.limit_validity.data: # only check date format if limited validity of token is set
|
||||||
try:
|
try:
|
||||||
if not field.data >= date.today():
|
if not field.data >= date.today():
|
||||||
raise ValueError
|
raise ValueError
|
||||||
|
@ -67,6 +89,7 @@ def validate_valid_thru_date(form, field):
|
||||||
raise ValidationError
|
raise ValidationError
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class TokenForm(FlaskForm):
|
class TokenForm(FlaskForm):
|
||||||
name = StringField('Name', validators=[DataRequired()])
|
name = StringField('Name', validators=[DataRequired()])
|
||||||
email = EmailField('E-Mail', validators=[DataRequired()])
|
email = EmailField('E-Mail', validators=[DataRequired()])
|
||||||
|
@ -167,7 +190,7 @@ def edit_token(token):
|
||||||
except Exception:
|
except Exception:
|
||||||
form.valid_thru.data = date.today()
|
form.valid_thru.data = date.today()
|
||||||
|
|
||||||
return render_template('edit.html',token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
else:
|
else:
|
||||||
# flash an error message if the route is accessed with an invalid token
|
# flash an error message if the route is accessed with an invalid token
|
||||||
flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.')
|
flash(f'Ausgewaehlter Token {token} in Tokenfile nicht gefunden.')
|
||||||
|
@ -189,7 +212,6 @@ def edit_token(token):
|
||||||
return render_template('edit.html', token=token, form=form)
|
return render_template('edit.html', token=token, form=form)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/store-token')
|
@app.route('/store-token')
|
||||||
@auth_required()
|
@auth_required()
|
||||||
def store_token():
|
def store_token():
|
||||||
|
@ -221,7 +243,7 @@ def delete_token():
|
||||||
"""
|
"""
|
||||||
token = request.form.get('token')
|
token = request.form.get('token')
|
||||||
tokens = door.get_tokens()
|
tokens = door.get_tokens()
|
||||||
if token in tokens: # check if token exists
|
if token in tokens: # check if token exists
|
||||||
tokens.pop(token)
|
tokens.pop(token)
|
||||||
door.store_tokens(tokens)
|
door.store_tokens(tokens)
|
||||||
return "success"
|
return "success"
|
||||||
|
|
|
@ -1,14 +1,17 @@
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
import regex as re
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
class DoorHandle:
|
class DoorHandle:
|
||||||
def __init__(self, host, port=1883):
|
def __init__(self, host, port=1883, token_file='/etc/door_tokens'):
|
||||||
self.state = None
|
self.state = None
|
||||||
self.encoder_position = None
|
self.encoder_position = None
|
||||||
|
|
||||||
self.token_file = 'door_tokens'
|
if not Path(token_file).exists():
|
||||||
self.nfc_logfile = 'nfc.log'
|
raise FileNotFoundError(f"File with door tokens could not be found at {Path(token_file).absolute()}")
|
||||||
|
|
||||||
|
self.token_file = token_file
|
||||||
|
self.last_invalid = {}
|
||||||
|
|
||||||
self.mqtt_client = mqtt.Client()
|
self.mqtt_client = mqtt.Client()
|
||||||
self.mqtt_client.on_connect = self.on_connect
|
self.mqtt_client.on_connect = self.on_connect
|
||||||
|
@ -33,6 +36,9 @@ class DoorHandle:
|
||||||
self.state = msg.payload.decode()
|
self.state = msg.payload.decode()
|
||||||
elif msg.topic == 'door/position/value':
|
elif msg.topic == 'door/position/value':
|
||||||
self.encoder_position = int(msg.payload)
|
self.encoder_position = int(msg.payload)
|
||||||
|
elif msg.topic == 'door/token/last_invalid':
|
||||||
|
timestamp, token = msg.payload.decode().split(";")
|
||||||
|
self.last_invalid = {'timestamp': timestamp, 'token': token}
|
||||||
|
|
||||||
def get_tokens(self):
|
def get_tokens(self):
|
||||||
tokens = {}
|
tokens = {}
|
||||||
|
@ -67,12 +73,6 @@ class DoorHandle:
|
||||||
with open(self.token_file, 'w') as f:
|
with open(self.token_file, 'w') as f:
|
||||||
f.write(output)
|
f.write(output)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_most_recent_token(self):
|
def get_most_recent_token(self):
|
||||||
# read last invalid token from logfile
|
# read last invalid token from logfile
|
||||||
with open(self.nfc_logfile) as f:
|
return self.last_invalid
|
||||||
nfc_log = f.read()
|
|
||||||
match = re.search(r"(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) Invalid token: (?P<token>[[:xdigit:]]{14})",
|
|
||||||
nfc_log, re.REVERSE)
|
|
||||||
return match
|
|
||||||
|
|
|
@ -1,2 +1,5 @@
|
||||||
flask
|
flask~=1.1.2
|
||||||
Flask-Security-Too
|
Flask-Security-Too~=4.0.0
|
||||||
|
WTForms~=2.3.3
|
||||||
|
paho-mqtt~=1.5.1
|
||||||
|
bleach~=3.3.0
|
|
@ -18,7 +18,7 @@
|
||||||
{% endwith %}
|
{% endwith %}
|
||||||
{% if token is not none %}
|
{% if token is not none %}
|
||||||
Letzter unregistrierter Token: {{ token['token'] }} <br>
|
Letzter unregistrierter Token: {{ token['token'] }} <br>
|
||||||
Gelesen am {{ token['date'] }} um {{ token['time'] }}
|
Gelesen: {{ token['timestamp']}}
|
||||||
|
|
||||||
<div>
|
<div>
|
||||||
<p>
|
<p>
|
||||||
|
|
Loading…
Reference in New Issue
Block a user