import socketserver import threading import time import json from queue import Queue from aruco_estimator import ArucoEstimator class MeasurementHandler(socketserver.BaseRequestHandler): def handle(self) -> None: data = self.request.recv(1024).strip() cur_thread = threading.current_thread() print(f"current thread {cur_thread}") if 'events' in data.decode(): self.request.sendall('subscribed to events\n'.encode()) # send any events in the event queue to the subscriber while True: while not self.server.estimator.event_queue.empty(): event = self.server.estimator.event_queue.get() # we distinguish two kinds of events: if event[0] == 'response_event': # 1. for 'response_event' events we expect the event subscriber to give a reply which will then # by passed on to the response queue for transmitting to the original correspondent message = event[1]['event'] #print(f"passing command {message} on to subscriber") self.request.sendall((json.dumps(message) + '\n').encode()) reply = self.request.recv(1024) #print(f"putting reply {reply} in response queue") self.server.response_queue.put(reply) else: # 2. for other types of events we don't expect a reply and just pass them on to the subscriber self.request.sendall((json.dumps(event) + '\n').encode()) self.server.estimator.last_event = None time.sleep(1.0 / self.server.max_measurements_per_second) elif 'corners' in data.decode(): # send positions of the board markers corner_estimates = self.server.estimator.corner_estimates response = {} for corner, data in corner_estimates.items(): response[corner] = {'x': data['x'], 'y': data['y']} self.request.sendall((json.dumps(response) + '\n').encode()) elif 'move_grid_blocking' in data.decode(): # if we receive a move_grid event # ( e.g. move_grid;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^","require_response":True} ) # we compute the corresponding real-world position the robot should drive to # and then create a new move event which is put in the event queue and will be propagated to the ControlCommander data_decode = data.decode() # print("data: ", data_decode) payload = data.decode().split(';')[1] grid_pos = json.loads(payload) # print("grid_pos = ", grid_pos) pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'], grid_pos['dimy'], grid_pos['orientation']) # print("pos = ", pos) # print("event requiring response") # put blocking move command in event queue self.server.estimator.event_queue.put(('response_event', {'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})})) # wait for response of the move command # TODO this assumes that we wait only for at most one response at a time # we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for) while self.server.response_queue.empty(): pass reply = self.server.response_queue.get() # send back response to the original source #print(f"sending reply {reply} back to correspondent {self.request}") self.request.sendall(reply) else: # send robot position try: marker_id = int(data) except ValueError: marker_id = None if marker_id is not None: if marker_id in self.server.estimator.robot_marker_estimates: while True: self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + '\n').encode()) time.sleep(1.0 / self.server.max_measurements_per_second) else: self.request.sendall("error: unknown robot marker id\n".encode()) else: self.request.sendall("error: data not understood. " "expected or 'events'\n".encode()) return class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30): super().__init__(server_address, RequestHandlerClass) self.estimator = estimator self.max_measurements_per_second = max_measurements_per_second self.response_queue = Queue() def handle_error(self, request, client_address): print("an error occurred -> terminating connection") if __name__ == "__main__": aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[12, 13]) # first we start thread for the measurement server measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30) server_thread = threading.Thread(target=measurement_server.serve_forever) server_thread.start() # now we start the Aruco estimator GUI aruco_estimator.process_frame() import sys from pyqtgraph.Qt import QtCore, QtGui if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'): QtGui.QApplication.instance().exec_() #with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, # max_measurements_per_second=30) as measurement_server: # measurement_server.serve_forever() # receive with: nc 127.0.0.1 42424 -> 12 + Enter