From 3082eebc8dd0ee00bc2a25596ce1feedf66ad791 Mon Sep 17 00:00:00 2001 From: Simon Pirkelmann Date: Tue, 7 Sep 2021 22:31:28 +0200 Subject: [PATCH] implemented blocking events to make it possible to wait for the robot to reach the desired target position --- remote_control/control_commander.py | 28 ++++++++++----- remote_control/event_listener.py | 3 ++ remote_control/measurement_server.py | 53 +++++++++++++++++++++++----- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/remote_control/control_commander.py b/remote_control/control_commander.py index b3d4e15..c0eda01 100644 --- a/remote_control/control_commander.py +++ b/remote_control/control_commander.py @@ -7,7 +7,7 @@ from mpc_controller import MPCController from event_listener import EventListener -class CommanderBase: +class ControlCommander: valid_controller_types = {'pid': PIDController, 'mpc': MPCController} @@ -26,17 +26,17 @@ class CommanderBase: if type(controller_type) == dict: for id, ctype in controller_type.items(): - if ctype in CommanderBase.valid_controller_types: - self.robots[id].attach_controller(CommanderBase.valid_controller_types[ctype]()) + if ctype in ControlCommander.valid_controller_types: + self.robots[id].attach_controller(ControlCommander.valid_controller_types[ctype]()) else: raise Exception(f"invalid controller type {ctype} specified for robot {id}. " - f"valid controller types are {list(CommanderBase.valid_controller_types.keys())}") - elif controller_type in CommanderBase.valid_controller_types: + f"valid controller types are {list(ControlCommander.valid_controller_types.keys())}") + elif controller_type in ControlCommander.valid_controller_types: for id, r in self.robots.items(): - r.attach_controller(CommanderBase.valid_controller_types[controller_type]()) + r.attach_controller(ControlCommander.valid_controller_types[controller_type]()) else: raise Exception(f"invalid controller type {controller_type} specified. valid controller types are " - f"{list(CommanderBase.valid_controller_types.keys())}") + f"{list(ControlCommander.valid_controller_types.keys())}") self.event_listener = EventListener(event_server=('127.0.0.1', 42424)) @@ -68,11 +68,21 @@ class CommanderBase: def handle_event(self, event): # handle events from opencv window print("event: ", event) - if event[0] == 'click': + if event[0] == 'click': # non-blocking move to target pos target = event[1] target_pos = np.array([target['x'], target['y'], target['angle']]) controlled_robot_id = list(self.robots.keys())[self.current_robot_index] self.robots[controlled_robot_id].move_to_pos(target_pos) + elif event[0] == 'move_blocking': # blocking move to specified target position + target = event[1] + target_pos = np.array([target['x'], target['y'], target['angle']]) + controlled_robot_id = list(self.robots.keys())[self.current_robot_index] + + # initialize move and wait for the robot to reach the target position + self.robots[controlled_robot_id].move_to_pos(target_pos, blocking=True) + #time.sleep(0.5) + # send confirmation to the server which initiated the command + self.event_listener.send_reply("ack\n".encode()) elif event[0] == 'key': key = event[1] if key == 16777235: # arrow up @@ -109,5 +119,5 @@ if __name__ == '__main__': # controller_type = {12: 'mpc', 13: 'pid'} controller_type = 'pid' - rc = CommanderBase(id_ip_dict, controller_type=controller_type) + rc = ControlCommander(id_ip_dict, controller_type=controller_type) rc.run() diff --git a/remote_control/event_listener.py b/remote_control/event_listener.py index d423fc4..31481e9 100644 --- a/remote_control/event_listener.py +++ b/remote_control/event_listener.py @@ -34,6 +34,9 @@ class EventListener: except ConnectionRefusedError: print(f"error: could not connect to event server at {event_server}.") + def send_reply(self, data): + self.event_socket.sendall(data) + def receive_events(self): self.receiving = True while self.receiving: diff --git a/remote_control/measurement_server.py b/remote_control/measurement_server.py index da3c462..7c679dc 100644 --- a/remote_control/measurement_server.py +++ b/remote_control/measurement_server.py @@ -2,6 +2,7 @@ import socketserver import threading import time import json +from queue import Queue from aruco_estimator import ArucoEstimator @@ -15,11 +16,23 @@ class MeasurementHandler(socketserver.BaseRequestHandler): if 'events' in data.decode(): self.request.sendall('subscribed to events\n'.encode()) - # send input events + # send any events in the event queue to the subscriber while True: while not self.server.estimator.event_queue.empty(): event = self.server.estimator.event_queue.get() - self.request.sendall((json.dumps(event) + '\n').encode()) + # we distinguish two kinds of events: + if event[0] == 'response_event': + # 1. for 'response_event' events we expect the event subscriber to give a reply which will then + # by passed on to the response queue for transmitting to the original correspondent + message = event[1]['event'] + #print(f"passing command {message} on to subscriber") + self.request.sendall((json.dumps(message) + '\n').encode()) + reply = self.request.recv(1024) + #print(f"putting reply {reply} in response queue") + self.server.response_queue.put(reply) + else: + # 2. for other types of events we don't expect a reply and just pass them on to the subscriber + self.request.sendall((json.dumps(event) + '\n').encode()) self.server.estimator.last_event = None time.sleep(1.0 / self.server.max_measurements_per_second) elif 'corners' in data.decode(): @@ -29,14 +42,34 @@ class MeasurementHandler(socketserver.BaseRequestHandler): for corner, data in corner_estimates.items(): response[corner] = {'x': data['x'], 'y': data['y']} self.request.sendall((json.dumps(response) + '\n').encode()) - elif 'move_grid' in data.decode(): + elif 'move_grid_blocking' in data.decode(): + # if we receive a move_grid event + # ( e.g. move_grid;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^","require_response":True} ) + # we compute the corresponding real-world position the robot should drive to + # and then create a new move event which is put in the event queue and will be propagated to the ControlCommander data_decode = data.decode() - print("data: ", data_decode) +# print("data: ", data_decode) payload = data.decode().split(';')[1] grid_pos = json.loads(payload) - print("grid_pos = ", grid_pos) - pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['orientation']) - self.server.estimator.event_queue.put(('click', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})) +# print("grid_pos = ", grid_pos) + pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'], + grid_pos['dimy'], grid_pos['orientation']) +# print("pos = ", pos) +# print("event requiring response") + # put blocking move command in event queue + self.server.estimator.event_queue.put(('response_event', + {'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})})) + + # wait for response of the move command + # TODO this assumes that we wait only for at most one response at a time + # we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for) + while self.server.response_queue.empty(): + pass + reply = self.server.response_queue.get() + + # send back response to the original source + #print(f"sending reply {reply} back to correspondent {self.request}") + self.request.sendall(reply) else: # send robot position try: @@ -65,16 +98,18 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): super().__init__(server_address, RequestHandlerClass) self.estimator = estimator self.max_measurements_per_second = max_measurements_per_second + self.response_queue = Queue() def handle_error(self, request, client_address): print("an error occurred -> terminating connection") if __name__ == "__main__": - aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13]) + aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[12, 13]) # first we start thread for the measurement server - measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30) + measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, + max_measurements_per_second=30) server_thread = threading.Thread(target=measurement_server.serve_forever) server_thread.start()