forked from Telos4/RoboRally
81 lines
2.9 KiB
Python
81 lines
2.9 KiB
Python
import numpy as np
|
|
import time
|
|
|
|
from robot import ControlledRobot
|
|
from pid_controller import PIDController
|
|
from event_listener import EventListener
|
|
|
|
|
|
class CommanderBase:
|
|
def __init__(self):
|
|
self.robots = []
|
|
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
|
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
|
|
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
|
|
|
|
for r in self.robots:
|
|
r.connect()
|
|
r.attach_controller(PIDController())
|
|
|
|
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
|
|
|
|
self.current_robot_index = 0
|
|
self.controlling = False
|
|
self.running = False
|
|
|
|
def run(self):
|
|
unconnected_robots = list(filter(lambda r: not r.connected, self.robots))
|
|
if len(unconnected_robots) > 0:
|
|
print(f"warning: could not connect to the following robots: {unconnected_robots}")
|
|
return
|
|
|
|
all_detected = False
|
|
while not all_detected:
|
|
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots))
|
|
all_detected = len(undetected_robots) == 0
|
|
if not all_detected:
|
|
print(f"warning: no measurements available for the following robots: {undetected_robots}")
|
|
time.sleep(0.5)
|
|
|
|
print("starting control")
|
|
self.running = True
|
|
while self.running:
|
|
while not self.event_listener.event_queue.empty():
|
|
event = self.event_listener.event_queue.get()
|
|
self.handle_event(event)
|
|
|
|
def handle_event(self, event):
|
|
# handle events from opencv window
|
|
print("event: ", event)
|
|
if event[0] == 'click':
|
|
target = event[1]
|
|
target_pos = np.array([target['x'], target['y'], target['angle']])
|
|
self.robots[self.current_robot_index].move_to_pos(target_pos)
|
|
elif event[0] == 'key':
|
|
key = event[1]
|
|
if key == 32: # arrow up
|
|
self.controlling = not self.controlling
|
|
if not self.controlling:
|
|
print("disable control")
|
|
for r in self.robots:
|
|
r.stop_control()
|
|
else:
|
|
print("enable control")
|
|
for r in self.robots:
|
|
r.start_control()
|
|
elif key == 9: # TAB
|
|
# switch controlled robot
|
|
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
|
|
robot = self.robots[self.current_robot_index]
|
|
print(f"controlled robot: {robot.id}")
|
|
elif key == 113 or key == 27: # q or ESCAPE
|
|
print("quit!")
|
|
for r in self.robots:
|
|
r.stop_control()
|
|
self.running = False
|
|
|
|
|
|
if __name__ == '__main__':
|
|
rc = CommanderBase()
|
|
rc.run()
|