forked from Telos4/RoboRally
54 lines
1.4 KiB
Python
54 lines
1.4 KiB
Python
|
import sys
|
||
|
import socket
|
||
|
import threading
|
||
|
import time
|
||
|
|
||
|
from robot import Robot
|
||
|
|
||
|
from mpc_controller import MPCController
|
||
|
|
||
|
import opencv_viewer_example
|
||
|
|
||
|
|
||
|
class RemoteController:
|
||
|
def __init__(self):
|
||
|
self.robots = [Robot(12, '192.168.1.12')]
|
||
|
|
||
|
self.robot_ids = {}
|
||
|
for r in self.robots:
|
||
|
self.robot_ids[r.id] = r
|
||
|
|
||
|
# socket for movement commands
|
||
|
self.comm_socket = socket.socket()
|
||
|
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
|
|
||
|
for robot in self.robots:
|
||
|
robot.connect()
|
||
|
|
||
|
self.comm_socket.bind(('', 1337))
|
||
|
self.comm_socket.listen(5)
|
||
|
|
||
|
self.t = time.time()
|
||
|
|
||
|
# start thread for marker position detection
|
||
|
self.estimator = opencv_viewer_example.ArucoEstimator(self.robot_ids.keys())
|
||
|
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
|
||
|
self.estimator_thread.start()
|
||
|
|
||
|
self.controller = MPCController(self.estimator)
|
||
|
|
||
|
def run(self):
|
||
|
print("waiting until all markers are detected...")
|
||
|
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||
|
pass
|
||
|
print("starting control")
|
||
|
self.controller.interactive_control(robots=self.robots)
|
||
|
|
||
|
def main(args):
|
||
|
rc = RemoteController()
|
||
|
rc.run()
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main(sys.argv)
|