import os, serial, socket, subprocess, select, datetime, sys import argparse parser = argparse.ArgumentParser() parser.add_argument("--serial_port", default="/dev/serial/by-id/usb-Imaginaerraum.de_DoorControl_43363220195053573A002C0-if01") parser.add_argument("--nfc_fifo", default="/tmp/nfc_fifo") parser.add_argument("--control_socket", default="/tmp/nfc.sock") parser.add_argument("--valid_tokens", default="/etc/door_tokens") parser.add_argument("--log_file", default="/tmp/nfc.log") parser.add_argument("--state_timeout", type=float, default=10) parser.add_argument("--repeat_time", type=float, default=5) config = parser.parse_args() # Log data to stdout and the log file def log(*args): data = "%s %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " ".join(i for i in args)) lgf.write(data + "\n") lgf.flush() print(data) # Opens the socket that can control the daemon # Commands are TBD def open_control_socket(): global control_socket global config global comm_channels if os.path.exists(config.control_socket): os.unlink(config.control_socket) control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) control_socket.bind(config.control_socket) control_socket.listen(5) comm_channels = [] # Opens the serial port to talk to the lock actuator # Might return a failure. In that case, it will be tried again later def open_serial_port(): global serial_port global config try: serial_port = serial.Serial(config.serial_port, timeout=2) except: serial_port = None pass # Detected tokens are passed in through a FIFO def open_nfc_fifo(): global nfc_fifo global config nfc_fifo = open(config.nfc_fifo, "r") # Refresh valid tokens from the configured file def read_valid_tokens(): global valid global config try: valid = [ s.strip() for s in open(config.valid_tokens, "r").readlines() ] except: valid = [] log("No valid tokens") # Opens the log file for writing def open_logfile(): global lgf global config lgf = open(config.log_file, "a+") # Actions IDLE, CLOSE, OPEN_THEN_CLOSE, OPEN, CLOSE_THEN_OPEN = range(5) # Current door state state = None # Current target action action = CLOSE # Start time of the current action start_time = datetime.datetime.now() # How often the action was repeated repeats = 0 # Read the state of the door from the serial port def poll_door_state(): global state global serial_port serial_port.reset_input_buffer() serial_port.write(b"R") data = serial_port.readline().strip().split() if data[0] == b'pos:': data = int(data[1]) changed = False if data < 80 and door_state != OPEN: door_state = OPEN changed = True elif data > 100 and door_state != CLOSE: door_state = CLOSE changed = True else: print(data) return door_state # Check the door state and send commands through a state machine def handle_door_state(): global action, target, serial_port, start_time, repeats state_names = { OPEN: "open", CLOSE: "close" } # Commands associated with each target state target_state_cmd = { OPEN: b'O', CLOSE: b'C' } old_state = state # If no serial port, try to open it or return if not serial_port: try: open_serial_port() except: return # Get new state poll_door_state() # Idle + change = key? if target == IDLE: if state != old_state: log("Door changed unexpectedly:", state_names[state]) return # Target state, next action, timeout action actions = { OPEN: (OPEN, IDLE, CLOSE_THEN_OPEN ), CLOSE: (CLOSE, IDLE, OPEN_THEN_CLOSE ), OPEN_THEN_CLOSE: (OPEN, CLOSE, CLOSE ), CLOSE_THEN_OPEN: (CLOSE, OPEN, OPEN ) } # Target state reached if state == actions[action][0]: # Select next action, reset start time and repetitions action = actions[action][1] start_time = datetime.datetime.now() repeats = 0 # On idle, we're done if action == IDLE: log("Reached target position:", state_names[state]) return # Execution time t = (datetime.datetime.now() - start_time) if t >= config.state_timeout: # Timeout -> switch to timeout action action = actions[action][2] start_time = datetime.datetime.now() repeats = 0 log("Timeout. Switching to", action) elif t >= (1 + repeats) * config.repeat_time: # Repeat every couple of seconds repeats += 1 serial_port.write(target_state_cmd[actions[action][0]]) log("Repeating command:", target_state_cmd[actions[action][0]]) def open_door(): global action log("Opening the door") action = OPEN def close_door(): global action log("Closing the door") action = CLOSE def toggle_door_state(): global state, action if state == CLOSE: open_door() else: close_door() def handle_nfc_token(): global valid token = nfc_fifo.readline() if token == "": open_nfc_fifo() token = token.strip() if token in valid: log("Valid token:", token) toggle_door_state() else: log("Invalid token:", token) open_logfile() read_valid_tokens() open_nfc_fifo() open_serial_port() open_control_socket() while True: readable = select.select([ nfc_fifo, control_socket ] + comm_channels, [], [], 0.5)[0] for c in readable: if c == nfc_fifo: handle_nfc_token() elif c == control_socket: comm_channels.append(control_socket.accept()[0]) else: data = c.recv(1024) if len(data) == 0: comm_channels.remove(c) else: print("Got comms data: ", str(c.recv(1024))) c.sendall(b':)\n')