import socketserver import threading import time import json from aruco_estimator import ArucoEstimator class MeasurementHandler(socketserver.BaseRequestHandler): def handle(self) -> None: self.data = self.request.recv(1024).strip() cur_thread = threading.current_thread() print(f"current thread {cur_thread}") if 'events' in self.data.decode(): self.request.sendall('subscribed to events\n'.encode()) # send input events while True: while not self.server.estimator.event_queue.empty(): event = self.server.estimator.event_queue.get() self.request.sendall((json.dumps(event) + '\n').encode()) else: # send robot position try: marker_id = int(self.data) except ValueError: marker_id = None if marker_id is not None: if marker_id in self.server.estimator.robot_marker_estimates: while True: self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + '\n').encode()) time.sleep(1.0 / self.server.max_measurements_per_second) else: self.request.sendall("error: unknown robot marker id\n".encode()) else: self.request.sendall("error: data not understood. expected or 'events'\n".encode()) return class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30): super().__init__(server_address, RequestHandlerClass) self.estimator = estimator self.max_measurements_per_second = max_measurements_per_second def handle_error(self, request, client_address): print("an error occurred -> terminating connection") if __name__ == "__main__": aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True}) estimator_thread.start() with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30) as measurement_server: measurement_server.serve_forever() # receive with: nc 127.0.0.1 42424 -> 12 + Enter