import numpy as np import time from robot import ControlledRobot from pid_controller import PIDController from mpc_controller import MPCController from event_listener import EventListener class ControlCommander: valid_controller_types = {'pid': PIDController, 'mpc': MPCController} def __init__(self, id_ip_dict, controller_type='pid'): """ :param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' } :param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'} """ self.robots = {} for id, ip in id_ip_dict.items(): self.robots[id] = ControlledRobot(id, ip) for id, r in self.robots.items(): r.connect() if type(controller_type) == dict: for id, ctype in controller_type.items(): if ctype in ControlCommander.valid_controller_types: self.robots[id].attach_controller(ControlCommander.valid_controller_types[ctype]()) else: raise Exception(f"invalid controller type {ctype} specified for robot {id}. " f"valid controller types are {list(ControlCommander.valid_controller_types.keys())}") elif controller_type in ControlCommander.valid_controller_types: for id, r in self.robots.items(): r.attach_controller(ControlCommander.valid_controller_types[controller_type]()) else: raise Exception(f"invalid controller type {controller_type} specified. valid controller types are " f"{list(ControlCommander.valid_controller_types.keys())}") self.event_listener = EventListener(event_server=('127.0.0.1', 42424)) self.current_robot_index = 0 self.controlling = False self.running = False def run(self): unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values())) if len(unconnected_robots) > 0: print(f"warning: could not connect to the following robots: {unconnected_robots}") return all_detected = False while not all_detected: undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values())) all_detected = len(undetected_robots) == 0 if not all_detected: print(f"warning: no measurements available for the following robots: {undetected_robots}") time.sleep(0.5) print("starting control") self.running = True while self.running: while not self.event_listener.event_queue.empty(): event = self.event_listener.event_queue.get() self.handle_event(event) def handle_event(self, event): # handle events from opencv window print("event: ", event) if event[0] == 'click': # non-blocking move to target pos target = event[1] target_pos = np.array([target['x'], target['y'], target['angle']]) controlled_robot_id = list(self.robots.keys())[self.current_robot_index] self.robots[controlled_robot_id].move_to_pos(target_pos) elif event[0] == 'move_blocking': # blocking move to specified target position target = event[1] target_pos = np.array([target['x'], target['y'], target['angle']]) controlled_robot_id = list(self.robots.keys())[self.current_robot_index] # initialize move and wait for the robot to reach the target position self.robots[controlled_robot_id].move_to_pos(target_pos, blocking=True) #time.sleep(0.5) # send confirmation to the server which initiated the command self.event_listener.send_reply("ack\n".encode()) elif event[0] == 'key': key = event[1] if key == 16777235: # arrow up self.controlling = not self.controlling if not self.controlling: print("disable control") for _, r in self.robots.items(): r.stop_control() else: print("enable control") for _, r in self.robots.items(): r.start_control() elif key == 16777236: # left # switch controlled robot self.current_robot_index = (self.current_robot_index + 1) % len(self.robots) controlled_robot_id = list(self.robots.keys())[self.current_robot_index] robot = self.robots[controlled_robot_id] print(f"controlled robot: {robot.id}") elif key == 113 or key == 27: # q or ESCAPE print("quit!") for r in self.robots: r.stop_control() self.running = False if __name__ == '__main__': id_ip_dict = { #11: '10.10.11.88', 12: '192.168.1.12', 13: '192.168.1.13', #14: '10.10.11.89', } # controller_type = {12: 'mpc', 13: 'pid'} controller_type = 'pid' rc = ControlCommander(id_ip_dict, controller_type=controller_type) rc.run()