From eff5474ac29330df0263e7e4eaf42c4e6af0f284 Mon Sep 17 00:00:00 2001 From: Simon Pirkelmann Date: Wed, 11 Nov 2020 21:33:48 +0100 Subject: [PATCH] switched to using TCP sockets for measurement and event communication --- remote_control/event_listener.py | 47 ++++++++++++++++++++++ remote_control/measurement_client.py | 14 ++++--- remote_control/measurement_server.py | 49 +++++++++++++++-------- remote_control/robot.py | 59 ++++++++++++++++++++++++++-- 4 files changed, 142 insertions(+), 27 deletions(-) create mode 100644 remote_control/event_listener.py diff --git a/remote_control/event_listener.py b/remote_control/event_listener.py new file mode 100644 index 0000000..8b2e92f --- /dev/null +++ b/remote_control/event_listener.py @@ -0,0 +1,47 @@ +import socket +import threading +import queue +import json + + +class EventListener: + def __init__(self, event_server): + self.event_thread = threading.Thread(target=self.receive_events) + self.event_queue = queue.Queue() + + # connect to event server + print(f"connecting to event server on {event_server} ...") + self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket + + try: + self.event_socket.connect(event_server) + self.event_socket.sendall(f"events\n".encode()) + + self.event_socket.settimeout(0.1) + # check if we receive data from the event server + response = self.event_socket.recv(1024) + if not 'error' in str(response): + print("... connected! -> start listening for events") + self.event_socket.settimeout(None) + # if so we start the event thread + self.event_thread.start() + else: + print(f"error: cannot communicate with the event server.\n The response was: {response}") + except socket.timeout: + print(f"error: the event server did not respond.") + except ConnectionRefusedError: + print(f"error: could not connect to event server at {event_server}.") + + def receive_events(self): + receiving = True + while receiving: + received = str(self.event_socket.recv(1024), "utf-8") + if len(received) > 0: + events = received.split('\n') + for event_json in events: + if len(event_json) > 0: + event = json.loads(event_json) + self.event_queue.put(event) + else: + receiving = False + print("event server seems to have shut down -> stop listening") \ No newline at end of file diff --git a/remote_control/measurement_client.py b/remote_control/measurement_client.py index d3c136d..4b466c0 100644 --- a/remote_control/measurement_client.py +++ b/remote_control/measurement_client.py @@ -2,12 +2,14 @@ import socket HOST, PORT = "localhost", 42424 -robot_id = 11 +robot_id = 12 -# SOCK_DGRAM is the socket type to use for UDP sockets -sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - -sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT)) -while True: +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect((HOST, PORT)) +sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id +#sock.sendall(f"events\n".encode()) # request events +receiving = True +while receiving: received = str(sock.recv(1024), "utf-8") print("Received: {}".format(received)) + receiving = len(received) > 0 diff --git a/remote_control/measurement_server.py b/remote_control/measurement_server.py index 20bc168..25e6f16 100644 --- a/remote_control/measurement_server.py +++ b/remote_control/measurement_server.py @@ -1,41 +1,56 @@ import socketserver import threading import time +import json from aruco_estimator import ArucoEstimator class MeasurementHandler(socketserver.BaseRequestHandler): def handle(self) -> None: - data = self.request[0] - socket = self.request[1] + self.data = self.request.recv(1024).strip() + cur_thread = threading.current_thread() print(f"current thread {cur_thread}") - try: - marker_id = int(data) - if marker_id in self.server.estimator.robot_marker_estimates: - while True: - socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(), - self.client_address) - time.sleep(1.0 / self.server.max_measurements_per_second) - else: - socket.sendto("error: unknown robot marker id\n".encode(), - self.client_address) - except ValueError: - socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address) + if 'events' in self.data.decode(): + self.request.sendall('subscribed to events\n'.encode()) + # send input events + while True: + while not self.server.estimator.event_queue.empty(): + event = self.server.estimator.event_queue.get() + self.request.sendall((json.dumps(event) + '\n').encode()) + else: + # send robot position + try: + marker_id = int(self.data) + except ValueError: + marker_id = None + + if marker_id is not None: + if marker_id in self.server.estimator.robot_marker_estimates: + while True: + self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + '\n').encode()) + time.sleep(1.0 / self.server.max_measurements_per_second) + else: + self.request.sendall("error: unknown robot marker id\n".encode()) + else: + self.request.sendall("error: data not understood. expected or 'events'\n".encode()) return -class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer): +class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30): super().__init__(server_address, RequestHandlerClass) self.estimator = estimator self.max_measurements_per_second = max_measurements_per_second + def handle_error(self, request, client_address): + print("an error occurred -> terminating connection") if __name__ == "__main__": - aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) + aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[11, 12, 13, 14]) estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True}) estimator_thread.start() @@ -43,4 +58,4 @@ if __name__ == "__main__": max_measurements_per_second=30) as measurement_server: measurement_server.serve_forever() - # receive with: nc 127.0.0.1 42424 -u -> 15 + Enter + # receive with: nc 127.0.0.1 42424 -> 12 + Enter diff --git a/remote_control/robot.py b/remote_control/robot.py index 7461202..24d4e5d 100644 --- a/remote_control/robot.py +++ b/remote_control/robot.py @@ -1,10 +1,16 @@ import socket +import threading +import json + class Robot: - def __init__(self, marker_id, ip): + def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)): self.id = marker_id - self.pos = None - self.euler = None + + self.t_last_measurement = None + self.x = None + self.y = None + self.angle = None self.ip = ip self.socket = socket.socket() @@ -15,6 +21,11 @@ class Robot: self.connected = False + self.measurement_server = measurement_server + self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket + self.measurement_thread = threading.Thread(target=self.receive_measurements) + + def connect(self): # connect to robot try: @@ -23,7 +34,30 @@ class Robot: print("connected!") self.connected = True except socket.error: - print("could not connect to robot {} with ip {}".format(self.id, self.ip)) + print("error: could not connect to robot {} with ip {}".format(self.id, self.ip)) + + # connect to measurement server + print(f"connecting to measurement server on {self.measurement_server} ...") + + try: + self.measurement_socket.connect(self.measurement_server) + self.measurement_socket.sendall(f"{self.id}\n".encode()) + + self.measurement_socket.settimeout(0.1) + # check if we receive data from the measurement server + response = self.measurement_socket.recv(1024) + if not 'error' in str(response): + print("... connected! -> start listening for events") + self.measurement_socket.settimeout(None) + # if so we start the measurement thread + self.measurement_thread.start() + else: + print(f"error: cannot communicate with the measurement server.\n The response was: {response}") + except socket.timeout: + print(f"error: the measurement server did not respond with data.") + except ConnectionRefusedError: + print(f"error: could not connect to measurement server at {self.measurement_server}.") + def send_cmd(self, u1=0.0, u2=0.0): if self.socket: @@ -35,3 +69,20 @@ class Robot: except ConnectionResetError: print(f"error: connection to robot {self.id} with ip {self.ip} lost") pass + + def receive_measurements(self): + receiving = True + while receiving: + received = str(self.measurement_socket.recv(1024), "utf-8") + if len(received) > 0: + measurement = json.loads(received) + self.t_last_measurement = measurement['t'] + self.x = measurement['x'] + self.y = measurement['y'] + self.angle = measurement['angle'] + else: + receiving = False + print(f"measurement server stopped sending data for robot {self.id}") + + def get_measurement(self): + return (self.t_last_measurement, self.x, self.y, self.angle)