from flask import Flask, render_template, request import socket import threading import time import numpy as np from robot import Robot from mpc_controller import MPCController from aruco_estimator import ArucoEstimator from shapely.geometry import LineString width = 600 height = 400 robot_ids = [11, 12, 13, 14] robot_ids_control_history = [11, 12, 13, 14] app = Flask(__name__) def calc_target_pos(px, py): if rc.estimator.all_corners_detected(): # compute column line a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']]) b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']]) c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']]) d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']]) x_frac = (px + 0.5) / width y_frac = (py + 0.5) / height vab = b - a vdc = c - d column_line_top = a + x_frac * vab column_line_bottom = d + x_frac * vdc vad = d - a vbc = c - b row_line_top = a + y_frac * vad row_line_bottom = b + y_frac * vbc column_line = LineString([column_line_top, column_line_bottom]) row_line = LineString([row_line_top, row_line_bottom]) int_pt = column_line.intersection(row_line) point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0]) return point_of_intersection @app.route('/', methods=('GET', 'POST')) def presence(): global robot_ids_control_history if request.method == 'GET': return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0])) elif request.method == 'POST': x = int(float(request.form.get('x'))) y = int(float(request.form.get('y'))) print(x, y) margin = 0.1 x = min(max(width * margin, x), width * (1 - margin)) y = min(max(height * margin, y), height * (1 - margin)) id = int(request.form.get('robot')) robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id))) target_pos = calc_target_pos(x, y) rc.estimator.event_queue.put(('robot', id, target_pos)) return 'OK' class RemoteController: def __init__(self): self.robots = [] #self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')] self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')] self.robot_ids = {} for r in self.robots: self.robot_ids[r.id] = r # socket for movement commands self.comm_socket = socket.socket() self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) for robot in self.robots: robot.connect() self.comm_socket.bind(('', 1337)) self.comm_socket.listen(5) self.t = time.time() # start thread for marker position detection self.estimator = ArucoEstimator(self.robot_ids.keys()) self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False}) self.estimator_thread.start() self.controller = MPCController(self.estimator) def run(self): print("waiting until all markers are detected...") while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()): pass print("starting control") self.controller.interactive_control(robots=self.robots) if __name__ == "__main__": rc = RemoteController() rc_thread = threading.Thread(target=rc.run) rc_thread.start() app.run('0.0.0.0')