# startup: # roscore -> start ros # rosparam set cv_camera/device_id 0 -> set appropriate camera device # rosrun cv_camera cv_camera_node -> start the camera # roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection # python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane import sys import rospy import pygame import numpy as np import socket import scipy.integrate import copy import threading from copy import deepcopy import matplotlib.pyplot as plt import matplotlib.animation as anim import time from casadi_opt import OpenLoopSolver from marker_pos_angle.msg import id_pos_angle class Robot: def __init__(self, id, ip=None): self.pos = None self.orient = None self.id = id self.pos = None self.euler = None self.ip = ip def f_ode(t, x, u): # dynamical model of the two-wheeled robot # TODO: find exact values for these parameters r = 0.03 R = 0.05 d = 0.02 theta = x[2] omega_r = u[0] omega_l = u[1] dx = np.zeros(3) dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \ + (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \ + (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l dx[2] = -r/(2.0*R) * (omega_r - omega_l) return dx class RemoteController: def __init__(self): self.robots = [Robot(3)] self.robot_ids = {} for r in self.robots: self.robot_ids[r.id] = r # connect to robot self.rc_socket = socket.socket() try: pass self.rc_socket.connect(('192.168.1.103', 1234)) # connect to robot except socket.error: print("could not connect to socket") self.t = time.time() # variables for simulated state self.x0 = None self.ts = np.array([]) self.xs = [] # variables for measurements self.tms_0 = None self.xm_0 = None self.tms = None self.xms = None self.mutex = threading.Lock() marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback) # pid parameters self.k = 0 self.ii = 0.1 self.pp = 0.4 self.inc = 0.0 self.alphas = [] self.speed = 1.0 self.controlling = False self.u1 = 0.0 self.u2 = 0.0 # animation self.fig = plt.figure() self.ax = self.fig.add_subplot(1,1,1) self.xdata, self.ydata = [], [] self.line, = self.ax.plot([],[]) self.line_sim, = self.ax.plot([], []) self.dirm, = self.ax.plot([], []) self.dirs, = self.ax.plot([], []) plt.xlabel('x-position') plt.ylabel('y-position') plt.grid() self.ols = OpenLoopSolver() self.ols.setup() self.target = (0.0, 0.0, 0.0) def ani(self): self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) plt.ion() plt.show(block=True) def ani_init(self): self.ax.set_xlim(-2, 2) self.ax.set_ylim(-2, 2) self.ax.set_aspect('equal', adjustable='box') return self.line, self.line_sim, self.dirm, self.dirs, def ani_update(self, frame): #print("plotting") self.mutex.acquire() try: # copy data for plot from global arrays if self.tms is not None: tm_local = deepcopy(self.tms) xm_local = deepcopy(self.xms) if len(tm_local) > 0: # plot path of the robot self.line.set_data(xm_local[:,0], xm_local[:,1]) # compute and plot direction the robot is facing a = xm_local[-1, 0] b = xm_local[-1, 1] a2 = a + np.cos(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0 self.dirm.set_data(np.array([a, a2]), np.array([b, b2])) ts_local = deepcopy(self.ts) xs_local = deepcopy(self.xs) if len(ts_local) > 0: # plot simulated path of the robot self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) # compute and plot direction the robot is facing a = xs_local[-1, 0] b = xs_local[-1, 1] a2 = a + np.cos(xs_local[-1, 2]) * 1.0 b2 = b + np.sin(xs_local[-1, 2]) * 1.0 self.dirs.set_data(np.array([a, a2]), np.array([b, b2])) finally: self.mutex.release() return self.line, self.line_sim, self.dirm, self.dirs, def measurement_callback(self, data): #print("data = {}".format(data)) if data.id in self.robot_ids: r = self.robot_ids[data.id] r.pos = (data.x, data.y) # only x and y component are important for us r.euler = data.angle #print("r.pos = {}".format(r.pos)) #print("r.angle = {}".format(r.euler)) # save measured position and angle for plotting measurement = np.array([r.pos[0], r.pos[1], r.euler]) if self.tms_0 is None: self.tms_0 = time.time() self.xm_0 = measurement self.mutex.acquire() try: self.tms = np.array([0.0]) self.xms = measurement finally: self.mutex.release() else: self.mutex.acquire() try: self.tms = np.vstack((self.tms, time.time() - self.tms_0)) self.xms = np.vstack((self.xms, measurement)) finally: self.mutex.release() def controller(self): tgrid = None us1 = None us2 = None u1 = -0.0 u2 = 0.0 r = scipy.integrate.ode(f_ode) omega_max = 5.0 init_pos = None init_time = None final_pos = None final_time = None forward = True print("starting control") while True: keyboard_control = False keyboard_control_speed_test = False pid = False open_loop_solve = True if keyboard_control: # keyboard controller events = pygame.event.get() speed = 1.0 for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: self.u1 = -speed self.u2 = speed #print("turn left: ({},{})".format(u1, u2)) elif event.key == pygame.K_RIGHT: self.u1 = speed self.u2 = -speed #print("turn right: ({},{})".format(u1, u2)) elif event.key == pygame.K_UP: self.u1 = speed self.u2 = speed #print("forward: ({},{})".format(self.u1, self.u2)) elif event.key == pygame.K_DOWN: self.u1 = -speed self.u2 = -speed #print("forward: ({},{})".format(u1, u2)) self.rc_socket.send('({},{},{})\n'.format(0.1, self.u1, self.u2)) elif event.type == pygame.KEYUP: self.u1 = 0 self.u2 = 0 #print("key released, resetting: ({},{})".format(u1, u2)) self.rc_socket.send('({}, {},{})\n'.format(0.1, self.u1, self.u2)) tnew = time.time() dt = tnew - self.t r = scipy.integrate.ode(f_ode) r.set_f_params(np.array([self.u1 * omega_max, self.u2 * omega_max])) #print(self.x0) if self.x0 is None: if self.xm_0 is not None: self.x0 = self.xm_0 self.xs = self.x0 else: print("error: no measurement available to initialize simulation") x = self.x0 r.set_initial_value(x, self.t) xnew = r.integrate(r.t + dt) self.t = tnew self.x0 = xnew self.mutex.acquire() try: self.ts = np.append(self.ts, tnew) self.xs = np.vstack((self.xs, xnew)) finally: self.mutex.release() elif keyboard_control_speed_test: events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_1: self.controlling = True forward = True print("starting test") self.mutex.acquire() try: init_pos = copy.deepcopy(self.xms[-1]) init_time = copy.deepcopy(self.tms[-1]) finally: self.mutex.release() if event.key == pygame.K_2: self.controlling = True forward = False print("starting test") self.mutex.acquire() try: init_pos = copy.deepcopy(self.xms[-1]) init_time = copy.deepcopy(self.tms[-1]) finally: self.mutex.release() elif event.key == pygame.K_3: self.controlling = False print("stopping test") self.rc_socket.send('(0.1, 0.0,0.0)\n') self.mutex.acquire() try: final_pos = copy.deepcopy(self.xms[-1]) final_time = copy.deepcopy(self.tms[-1]) finally: self.mutex.release() print("init_pos = {}".format(init_pos)) print("final_pos = {}".format(final_pos)) print("distance = {}".format(np.linalg.norm(init_pos[0:2]-final_pos[0:2]))) print("dt = {}".format(final_time - init_time)) d = np.linalg.norm(init_pos[0:2]-final_pos[0:2]) t = final_time - init_time r = 0.03 angular_velocity = d/r/t print("average angular velocity = {}".format(angular_velocity)) if self.controlling: if forward: self.rc_socket.send('(0.1, 1.0,1.0)\n') else: self.rc_socket.send('(0.1, -1.0,-1.0)\n') time.sleep(0.1) #print("speed = {}".format(self.speed)) elif pid: # pid controller events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: self.ii = self.ii / np.sqrt(np.sqrt(np.sqrt(10.0))) print("ii = {}".format(self.pp)) elif event.key == pygame.K_RIGHT: self.ii = self.ii * np.sqrt(np.sqrt(np.sqrt(10.0))) print("ii = {}".format(self.pp)) elif event.key == pygame.K_UP: self.controlling = True elif event.key == pygame.K_DOWN: self.controlling = False self.rc_socket.send('({},{})\n'.format(0, 0)) dt = 0.05 if self.controlling: # test: turn robot such that angle is zero for r in self.robots: if r.euler is not None: self.k = self.k + 1 alpha = r.euler self.alphas.append(alpha) # compute error e = alpha - 0 # compute integral of error (approximately) self.inc += e * dt # PID p = self.pp * e i = self.ii * self.inc d = 0.0 # compute controls for robot from PID u1 = p + i + d u2 = - p - i - d print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2)) time.sleep(dt) elif open_loop_solve: # open loop controller events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_UP: self.controlling = True self.t = time.time() elif event.key == pygame.K_DOWN: self.controlling = False self.rc_socket.send('(0.1, 0.0,0.0)\n') elif event.key == pygame.K_0: self.target = (0.0, 0.0, 0.0) elif event.key == pygame.K_1: self.target = (0.5,0.5, -np.pi/2.0) elif event.key == pygame.K_2: self.target = (0.5, -0.5, 0.0) elif event.key == pygame.K_3: self.target = (-0.5,-0.5, np.pi/2.0) elif event.key == pygame.K_4: self.target = (-0.5,0.5, 0.0) if self.controlling: tmpc_start = time.time() # get measurement self.mutex.acquire() try: last_measurement = copy.deepcopy(self.xms[-1]) last_time = copy.deepcopy(self.tms[-1]) finally: self.mutex.release() print("current measurement (t, x) = ({}, {})".format(last_time, last_measurement)) print("current control (u1, u2) = ({}, {})".format(u1, u2)) # prediction of state at time the mpc will terminate r.set_f_params(np.array([u1 * omega_max, u2 * omega_max])) r.set_initial_value(last_measurement, last_time) dt = self.ols.T/self.ols.N print("integrating for {} seconds".format((dt))) x_pred = r.integrate(r.t + (dt)) print("predicted initial state x_pred = ({})".format(x_pred)) res = self.ols.solve(x_pred, self.target) #tgrid = res[0] us1 = res[0] us2 = res[1] # tt = 0 # x = last_measurement # t_ol = np.array([tt]) # x_ol = np.array([x]) # # compute open loop prediction # for i in range(len(us1)): # r = scipy.integrate.ode(f_ode) # r.set_f_params(np.array([us1[i] * 13.32, us2[i] * 13.32])) # r.set_initial_value(x, tt) # # tt = tt + 0.1 # x = r.integrate(tt) # # t_ol = np.vstack((t_ol, tt)) # x_ol = np.vstack((x_ol, x)) #plt.figure(4) #plt.plot(x_ol[:,0], x_ol[:,1]) #if event.key == pygame.K_DOWN: # if tgrid is not None: tmpc_end = time.time() print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start)) dt_mpc = time.time() - self.t if dt_mpc < dt: print("sleeping for {} seconds...".format(dt - dt_mpc)) time.sleep(dt - dt_mpc) self.mutex.acquire() try: second_measurement = copy.deepcopy(self.xms[-1]) second_time = copy.deepcopy(self.tms[-1]) finally: self.mutex.release() print("(last_time, second_time, dt) = ({}, {}, {})".format(last_time, second_time, second_time - last_time)) print("mismatch between predicted state and measured state: {}\n\n".format(second_measurement - last_measurement)) for i in range(0, 1): u1 = us1[i] u2 = us2[i] #self.rc_socket.send('({},{},{})\n'.format(dt,u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2)) self.t = time.time() time.sleep(0.2) # pass def main(args): rospy.init_node('controller_node', anonymous=True) rc = RemoteController() pygame.init() screenheight = 480 screenwidth = 640 screen = pygame.display.set_mode([screenwidth, screenheight]) threading.Thread(target=rc.controller).start() rc.ani() if __name__ == '__main__': main(sys.argv)