diff --git a/remote_control/position_controller.py b/remote_control/position_controller.py index 922ffcf..805fdd8 100644 --- a/remote_control/position_controller.py +++ b/remote_control/position_controller.py @@ -75,7 +75,7 @@ class RemoteController: for r in self.robots: self.robot_ids[r.id] = r - obst = [Obstacle(6, 0.2), Obstacle(5, 0.2), Obstacle(8, 0.2)] + obst = [Obstacle(6, 0.175), Obstacle(5, 0.175), Obstacle(8, 0.175)] self.obstacles = {} for r in obst: @@ -111,21 +111,13 @@ class RemoteController: self.mutex = threading.Lock() + # ROS subscriber for detected markers marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback) - # pid parameters - self.k = 0 - self.ii = 0.1 - self.pp = 0.4 - - self.inc = 0.0 - - self.alphas = [] - - self.speed = 1.0 self.controlling = False + # currently active control self.u1 = 0.0 self.u2 = 0.0 @@ -152,9 +144,15 @@ class RemoteController: self.ols = OpenLoopSolver() self.ols.setup() + self.dt = self.ols.T / self.ols.N self.target = (0.0, 0.0, 0.0) + # integrator + self.r = scipy.integrate.ode(f_ode) + self.omega_max = 5.0 + + def ani(self): self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) plt.ion() @@ -213,7 +211,6 @@ class RemoteController: else: self.line_ol.set_data([],[]) - i = 0 obst_keys = self.obstacles.keys() for s in self.circles: @@ -230,16 +227,13 @@ class RemoteController: return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.circles[0], self.circles[1],self.circles[2], def measurement_callback(self, data): - #print("data = {}".format(data)) + # detect robots if data.id in self.robot_ids: r = self.robot_ids[data.id] r.pos = (data.x, data.y) # only x and y component are important for us r.euler = data.angle - #print("r.pos = {}".format(r.pos)) - #print("r.angle = {}".format(r.euler)) - # save measured position and angle for plotting measurement = np.array([r.pos[0], r.pos[1], r.euler]) if self.tms_0 is None: @@ -260,257 +254,89 @@ class RemoteController: finally: self.mutex.release() + # detect obstacles if data.id in self.obstacles.keys(): obst = (data.x, data.y) self.obstacles[data.id].pos = obst def controller(self): - tgrid = None - us1 = None - us2 = None - u1 = -0.0 - u2 = 0.0 - - r = scipy.integrate.ode(f_ode) - - omega_max = 5.0 - - init_pos = None - init_time = None - final_pos = None - final_time = None - forward = True print("starting control") while True: - keyboard_control = False - keyboard_control_speed_test = False - pid = False - open_loop_solve = True + # open loop controller + events = pygame.event.get() + for event in events: + if event.type == pygame.KEYDOWN: + if event.key == pygame.K_UP: + self.controlling = True + self.t = time.time() + elif event.key == pygame.K_DOWN: + self.controlling = False + if self.rc_socket: + self.rc_socket.send('(0.0,0.0)\n') + elif event.key == pygame.K_0: + self.target = (0.0, 0.0, 0.0) + elif event.key == pygame.K_1: + self.target = (0.5,0.5, -np.pi/2.0) + elif event.key == pygame.K_2: + self.target = (0.5, -0.5, 0.0) + elif event.key == pygame.K_3: + self.target = (-0.5,-0.5, np.pi/2.0) + elif event.key == pygame.K_4: + self.target = (-0.5,0.5, 0.0) - if keyboard_control: # keyboard controller - events = pygame.event.get() - speed = 1.0 - for event in events: - if event.type == pygame.KEYDOWN: - if event.key == pygame.K_LEFT: - self.u1 = -speed - self.u2 = speed - #print("turn left: ({},{})".format(u1, u2)) - elif event.key == pygame.K_RIGHT: - self.u1 = speed - self.u2 = -speed - #print("turn right: ({},{})".format(u1, u2)) - elif event.key == pygame.K_UP: - self.u1 = speed - self.u2 = speed - #print("forward: ({},{})".format(self.u1, self.u2)) - elif event.key == pygame.K_DOWN: - self.u1 = -speed - self.u2 = -speed - #print("forward: ({},{})".format(u1, u2)) - self.rc_socket.send('({},{},{})\n'.format(0.1, self.u1, self.u2)) - elif event.type == pygame.KEYUP: - self.u1 = 0 - self.u2 = 0 - #print("key released, resetting: ({},{})".format(u1, u2)) - self.rc_socket.send('({}, {},{})\n'.format(0.1, self.u1, self.u2)) + if self.controlling: + x_pred = self.get_measurement_prediction() - tnew = time.time() - dt = tnew - self.t - r = scipy.integrate.ode(f_ode) - r.set_f_params(np.array([self.u1 * omega_max, self.u2 * omega_max])) + tmpc_start = time.time() - #print(self.x0) - if self.x0 is None: - if self.xm_0 is not None: - self.x0 = self.xm_0 - self.xs = self.x0 - else: - print("error: no measurement available to initialize simulation") - x = self.x0 - r.set_initial_value(x, self.t) - xnew = r.integrate(r.t + dt) + # solve mpc open loop problem + res = self.ols.solve(x_pred, self.target, self.obstacles) - self.t = tnew - self.x0 = xnew + us1 = res[0] + us2 = res[1] + # save open loop trajectories for plotting self.mutex.acquire() try: - self.ts = np.append(self.ts, tnew) - self.xs = np.vstack((self.xs, xnew)) + self.ol_x = res[2] + self.ol_y = res[3] finally: self.mutex.release() + tmpc_end = time.time() + print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start)) + dt_mpc = time.time() - self.t + if dt_mpc < self.dt: # wait until next control can be applied + print("sleeping for {} seconds...".format(self.dt - dt_mpc)) + time.sleep(self.dt - dt_mpc) - elif keyboard_control_speed_test: - events = pygame.event.get() - for event in events: - if event.type == pygame.KEYDOWN: - if event.key == pygame.K_1: - self.controlling = True - forward = True - print("starting test") - self.mutex.acquire() - try: - init_pos = copy.deepcopy(self.xms[-1]) - init_time = copy.deepcopy(self.tms[-1]) - finally: - self.mutex.release() - if event.key == pygame.K_2: - self.controlling = True - forward = False - print("starting test") - self.mutex.acquire() - try: - init_pos = copy.deepcopy(self.xms[-1]) - init_time = copy.deepcopy(self.tms[-1]) - finally: - self.mutex.release() - elif event.key == pygame.K_3: - self.controlling = False - print("stopping test") - self.rc_socket.send('(0.1, 0.0,0.0)\n') + # send controls to the robot + for i in range(0, 1): # option to use multistep mpc if len(range) > 1 + u1 = us1[i] + u2 = us2[i] + if self.rc_socket: + self.rc_socket.send('({},{})\n'.format(u1, u2)) + self.t = time.time() # save time the most recent control was applied - self.mutex.acquire() - try: - final_pos = copy.deepcopy(self.xms[-1]) - final_time = copy.deepcopy(self.tms[-1]) - finally: - self.mutex.release() + def get_measurement_prediction(self): + # get measurement + self.mutex.acquire() + try: + last_measurement = copy.deepcopy(self.xms[-1]) + last_time = copy.deepcopy(self.tms[-1]) + finally: + self.mutex.release() - print("init_pos = {}".format(init_pos)) - print("final_pos = {}".format(final_pos)) - print("distance = {}".format(np.linalg.norm(init_pos[0:2]-final_pos[0:2]))) - print("dt = {}".format(final_time - init_time)) + # prediction of state at time the mpc will terminate + self.r.set_f_params(np.array([self.u1 * self.omega_max, self.u2 * self.omega_max])) - d = np.linalg.norm(init_pos[0:2]-final_pos[0:2]) - t = final_time - init_time - r = 0.03 + self.r.set_initial_value(last_measurement, last_time) - angular_velocity = d/r/t - print("average angular velocity = {}".format(angular_velocity)) + x_pred = self.r.integrate(self.r.t + self.dt) + return x_pred - if self.controlling: - if forward: - self.rc_socket.send('(0.1, 1.0,1.0)\n') - else: - self.rc_socket.send('(0.1, -1.0,-1.0)\n') - time.sleep(0.1) - #print("speed = {}".format(self.speed)) - - - elif open_loop_solve: - # open loop controller - - events = pygame.event.get() - for event in events: - if event.type == pygame.KEYDOWN: - if event.key == pygame.K_UP: - self.controlling = True - self.t = time.time() - elif event.key == pygame.K_DOWN: - self.controlling = False - if self.rc_socket: - self.rc_socket.send('(0.0,0.0)\n') - elif event.key == pygame.K_0: - self.target = (0.0, 0.0, 0.0) - elif event.key == pygame.K_1: - self.target = (0.5,0.5, -np.pi/2.0) - elif event.key == pygame.K_2: - self.target = (0.5, -0.5, 0.0) - elif event.key == pygame.K_3: - self.target = (-0.5,-0.5, np.pi/2.0) - elif event.key == pygame.K_4: - self.target = (-0.5,0.5, 0.0) - if self.controlling: - # get measurement - self.mutex.acquire() - try: - last_measurement = copy.deepcopy(self.xms[-1]) - last_time = copy.deepcopy(self.tms[-1]) - finally: - self.mutex.release() - - #print("current measurement (t, x) = ({}, {})".format(last_time, last_measurement)) - #print("current control (u1, u2) = ({}, {})".format(u1, u2)) - - # prediction of state at time the mpc will terminate - r.set_f_params(np.array([u1 * omega_max, u2 * omega_max])) - - r.set_initial_value(last_measurement, last_time) - dt = self.ols.T/self.ols.N - #print("integrating for {} seconds".format((dt))) - x_pred = r.integrate(r.t + (dt)) - - #print("predicted initial state x_pred = ({})".format(x_pred)) - - tmpc_start = time.time() - - res = self.ols.solve(x_pred, self.target, self.obstacles) - #tgrid = res[0] - us1 = res[0] - us2 = res[1] - - self.mutex.acquire() - try: - self.ol_x = res[2] - self.ol_y = res[3] - finally: - self.mutex.release() - - # tt = 0 - # x = last_measurement - # t_ol = np.array([tt]) - # x_ol = np.array([x]) - # # compute open loop prediction - # for i in range(len(us1)): - # r = scipy.integrate.ode(f_ode) - # r.set_f_params(np.array([us1[i] * 13.32, us2[i] * 13.32])) - # r.set_initial_value(x, tt) - # - # tt = tt + 0.1 - # x = r.integrate(tt) - # - # t_ol = np.vstack((t_ol, tt)) - # x_ol = np.vstack((x_ol, x)) - - #plt.figure(4) - #plt.plot(x_ol[:,0], x_ol[:,1]) - - - #if event.key == pygame.K_DOWN: - # if tgrid is not None: - tmpc_end = time.time() - print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start)) - dt_mpc = time.time() - self.t - if dt_mpc < dt: - print("sleeping for {} seconds...".format(dt - dt_mpc)) - time.sleep(dt - dt_mpc) - - self.mutex.acquire() - try: - second_measurement = copy.deepcopy(self.xms[-1]) - second_time = copy.deepcopy(self.tms[-1]) - finally: - self.mutex.release() - - #print("(last_time, second_time, dt) = ({}, {}, {})".format(last_time, second_time, second_time - last_time)) - #print("mismatch between predicted state and measured state: {}\n\n".format(second_measurement - last_measurement)) - - for i in range(0, 1): - u1 = us1[i] - u2 = us2[i] - #self.rc_socket.send('({},{},{})\n'.format(dt,u1, u2)) - if self.rc_socket: - self.rc_socket.send('({},{})\n'.format(u1, u2)) - self.t = time.time() - #time.sleep(0.2) - # - - - pass def main(args): rospy.init_node('controller_node', anonymous=True) @@ -523,6 +349,7 @@ def main(args): screenwidth = 640 pygame.display.set_mode([screenwidth, screenheight]) + #threading.Thread(target=rc.input_handling).start() threading.Thread(target=rc.controller).start() rc.ani()