From 2f081faee8144a81a5576e0242de98f39dd93de6 Mon Sep 17 00:00:00 2001 From: Simon Pirkelmann Date: Wed, 18 Nov 2020 21:38:41 +0100 Subject: [PATCH] restructured controller to work with mpc --- remote_control/controller.py | 5 +- remote_control/mpc_controller.py | 195 +++++++------------------------ remote_control/pid_controller.py | 11 +- 3 files changed, 50 insertions(+), 161 deletions(-) diff --git a/remote_control/controller.py b/remote_control/controller.py index bd6a128..55809f2 100644 --- a/remote_control/controller.py +++ b/remote_control/controller.py @@ -18,9 +18,9 @@ class ControllerBase: control = self.compute_control(state) - self.apply_control(control) + if self.controlling: + self.apply_control(control) - time.sleep(self.control_rate) self.apply_control((0.0, 0.0)) # stop robot def set_target_position(self, target_pos): @@ -42,6 +42,7 @@ class ControllerBase: else: raise Exception("error: controller cannot apply control!\n" " there is no robot attached to the controller!") + time.sleep(self.control_rate) def attach_robot(self, robot): self.robot = robot diff --git a/remote_control/mpc_controller.py b/remote_control/mpc_controller.py index 6f69696..33b38b8 100644 --- a/remote_control/mpc_controller.py +++ b/remote_control/mpc_controller.py @@ -1,175 +1,62 @@ import numpy as np import time +from controller import ControllerBase from casadi_opt import OpenLoopSolver -class MPCController: - def __init__(self, estimator): - self.t = time.time() +class MPCController(ControllerBase): + def __init__(self): + super().__init__() + self.t = None - self.estimator = estimator - self.controlling = False - - self.mstep = 2 self.ols = OpenLoopSolver(N=20, T=1.0) self.ols.setup() - self.dt = self.ols.T / self.ols.N + self.control_rate = self.ols.T / self.ols.N + + self.mstep = 2 # integrator self.omega_max = 5.0 self.control_scaling = 0.4 - def move_to_pos(self, target_pos, robot, near_target_counter=5): - near_target = 0 - while near_target < near_target_counter: - while not self.estimator.event_queue.empty(): - event = self.estimator.event_queue.get() - print("event: ", event) - if event[0] == 'click': - pass - elif event[0] == 'key': - key = event[1] + def set_target_position(self, target_pos): + super(MPCController, self).set_target_position(target_pos) + self.t = time.time() - if key == 84: # arrow up - self.controlling = True - self.t = time.time() - elif key == 82: # arrow down - self.controlling = False - robot.send_cmd() - elif key == 48: # 0 - target_pos = np.array([0.0, 0.0, 0.0]) - elif key == 43: # + - self.control_scaling += 0.1 - self.control_scaling = min(self.control_scaling, 1.0) - print("control scaling = ", self.control_scaling) - elif key == 45: # - - self.control_scaling -= 0.1 - self.control_scaling = max(self.control_scaling, 0.1) - print("control scaling = ", self.control_scaling) - elif key == 113: - print("quit!") - self.controlling = False - robot.send_cmd() - return - elif key == 27: # escape - print("quit!") - self.controlling = False - robot.send_cmd() - return + def compute_control(self, state): + x_pred = state - x_pred = self.get_measurement(robot.id) + error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2]) + angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data + error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1]) - if x_pred is not None: - error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2]) - angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data - error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1]) - # print("error pos = ", error_pos) - # print("error_pos = {}, error_ang = {}".format(error_pos, error_ang)) + # if error_pos > 0.075 or error_ang > 0.35: + if error_pos > 0.05: + # solve mpc open loop problem + res = self.ols.solve(x_pred, self.target_pos) - # if error_pos > 0.075 or error_ang > 0.35: - if error_pos > 0.05 or error_ang > 0.1: - # solve mpc open loop problem - res = self.ols.solve(x_pred, target_pos) + us1 = res[0] * self.control_scaling + us2 = res[1] * self.control_scaling - # us1 = res[0] - # us2 = res[1] - us1 = res[0] * self.control_scaling - us2 = res[1] * self.control_scaling - # print("u = {}", (us1, us2)) + dt_mpc = time.time() - self.t + if dt_mpc < self.control_rate: # wait until next control can be applied + time.sleep(self.control_rate - dt_mpc) + else: + us1 = [0] * self.mstep + us2 = [0] * self.mstep - # print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start)) - dt_mpc = time.time() - self.t - if dt_mpc < self.dt: # wait until next control can be applied - # print("sleeping for {} seconds...".format(self.dt - dt_mpc)) - time.sleep(self.dt - dt_mpc) - else: - us1 = [0] * self.mstep - us2 = [0] * self.mstep - near_target += 1 + return us1, us2 - # send controls to the robot - for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 - u1 = us1[i] - u2 = us2[i] - robot.send_cmd(u1, u2) - if i < self.mstep: - time.sleep(self.dt) - self.t = time.time() # save time the most recent control was applied - else: - print("robot not detected yet!") - - def interactive_control(self, robots): - controlled_robot_number = 0 - robot = robots[controlled_robot_number] - - target_pos = np.array([0.0, 0.0, 0.0]) - - running = True - while running: - # handle events from opencv window - while not self.estimator.event_queue.empty(): - event = self.estimator.event_queue.get() - print("event: ", event) - if event[0] == 'click': - target_pos = event[1] - elif event[0] == 'key': - key = event[1] - - if key == 32: # arrow up - self.controlling = not self.controlling - if not self.controlling: - print("disable control") - robot.send_cmd() # stop robot - else: - print("enable control") - self.t = time.time() - elif key == 48: # 0 - target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos - elif key == 43: # + - self.control_scaling += 0.1 - self.control_scaling = min(self.control_scaling, 1.0) - print("control scaling = ", self.control_scaling) - elif key == 45: # - - self.control_scaling -= 0.1 - self.control_scaling = max(self.control_scaling, 0.1) - print("control scaling = ", self.control_scaling) - elif key == 9: # TAB - # switch controlled robot - robot.send_cmd() # stop current robot - controlled_robot_number = (controlled_robot_number + 1) % len(robots) - robot = robots[controlled_robot_number] - print(f"controlled robot: {robot.id}") - elif key == 113 or key == 27: # q or ESCAPE - print("quit!") - self.controlling = False - robot.send_cmd() - return - - if self.controlling: - # measure state - x_pred = self.get_measurement(robot.id) - - # print(np.linalg.norm(x_pred-target_pos)) - - # solve mpc open loop problem - res = self.ols.solve(x_pred, target_pos) - - us1 = res[0] * self.control_scaling - us2 = res[1] * self.control_scaling - - dt_mpc = time.time() - self.t - if dt_mpc < self.dt: # wait until next control can be applied - time.sleep(self.dt - dt_mpc) - - # send controls to the robot - for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 - u1 = us1[i] - u2 = us2[i] - robot.send_cmd(u1, u2) - if i < self.mstep: - time.sleep(self.dt) - self.t = time.time() # save time the most recent control was applied - - def get_measurement(self, robot_id): - return self.estimator.get_robot_state_estimate(robot_id) + def apply_control(self, control): + if self.robot is not None: + for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 + u1 = control[0][i] + u2 = control[1][i] + self.robot.send_cmd(u1, u2) + if i < self.mstep: + time.sleep(self.control_rate) + self.t = time.time() # save time the most recent control was applied + else: + raise Exception("error: controller cannot apply control!\n" + " there is no robot attached to the controller!") \ No newline at end of file diff --git a/remote_control/pid_controller.py b/remote_control/pid_controller.py index dc46e9a..28417e1 100644 --- a/remote_control/pid_controller.py +++ b/remote_control/pid_controller.py @@ -10,8 +10,6 @@ class PIDController(ControllerBase): super().__init__() self.t = None - self.controlling = False - self.P_angle = 0.4 self.I_angle = 0.35 self.D_angle = 0.1 @@ -46,7 +44,7 @@ class PIDController(ControllerBase): if self.t is None: dt = 0.1 else: - dt = time.time() - self.t + dt = time.time() - self.t # time since last control was applied if self.mode == 'angle': # compute angle such that robot faces to target point @@ -108,9 +106,12 @@ class PIDController(ControllerBase): self.e_pos_old = e_pos self.e_angle_old = e_angle - else: u1 = 0.0 u2 = 0.0 - self.t = time.time() # save time when the most recent control was applied return u1, u2 + + def apply_control(self, control): + super(PIDController, self).apply_control(control) + self.t = time.time() # save time when the most recent control was applied + time.sleep(self.control_rate)