import sys import socket import threading import time from robot import Robot, ControlledRobot from mpc_controller import MPCController # HOST, PORT = "localhost", 42424 # # robot_id = 12 # # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # sock.connect((HOST, PORT)) # sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id # #sock.sendall(f"events\n".encode()) # request events # receiving = True # while receiving: # received = str(sock.recv(1024), "utf-8") # print("Received: {}".format(received)) # receiving = len(received) > 0 class RemoteController: def __init__(self): self.robots = [] #self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')] #self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')] #self.robots = [Robot(14, '10.10.11.89')] self.robots = [ControlledRobot(13, '192.168.1.13')] self.robot_ids = {} for r in self.robots: self.robot_ids[r.id] = r # socket for movement commands self.comm_socket = socket.socket() self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) for robot in self.robots: robot.connect() self.comm_socket.bind(('', 1337)) self.comm_socket.listen(5) self.t = time.time() # start thread for marker position detection self.controller = MPCController() self.robots[0].attach_controller(self.controller) def run(self): #print("waiting until all markers are detected...") #while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()): # pass print("starting control") #self.controller.interactive_control(robots=self.robots) self.controller.start() pass def main(args): rc = RemoteController() rc.run() if __name__ == '__main__': main(sys.argv) # to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams # marker positions and GUI events via socket # next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start() # you can now set the target position for the controlled robot via move_to_pos(), e.g. # self.robots[0].move_to_pos([0.0,0,0], True) # TODO # figure out how to get clicked positions from measurement server and drive robot there # also document and improve program structure