diff --git a/remote_control/measurement_server.py b/remote_control/measurement_server.py index 7c679dc..44f8e7d 100644 --- a/remote_control/measurement_server.py +++ b/remote_control/measurement_server.py @@ -12,30 +12,38 @@ class MeasurementHandler(socketserver.BaseRequestHandler): data = self.request.recv(1024).strip() cur_thread = threading.current_thread() - print(f"current thread {cur_thread}") + print(f"start current thread {cur_thread}") if 'events' in data.decode(): + print(f"{cur_thread} subscribed to events") self.request.sendall('subscribed to events\n'.encode()) # send any events in the event queue to the subscriber - while True: - while not self.server.estimator.event_queue.empty(): - event = self.server.estimator.event_queue.get() - # we distinguish two kinds of events: - if event[0] == 'response_event': - # 1. for 'response_event' events we expect the event subscriber to give a reply which will then - # by passed on to the response queue for transmitting to the original correspondent - message = event[1]['event'] - #print(f"passing command {message} on to subscriber") - self.request.sendall((json.dumps(message) + '\n').encode()) - reply = self.request.recv(1024) - #print(f"putting reply {reply} in response queue") - self.server.response_queue.put(reply) - else: - # 2. for other types of events we don't expect a reply and just pass them on to the subscriber - self.request.sendall((json.dumps(event) + '\n').encode()) - self.server.estimator.last_event = None - time.sleep(1.0 / self.server.max_measurements_per_second) + event_loop_running = True + while event_loop_running: + try: + while not self.server.estimator.event_queue.empty(): + event = self.server.estimator.event_queue.get() + # we distinguish two kinds of events: + if event[0] == 'response_event': + # 1. for 'response_event' events we expect the event subscriber to give a reply which will then + # by passed on to the response queue for transmitting to the original correspondent + message = event[1]['event'] + #print(f"passing command {message} on to subscriber") + self.request.sendall((json.dumps(message) + '\n').encode()) + reply = self.request.recv(1024) + #print(f"putting reply {reply} in response queue") + self.server.response_queue.put(reply) + else: + # 2. for other types of events we don't expect a reply and just pass them on to the subscriber + self.request.sendall((json.dumps(event) + '\n').encode()) + self.server.estimator.last_event = None + time.sleep(1.0 / self.server.max_measurements_per_second) + except Exception as e: + print(f"exception in event loop: {e}") + event_loop_running = False + print("after event loop") elif 'corners' in data.decode(): + print(f"{cur_thread} subscribed to corners") # send positions of the board markers corner_estimates = self.server.estimator.corner_estimates response = {} @@ -43,19 +51,23 @@ class MeasurementHandler(socketserver.BaseRequestHandler): response[corner] = {'x': data['x'], 'y': data['y']} self.request.sendall((json.dumps(response) + '\n').encode()) elif 'move_grid_blocking' in data.decode(): + print(f"{cur_thread} subscribed move_grid_blocking") + # if we receive a move_grid event - # ( e.g. move_grid;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^","require_response":True} ) + # ( e.g. move_grid_blocking;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^"} ) + #move_grid_blocking;{"x": 1, "y": 2, "dimx": 7, "dimy": 4, "orientation": ">"} # we compute the corresponding real-world position the robot should drive to # and then create a new move event which is put in the event queue and will be propagated to the ControlCommander data_decode = data.decode() -# print("data: ", data_decode) - payload = data.decode().split(';')[1] + #print("data: ", data_decode) + payload = data_decode.split(';')[1] + #print("payload: ", payload) grid_pos = json.loads(payload) -# print("grid_pos = ", grid_pos) + #print("grid_pos = ", grid_pos) pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'], grid_pos['dimy'], grid_pos['orientation']) -# print("pos = ", pos) -# print("event requiring response") + #print("pos = ", pos) + #print("event requiring response") # put blocking move command in event queue self.server.estimator.event_queue.put(('response_event', {'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})})) @@ -64,13 +76,15 @@ class MeasurementHandler(socketserver.BaseRequestHandler): # TODO this assumes that we wait only for at most one response at a time # we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for) while self.server.response_queue.empty(): + time.sleep(0.1) pass reply = self.server.response_queue.get() # send back response to the original source - #print(f"sending reply {reply} back to correspondent {self.request}") + print(f"sending reply {reply} back to correspondent {self.request}") self.request.sendall(reply) else: + print(f"{cur_thread} subscribed to robot position") # send robot position try: marker_id = int(data) @@ -79,15 +93,22 @@ class MeasurementHandler(socketserver.BaseRequestHandler): if marker_id is not None: if marker_id in self.server.estimator.robot_marker_estimates: - while True: - self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + marker_loop_running = True + while marker_loop_running: + try: + self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + '\n').encode()) - time.sleep(1.0 / self.server.max_measurements_per_second) + time.sleep(1.0 / self.server.max_measurements_per_second) + except Exception as e: + print(f"exception in marker loop: {e}") + marker_loop_running = False else: self.request.sendall("error: unknown robot marker id\n".encode()) else: self.request.sendall("error: data not understood. " "expected or 'events'\n".encode()) + + print(f"end current thread {cur_thread}") return @@ -105,7 +126,7 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): if __name__ == "__main__": - aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[12, 13]) + aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13]) # first we start thread for the measurement server measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,