# startup: # roscore -> start ros # rosparam set cv_camera/device_id 0 -> set appropriate camera device # rosrun cv_camera cv_camera_node -> start the camera # roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection # python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane import sys import rospy import pygame import numpy as np import socket import scipy.integrate import copy import threading from copy import deepcopy import matplotlib.pyplot as plt import matplotlib.animation as anim import matplotlib.patches as patch from shapely.geometry import Polygon import time from casadi_opt import OpenLoopSolver #from marker_pos_angle.msg import id_pos_angle from collections import OrderedDict from argparse import ArgumentParser import opencv_viewer_example MSGLEN = 64 def myreceive(sock): chunks = [] bytes_recd = 0 while bytes_recd < MSGLEN: chunk = sock.recv(1) if chunk == b'': raise RuntimeError("socket connection broken") chunks.append(chunk) bytes_recd = bytes_recd + len(chunk) if chunks[-1] == b'\n': break return b''.join(chunks) class Robot: # dictionary mapping the current orientation and a turn command to the resulting orientation resulting_orientation = { '^': {'turn left': '<', 'turn right': '>', 'turn around': 'v'}, '>': {'turn left': '^', 'turn right': 'v', 'turn around': '<'}, 'v': {'turn left': '>', 'turn right': '<', 'turn around': '^'}, '<': {'turn left': 'v', 'turn right': '^', 'turn around': '>'}, } # dictionary mapping an orientation to its opposite opposites = {'^': 'v', '>': '<', 'v': '^', '<': '>'} def __init__(self, id, ip, x=0, y=0, orientation='>'): self.x = x self.y = y self.orientation = orientation self.id = id self.pos = None self.euler = None self.ip = ip self.socket = socket.socket() # variables for measurements self.tms_0 = None self.xm_0 = None self.tms = None self.xms = None # currently active control self.u1 = 0.0 self.u2 = 0.0 def get_neighbor_coordinates(self, direction): # get the coordinates of the neighboring tile in the given direction if direction == '^': return (self.x, self.y - 1) elif direction == '>': return (self.x + 1, self.y) elif direction == 'v': return (self.x, self.y + 1) elif direction == '<': return (self.x - 1, self.y) else: print("error: unknown direction") sys.exit(1) def move(self, type): if type == 'forward': target_tile = self.get_neighbor_coordinates(self.orientation) self.x = target_tile[0] self.y = target_tile[1] elif type == 'backward': opposite_orientation = Robot.opposites[self.orientation] target_tile = self.get_neighbor_coordinates(opposite_orientation) self.x = target_tile[0] self.y = target_tile[1] elif 'turn' in type: self.orientation = Robot.resulting_orientation[self.orientation][type] else: print("error: invalid move") sys.exit(1) def connect(self): # connect to robot try: print("connecting to robot {} with ip {} ...".format(self.id, self.ip)) #self.socket.connect((self.ip, 1234)) # connect to robot print("connected!") except socket.error: print("could not connect to robot {} with ip {}".format(self.id, self.ip)) sys.exit(1) def __str__(self): return self.__repr__() def __repr__(self): return f"x: {self.x}, y: {self.y}, orienation: {self.orientation}" def f_ode(t, x, u): # dynamical model of the two-wheeled robot # TODO: find exact values for these parameters r = 0.03 R = 0.05 d = 0.02 theta = x[2] omega_r = u[0] omega_l = u[1] dx = np.zeros(3) dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \ + (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \ + (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l dx[2] = -r/(2.0*R) * (omega_r - omega_l) return dx class RemoteController: def __init__(self, id, ip): # self.robots = #[Robot(11, '192.168.1.11', (6, -3, np.pi)), Robot(12, '192.168.1.12', (6, -3, np.pi)), # Robot(13, '192.168.1.13', (6, -3, np.pi)), Robot(14, '192.168.1.14', (6, -2, np.pi))] #self.robots = [Robot(13, '192.168.1.13', (6, -3, np.pi))] #self.robots = [] #self.robots = [Robot(11, '192.168.1.11', (6,-3,0)), Robot(14, '192.168.1.14', (6,3,0))] #self.robots = [Robot(11, '192.168.1.11'), Robot(14, '192.168.1.14')] self.robots = [Robot(12, '192.168.1.12')] self.robot_ids = {} for r in self.robots: self.robot_ids[r.id] = r self.valid_cmds = ['forward', 'backward', 'turn left', 'turn right', 'turn around', 'get position', 'set position'] # socket for movement commands self.comm_socket = socket.socket() self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) for robot in self.robots: robot.connect() self.comm_socket.bind(('', 1337)) self.comm_socket.listen(5) self.t = time.time() # variables for simulated state self.x0 = None self.ts = np.array([]) self.xs = [] # variable for mpc open loop self.ol_x = None self.ol_y = None # ROS subscriber for detected markers self.estimator = opencv_viewer_example.ArucoEstimator(self.robot_ids.keys()) self.estimator_thread = threading.Thread(target=self.estimator.run_tracking) self.estimator_thread.start() # pid parameters self.controlling = False self.mstep = 2 self.ols = OpenLoopSolver(N=20, T=1.0) self.ols.setup() self.dt = self.ols.T / self.ols.N self.target = (0.0, 0.0, 0.0) # integrator self.r = scipy.integrate.ode(f_ode) self.omega_max = 5.0 self.control_scaling = 0.2 #self.omega_max = 13.32 def controller(self): print("waiting until all markers are detected...") while not self.estimator.all_corners_detected(): pass print("starting control") running = True while running: (clientsocket, address) = self.comm_socket.accept() clientsocket.settimeout(None) connected = True while connected: try: data = myreceive(clientsocket) print("data received: ", data) inputs = data.split(b',') cmd = inputs[0] cmd = cmd.strip().decode() if len(inputs) > 1: if cmd in self.valid_cmds: try: robot_id = int(inputs[1]) except ValueError: print("could not read robot id!") clientsocket.sendall(b'Could not read robot id!\n') if robot_id in self.robot_ids: if cmd == b'get position': clientsocket.sendall(bytes('{}\n'.format(self.robot_ids[robot_id].grid_pos))) elif cmd == b'set position': try: pos_data = ",".join(inputs[2:]) new_grid_pos = tuple(map(lambda x: int(x[1]) if x[0] < 2 else float(x[1]), enumerate(pos_data.strip().strip('()').split(',')))) self.robot_ids[robot_id].grid_pos = new_grid_pos clientsocket.sendall(b'OK\n') except IndexError as e: print("could not set grid position!") clientsocket.sendall(bytes( 'could not set grid position! (invalid format)\n'.format( self.robot_ids[robot_id].grid_pos))) except ValueError as e: print("could not set grid position!") clientsocket.sendall(bytes('could not set grid position! (invalid format)\n'.format(self.robot_ids[robot_id].grid_pos))) else: self.mpc_control(robot_id, cmd) clientsocket.sendall(b'OK\n') else: print("invalid robot id!") clientsocket.sendall(b'Invalid robot id!\n') else: clientsocket.sendall(b'Invalid command!\n') else: # len(inputs) <= 1 if b'quit' in inputs[0]: clientsocket.close() self.comm_socket.close() connected = False running = False else: print("could not process command!") clientsocket.sendall(b'Could not process request!\n') except RuntimeError: print("disconnected") connected = False clientsocket.close() def mpc_control(self, robot_id, cmd): robot = self.robot_ids[robot_id] # get robot to be controlled print("robot grid pos before move: ", robot) robot.move(cmd) print("robot grid pos after move: ", robot) self.target = self.estimator.get_pos_from_grid_point(robot.x, robot.y, robot.orientation) self.pid = False self.mpc = True near_target = 0 while near_target < 5: # open loop controller events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_UP: self.controlling = True self.t = time.time() elif event.key == pygame.K_DOWN: self.controlling = False if self.robot_ids[robot_id].socket: self.robot_ids[robot_id].socket.send('(0.0,0.0)\n') elif event.key == pygame.K_0: self.target = np.array([0,0,0]) elif event.key == pygame.K_PLUS: self.control_scaling += 0.1 self.control_scaling = min(self.control_scaling, 1.0) print("control scaling = ", self.control_scaling) elif event.key == pygame.K_MINUS: self.control_scaling -= 0.1 self.control_scaling = max(self.control_scaling, 0.1) print("control scaling = ", self.control_scaling) elif event.key == pygame.K_ESCAPE: print("quit!") self.controlling = False if self.robot_ids[robot_id].socket: self.robot_ids[robot_id].socket.send('(0.0,0.0)\n') self.anim_stopped = True return elif event.key == pygame.QUIT: print("quit!") self.controlling = False if self.robot_ids[robot_id].socket: self.robot_ids[robot_id].socket.send('(0.0,0.0)\n') self.anim_stopped = True return if self.mpc: x_pred = self.get_measurement(robot_id) tmpc_start = time.time() error_pos = np.linalg.norm(x_pred[0:2] - self.target[0:2]) angles_unwrapped = np.unwrap([x_pred[2], self.target[2]]) # unwrap angle to avoid jump in data error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1]) #print("error pos = ", error_pos) #print("error_pos = {}, error_ang = {}".format(error_pos, error_ang)) #if error_pos > 0.075 or error_ang > 0.35: if error_pos > 0.05 or error_ang > 0.2: # solve mpc open loop problem res = self.ols.solve(x_pred, self.target) #us1 = res[0] #us2 = res[1] us1 = res[0] * self.control_scaling us2 = res[1] * self.control_scaling #print("u = {}", (us1, us2)) tmpc_end = time.time() #print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start)) dt_mpc = time.time() - self.t if dt_mpc < self.dt: # wait until next control can be applied #print("sleeping for {} seconds...".format(self.dt - dt_mpc)) time.sleep(self.dt - dt_mpc) else: us1 = [0] * self.mstep us2 = [0] * self.mstep near_target += 1 # send controls to the robot for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 u1 = us1[i] u2 = us2[i] if self.robot_ids[robot_id].socket: #self.robot_ids[robot_id].socket.send('({},{})\n'.format(u1, u2).encode()) if i < self.mstep: time.sleep(self.dt) self.t = time.time() # save time the most recent control was applied def get_measurement(self, robot_id): return np.array(self.estimator.get_robot_state_estimate(robot_id)) def main(args): parser = ArgumentParser() parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot') parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot') args = parser.parse_args() marker_id = int(args.id) ip = args.ip rc = RemoteController(marker_id, ip) pygame.init() screenheight = 480 screenwidth = 640 pygame.display.set_mode([screenwidth, screenheight]) rc.controller() if __name__ == '__main__': main(sys.argv)