RoboRally/remote_control/position_controller.py

516 lines
18 KiB
Python

# startup:
# roscore -> start ros
# rosparam set cv_camera/device_id 0 -> set appropriate camera device
# rosrun cv_camera cv_camera_node -> start the camera
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys
import rospy
import pygame
import numpy as np
import socket
import scipy.integrate
import copy
import threading
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib.animation as anim
import time
from casadi_opt import OpenLoopSolver
from marker_pos_angle.msg import id_pos_angle
class Robot:
def __init__(self, id, ip=None):
self.pos = None
self.orient = None
self.id = id
self.pos = None
self.euler = None
self.ip = ip
def f_ode(t, x, u):
# dynamical model of the two-wheeled robot
# TODO: find exact values for these parameters
r = 0.03
R = 0.05
d = 0.02
theta = x[2]
omega_r = u[0]
omega_l = u[1]
dx = np.zeros(3)
dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \
+ (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l
dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \
+ (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l
dx[2] = -r/(2.0*R) * (omega_r - omega_l)
return dx
class RemoteController:
def __init__(self):
self.robots = [Robot(5)]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# connect to robot
self.rc_socket = socket.socket()
try:
pass
self.rc_socket.connect(('192.168.1.103', 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
self.mutex = threading.Lock()
marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback)
# pid parameters
self.k = 0
self.ii = 0.1
self.pp = 0.4
self.inc = 0.0
self.alphas = []
self.speed = 1.0
self.controlling = False
self.u1 = 0.0
self.u2 = 0.0
# animation
self.fig = plt.figure()
self.ax = self.fig.add_subplot(1,1,1)
self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[])
self.line_sim, = self.ax.plot([], [])
self.dirm, = self.ax.plot([], [])
self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position')
plt.ylabel('y-position')
plt.grid()
self.ols = OpenLoopSolver()
self.ols.setup()
self.target = (0.0, 0.0, 0.0)
def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
plt.show(block=True)
def ani_init(self):
self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box')
return self.line, self.line_sim, self.dirm, self.dirs,
def ani_update(self, frame):
#print("plotting")
self.mutex.acquire()
try:
# copy data for plot from global arrays
if self.tms is not None:
tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms)
if len(tm_local) > 0:
# plot path of the robot
self.line.set_data(xm_local[:,0], xm_local[:,1])
# compute and plot direction the robot is facing
a = xm_local[-1, 0]
b = xm_local[-1, 1]
a2 = a + np.cos(xm_local[-1, 2]) * 1.0
b2 = b + np.sin(xm_local[-1, 2]) * 1.0
self.dirm.set_data(np.array([a, a2]), np.array([b, b2]))
ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs)
if len(ts_local) > 0:
# plot simulated path of the robot
self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
# compute and plot direction the robot is facing
a = xs_local[-1, 0]
b = xs_local[-1, 1]
a2 = a + np.cos(xs_local[-1, 2]) * 1.0
b2 = b + np.sin(xs_local[-1, 2]) * 1.0
self.dirs.set_data(np.array([a, a2]), np.array([b, b2]))
finally:
self.mutex.release()
return self.line, self.line_sim, self.dirm, self.dirs,
def measurement_callback(self, data):
#print("data = {}".format(data))
if data.id in self.robot_ids:
r = self.robot_ids[data.id]
r.pos = (data.x, data.y) # only x and y component are important for us
r.euler = data.angle
#print("r.pos = {}".format(r.pos))
#print("r.angle = {}".format(r.euler))
# save measured position and angle for plotting
measurement = np.array([r.pos[0], r.pos[1], r.euler])
if self.tms_0 is None:
self.tms_0 = time.time()
self.xm_0 = measurement
self.mutex.acquire()
try:
self.tms = np.array([0.0])
self.xms = measurement
finally:
self.mutex.release()
else:
self.mutex.acquire()
try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement))
finally:
self.mutex.release()
def controller(self):
tgrid = None
us1 = None
us2 = None
u1 = -0.0
u2 = 0.0
r = scipy.integrate.ode(f_ode)
omega_max = 5.0
init_pos = None
init_time = None
final_pos = None
final_time = None
forward = True
print("starting control")
while True:
keyboard_control = False
keyboard_control_speed_test = False
pid = False
open_loop_solve = True
if keyboard_control: # keyboard controller
events = pygame.event.get()
speed = 1.0
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.u1 = -speed
self.u2 = speed
#print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT:
self.u1 = speed
self.u2 = -speed
#print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP:
self.u1 = speed
self.u2 = speed
#print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = -speed
self.u2 = -speed
#print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{},{})\n'.format(0.1, self.u1, self.u2))
elif event.type == pygame.KEYUP:
self.u1 = 0
self.u2 = 0
#print("key released, resetting: ({},{})".format(u1, u2))
self.rc_socket.send('({}, {},{})\n'.format(0.1, self.u1, self.u2))
tnew = time.time()
dt = tnew - self.t
r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * omega_max, self.u2 * omega_max]))
#print(self.x0)
if self.x0 is None:
if self.xm_0 is not None:
self.x0 = self.xm_0
self.xs = self.x0
else:
print("error: no measurement available to initialize simulation")
x = self.x0
r.set_initial_value(x, self.t)
xnew = r.integrate(r.t + dt)
self.t = tnew
self.x0 = xnew
self.mutex.acquire()
try:
self.ts = np.append(self.ts, tnew)
self.xs = np.vstack((self.xs, xnew))
finally:
self.mutex.release()
elif keyboard_control_speed_test:
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_1:
self.controlling = True
forward = True
print("starting test")
self.mutex.acquire()
try:
init_pos = copy.deepcopy(self.xms[-1])
init_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
if event.key == pygame.K_2:
self.controlling = True
forward = False
print("starting test")
self.mutex.acquire()
try:
init_pos = copy.deepcopy(self.xms[-1])
init_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
elif event.key == pygame.K_3:
self.controlling = False
print("stopping test")
self.rc_socket.send('(0.1, 0.0,0.0)\n')
self.mutex.acquire()
try:
final_pos = copy.deepcopy(self.xms[-1])
final_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("init_pos = {}".format(init_pos))
print("final_pos = {}".format(final_pos))
print("distance = {}".format(np.linalg.norm(init_pos[0:2]-final_pos[0:2])))
print("dt = {}".format(final_time - init_time))
d = np.linalg.norm(init_pos[0:2]-final_pos[0:2])
t = final_time - init_time
r = 0.03
angular_velocity = d/r/t
print("average angular velocity = {}".format(angular_velocity))
if self.controlling:
if forward:
self.rc_socket.send('(0.1, 1.0,1.0)\n')
else:
self.rc_socket.send('(0.1, -1.0,-1.0)\n')
time.sleep(0.1)
#print("speed = {}".format(self.speed))
elif pid:
# pid controller
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.ii = self.ii / np.sqrt(np.sqrt(np.sqrt(10.0)))
print("ii = {}".format(self.pp))
elif event.key == pygame.K_RIGHT:
self.ii = self.ii * np.sqrt(np.sqrt(np.sqrt(10.0)))
print("ii = {}".format(self.pp))
elif event.key == pygame.K_UP:
self.controlling = True
elif event.key == pygame.K_DOWN:
self.controlling = False
self.rc_socket.send('({},{})\n'.format(0, 0))
dt = 0.05
if self.controlling:
# test: turn robot such that angle is zero
for r in self.robots:
if r.euler is not None:
self.k = self.k + 1
alpha = r.euler
self.alphas.append(alpha)
# compute error
e = alpha - 0
# compute integral of error (approximately)
self.inc += e * dt
# PID
p = self.pp * e
i = self.ii * self.inc
d = 0.0
# compute controls for robot from PID
u1 = p + i + d
u2 = - p - i - d
print("alpha = {}, u = ({}, {})".format(alpha, u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(dt)
elif open_loop_solve:
# open loop controller
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP:
self.controlling = True
self.t = time.time()
elif event.key == pygame.K_DOWN:
self.controlling = False
self.rc_socket.send('(0.1, 0.0,0.0)\n')
elif event.key == pygame.K_0:
self.target = (0.0, 0.0, 0.0)
elif event.key == pygame.K_1:
self.target = (0.5,0.5, -np.pi/2.0)
elif event.key == pygame.K_2:
self.target = (0.5, -0.5, 0.0)
elif event.key == pygame.K_3:
self.target = (-0.5,-0.5, np.pi/2.0)
elif event.key == pygame.K_4:
self.target = (-0.5,0.5, 0.0)
if self.controlling:
tmpc_start = time.time()
# get measurement
self.mutex.acquire()
try:
last_measurement = copy.deepcopy(self.xms[-1])
last_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("current measurement (t, x) = ({}, {})".format(last_time, last_measurement))
print("current control (u1, u2) = ({}, {})".format(u1, u2))
# prediction of state at time the mpc will terminate
r.set_f_params(np.array([u1 * omega_max, u2 * omega_max]))
r.set_initial_value(last_measurement, last_time)
dt = self.ols.T/self.ols.N
print("integrating for {} seconds".format((dt)))
x_pred = r.integrate(r.t + (dt))
print("predicted initial state x_pred = ({})".format(x_pred))
res = self.ols.solve(x_pred, self.target)
#tgrid = res[0]
us1 = res[0]
us2 = res[1]
# tt = 0
# x = last_measurement
# t_ol = np.array([tt])
# x_ol = np.array([x])
# # compute open loop prediction
# for i in range(len(us1)):
# r = scipy.integrate.ode(f_ode)
# r.set_f_params(np.array([us1[i] * 13.32, us2[i] * 13.32]))
# r.set_initial_value(x, tt)
#
# tt = tt + 0.1
# x = r.integrate(tt)
#
# t_ol = np.vstack((t_ol, tt))
# x_ol = np.vstack((x_ol, x))
#plt.figure(4)
#plt.plot(x_ol[:,0], x_ol[:,1])
#if event.key == pygame.K_DOWN:
# if tgrid is not None:
tmpc_end = time.time()
print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < dt:
print("sleeping for {} seconds...".format(dt - dt_mpc))
time.sleep(dt - dt_mpc)
self.mutex.acquire()
try:
second_measurement = copy.deepcopy(self.xms[-1])
second_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("(last_time, second_time, dt) = ({}, {}, {})".format(last_time, second_time, second_time - last_time))
print("mismatch between predicted state and measured state: {}\n\n".format(second_measurement - last_measurement))
for i in range(0, 1):
u1 = us1[i]
u2 = us2[i]
self.rc_socket.send('({},{},{})\n'.format(dt,u1, u2))
self.t = time.time()
#time.sleep(0.2)
#
pass
def main(args):
rospy.init_node('controller_node', anonymous=True)
rc = RemoteController()
pygame.init()
screenheight = 480
screenwidth = 640
screen = pygame.display.set_mode([screenwidth, screenheight])
threading.Thread(target=rc.controller).start()
rc.ani()
if __name__ == '__main__':
main(sys.argv)