RoboRally/remote_control/measurement_server.py

47 lines
1.9 KiB
Python

import socketserver
import threading
import time
from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
data = self.request[0]
socket = self.request[1]
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
try:
marker_id = int(data)
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
self.client_address)
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
socket.sendto("error: unknown robot marker id\n".encode(),
self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True})
estimator_thread.start()
with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter