forked from Telos4/RoboRally
initial commit for control commander
This commit is contained in:
parent
edc3e8d290
commit
17637f427b
80
remote_control/control_commander.py
Normal file
80
remote_control/control_commander.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
import numpy as np
|
||||
import time
|
||||
|
||||
from robot import ControlledRobot
|
||||
from pid_controller import PIDController
|
||||
from event_listener import EventListener
|
||||
|
||||
|
||||
class CommanderBase:
|
||||
def __init__(self):
|
||||
self.robots = []
|
||||
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
|
||||
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
|
||||
|
||||
for r in self.robots:
|
||||
r.connect()
|
||||
r.attach_controller(PIDController())
|
||||
|
||||
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
|
||||
|
||||
self.current_robot_index = 0
|
||||
self.controlling = False
|
||||
self.running = False
|
||||
|
||||
def run(self):
|
||||
unconnected_robots = list(filter(lambda r: not r.connected, self.robots))
|
||||
if len(unconnected_robots) > 0:
|
||||
print(f"warning: could not connect to the following robots: {unconnected_robots}")
|
||||
return
|
||||
|
||||
all_detected = False
|
||||
while not all_detected:
|
||||
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots))
|
||||
all_detected = len(undetected_robots) == 0
|
||||
if not all_detected:
|
||||
print(f"warning: no measurements available for the following robots: {undetected_robots}")
|
||||
time.sleep(0.5)
|
||||
|
||||
print("starting control")
|
||||
self.running = True
|
||||
while self.running:
|
||||
while not self.event_listener.event_queue.empty():
|
||||
event = self.event_listener.event_queue.get()
|
||||
self.handle_event(event)
|
||||
|
||||
def handle_event(self, event):
|
||||
# handle events from opencv window
|
||||
print("event: ", event)
|
||||
if event[0] == 'click':
|
||||
target = event[1]
|
||||
target_pos = np.array([target['x'], target['y'], target['angle']])
|
||||
self.robots[self.current_robot_index].move_to_pos(target_pos)
|
||||
elif event[0] == 'key':
|
||||
key = event[1]
|
||||
if key == 32: # arrow up
|
||||
self.controlling = not self.controlling
|
||||
if not self.controlling:
|
||||
print("disable control")
|
||||
for r in self.robots:
|
||||
r.stop_control()
|
||||
else:
|
||||
print("enable control")
|
||||
for r in self.robots:
|
||||
r.start_control()
|
||||
elif key == 9: # TAB
|
||||
# switch controlled robot
|
||||
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
|
||||
robot = self.robots[self.current_robot_index]
|
||||
print(f"controlled robot: {robot.id}")
|
||||
elif key == 113 or key == 27: # q or ESCAPE
|
||||
print("quit!")
|
||||
for r in self.robots:
|
||||
r.stop_control()
|
||||
self.running = False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
rc = CommanderBase()
|
||||
rc.run()
|
Loading…
Reference in New Issue
Block a user