import numpy as np import time from robot import ControlledRobot from pid_controller import PIDController from mpc_controller import MPCController from event_listener import EventListener class CommanderBase: def __init__(self): self.robots = [] self.robots = [ControlledRobot(12, '10.10.11.91'), ControlledRobot(13, '10.10.11.90')] #, Robot(14, '192.168.1.14')] # self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'), # ControlledRobot(marker_id=14, ip='10.10.11.89')] for r in self.robots: r.connect() self.robots[0].attach_controller(PIDController()) self.robots[1].attach_controller(MPCController()) self.event_listener = EventListener(event_server=('127.0.0.1', 42424)) self.current_robot_index = 0 self.controlling = False self.running = False def run(self): unconnected_robots = list(filter(lambda r: not r.connected, self.robots)) if len(unconnected_robots) > 0: print(f"warning: could not connect to the following robots: {unconnected_robots}") return all_detected = False while not all_detected: undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots)) all_detected = len(undetected_robots) == 0 if not all_detected: print(f"warning: no measurements available for the following robots: {undetected_robots}") time.sleep(0.5) print("starting control") self.running = True while self.running: while not self.event_listener.event_queue.empty(): event = self.event_listener.event_queue.get() self.handle_event(event) def handle_event(self, event): # handle events from opencv window print("event: ", event) if event[0] == 'click': target = event[1] target_pos = np.array([target['x'], target['y'], target['angle']]) self.robots[self.current_robot_index].move_to_pos(target_pos) elif event[0] == 'key': key = event[1] if key == 32: # arrow up self.controlling = not self.controlling if not self.controlling: print("disable control") for r in self.robots: r.stop_control() else: print("enable control") for r in self.robots: r.start_control() elif key == 9: # TAB # switch controlled robot self.current_robot_index = (self.current_robot_index + 1) % len(self.robots) robot = self.robots[self.current_robot_index] print(f"controlled robot: {robot.id}") elif key == 113 or key == 27: # q or ESCAPE print("quit!") for r in self.robots: r.stop_control() self.running = False if __name__ == '__main__': rc = CommanderBase() rc.run()