RoboRally/remote_control/roborally.py

620 lines
20 KiB
Python
Raw Normal View History

2020-09-04 14:53:39 +00:00
# startup:
# roscore -> start ros
# rosparam set cv_camera/device_id 0 -> set appropriate camera device
# rosrun cv_camera cv_camera_node -> start the camera
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys
import rospy
import pygame
import numpy as np
import socket
import scipy.integrate
import copy
import threading
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib.animation as anim
import matplotlib.patches as patch
from shapely.geometry import Polygon
import time
from casadi_opt import OpenLoopSolver
from marker_pos_angle.msg import id_pos_angle
from collections import OrderedDict
from argparse import ArgumentParser
class Robot:
def __init__(self, id, ip):
self.pos = None
self.orient = None
self.id = id
self.pos = None
self.euler = None
self.ip = ip
class Obstacle:
def __init__(self, id, radius):
self.id = id
self.pos = None
self.radius = radius
class Track:
def __init__(self):
# ids in clockwise direction
self.inner = OrderedDict()
self.inner[0] = None
self.inner[1] = None
self.inner[2] = None
self.inner[3] = None
self.outer = OrderedDict()
self.outer[4] = None
self.outer[5] = None
self.outer[6] = None
self.outer[7] = None
self.track_complete = False
self.inner_poly = None
self.outer_poly = None
def set_id(self, d):
if not self.track_complete:
if d.id in self.inner:
print("Detected marker {} at pos {}".format(d.id, (d.x,d.y)))
self.inner[d.id] = (d.x, d.y)
elif d.id in self.outer:
print("Detected marker {} at pos {}".format(d.id, (d.x, d.y)))
self.outer[d.id] = (d.x, d.y)
else:
print("Unknown marker!")
else:
return
if not None in self.inner.values() and not None in self.outer.values():
print("Track marker positions detected!")
self.track_complete = True
self.inner_poly = Polygon(self.inner.values())
self.outer_poly = Polygon(self.outer.values())
def plot_track(self):
if self.track_complete:
plt.figure(2)
x_in, y_in = self.inner_poly.exterior.xy
x_out, y_out = self.outer_poly.exterior.xy
plt.plot(x_in, y_in)
plt.plot(x_out, y_out)
else:
print("plot is not complete yet!")
def f_ode(t, x, u):
# dynamical model of the two-wheeled robot
# TODO: find exact values for these parameters
r = 0.03
R = 0.05
d = 0.02
theta = x[2]
omega_r = u[0]
omega_l = u[1]
dx = np.zeros(3)
dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \
+ (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l
dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \
+ (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l
dx[2] = -r/(2.0*R) * (omega_r - omega_l)
return dx
class RemoteController:
def __init__(self, id, ip):
self.anim_stopped = False
#self.robots = [Robot(14, '192.168.1.103')]
#self.robots = [Robot(15, '192.168.1.102')]
self.robots = [Robot(id, ip)]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
self.track = Track()
# connect to robot
self.rc_socket = socket.socket()
#self.rc_socket = None
try:
for r in self.robots:
self.rc_socket.connect((r.ip, 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
sys.exit(1)
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
# variable for mpc open loop
self.ol_x = None
self.ol_y = None
self.mutex = threading.Lock()
# ROS subscriber for detected markers
marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback)
# pid parameters
self.controlling = False
# currently active control
self.u1 = 0.0
self.u2 = 0.0
# animation
self.fig = plt.figure()
self.ax = self.fig.add_subplot(2,2,1)
self.ax2 = self.fig.add_subplot(2, 2, 2)
self.ax3 = self.fig.add_subplot(2, 2, 4)
self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[], color='grey', linestyle=':')
self.line_sim, = self.ax.plot([], [])
self.line_ol, = self.ax.plot([],[], color='green', linestyle='--')
self.dirm, = self.ax.plot([], [])
self.dirs, = self.ax.plot([], [])
self.line_x, = self.ax2.plot([],[])
self.line_y, = self.ax3.plot([], [])
self.track_line_inner, = self.ax.plot([], [])
self.track_line_outer, = self.ax.plot([], [])
self.ax.set_xlabel('x-position')
self.ax.set_ylabel('y-position')
self.ax.grid()
self.ax2.set_xlabel('Zeit t')
self.ax2.set_ylabel('x-position')
self.ax2.grid()
self.ax3.set_xlabel('Zeit t')
self.ax3.set_ylabel('y-position')
self.ax3.grid()
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
self.target = (0.0, 0.0, 0.0)
# integrator
self.r = scipy.integrate.ode(f_ode)
self.omega_max = 5.0
#self.omega_max = 13.32
def ani(self):
print("starting animation")
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
plt.show(block=True)
def ani_init(self):
self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box')
self.ax2.set_ylim(-2, 2)
self.ax2.set_xlim(0, 10)
self.ax3.set_ylim(-2, 2)
self.ax3.set_xlim(0, 10)
self.track_line_inner.set_data(self.track.inner_poly.exterior.xy)
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y,
def ani_update(self, frame):
if self.anim_stopped:
self.ani.event_source.stop()
sys.exit(0)
#print("plotting")
self.mutex.acquire()
try:
# copy data for plot from global arrays
if self.tms is not None:
tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms)
if len(tm_local) > 0:
# plot path of the robot
self.line.set_data(xm_local[:,0], xm_local[:,1])
# compute and plot direction the robot is facing
a = xm_local[-1, 0]
b = xm_local[-1, 1]
a2 = a + np.cos(xm_local[-1, 2]) * 0.2
b2 = b + np.sin(xm_local[-1, 2]) * 0.2
self.dirm.set_data(np.array([a, a2]), np.array([b, b2]))
n_plot = 300
if len(tm_local) > n_plot:
# plot x and y coordinate
self.line_x.set_data(tm_local[-n_plot:] - (tm_local[-1] - 10), xm_local[-n_plot:,0])
self.line_y.set_data(tm_local[-n_plot:] - (tm_local[-1] - 10), xm_local[-n_plot:, 1])
ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs)
if len(ts_local) > 0:
# plot simulated path of the robot
self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
# compute and plot direction the robot is facing
a = xs_local[-1, 0]
b = xs_local[-1, 1]
a2 = a + np.cos(xs_local[-1, 2]) * 0.2
b2 = b + np.sin(xs_local[-1, 2]) * 0.2
self.dirs.set_data(np.array([a, a2]), np.array([b, b2]))
ol_x_local = deepcopy(self.ol_x)
ol_y_local = deepcopy(self.ol_y)
if ol_x_local is not None:
self.line_ol.set_data(ol_x_local, ol_y_local)
else:
self.line_ol.set_data([],[])
finally:
self.mutex.release()
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
self.line_x, self.line_y,
def measurement_callback(self, data):
#print(data)
# detect robots
if data.id in self.robot_ids:
r = self.robot_ids[data.id]
r.pos = (data.x, data.y) # only x and y component are important for us
r.euler = data.angle
# save measured position and angle for plotting
measurement = np.array([r.pos[0], r.pos[1], r.euler])
if self.tms_0 is None:
self.tms_0 = time.time()
self.xm_0 = measurement
self.mutex.acquire()
try:
self.tms = np.array([0.0])
self.xms = measurement
finally:
self.mutex.release()
else:
self.mutex.acquire()
try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement))
finally:
self.mutex.release()
# detect track
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
self.track.set_id(data)
def controller(self):
print("starting control")
targets = {}
markers_in = self.track.inner.values()
markers_out = self.track.outer.values()
# find targets:
# generate waypoints
# lamb = 0.5
# j = 0
# for i in range(0,4):
# p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
# targets[j] = (p[0],p[1], 0.0)
# j += 1
# if i < 3:
# mean_in = (np.array(markers_in[i]) + np.array(markers_in[i+1])) * 0.5
# mean_out = (np.array(markers_out[i]) + np.array(markers_out[i+1])) * 0.5
# mean = mean_in + (1.0 - lamb) * (mean_out - mean_in)
# targets[j] = (mean[0], mean[1], 0.0)
# j += 1
# final connection between first and last marker
#mean_in = (np.array(markers_in[3]) + np.array(markers_in[0])) * 0.5
#mean_out = (np.array(markers_out[3]) + np.array(markers_out[0])) * 0.5
#mean = mean_in + (1.0 - lamb) * (mean_out - mean_in)
#targets[j] = (mean[0], mean[1], 0.0)
grid_pos = (0,0, 0)
target_pos = np.array((0.0, 0.0, 0.0))
auto_control = False
current_target = 0
control_scaling = 0.4
self.pid = False
self.mpc = True
integ = 0.0
while True:
# open loop controller
events = pygame.event.get()
move = 0
turn = 0
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP:
self.controlling = True
self.t = time.time()
elif event.key == pygame.K_DOWN:
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
elif event.key == pygame.K_0:
grid_pos = (0,0, 0)
elif event.key == pygame.K_w:
move = 1
turn = 0
elif event.key == pygame.K_s:
move = -1
turn = 0
elif event.key == pygame.K_a:
turn = 1
move = 0
integ = 0
elif event.key == pygame.K_d:
turn = -1
move = 0
integ = 0
elif event.key == pygame.K_p:
self.pid = True
elif event.key == pygame.K_SPACE:
auto_control = not auto_control
if auto_control:
self.target = targets[current_target]
elif event.key == pygame.K_PLUS:
control_scaling += 0.1
control_scaling = min(control_scaling, 1.0)
elif event.key == pygame.K_MINUS:
control_scaling -= 0.1
control_scaling = max(control_scaling, 0.3)
elif event.key == pygame.K_ESCAPE:
print("quit!")
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
elif event.key == pygame.QUIT:
print("quit!")
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
# compute new grid position and orientation
if move != 0:
new_x = grid_pos[0] + move * np.cos(grid_pos[2])
new_y = grid_pos[1] + move * np.sin(grid_pos[2])
new_angle = grid_pos[2]
grid_pos = (new_x, new_y, new_angle)
print(grid_pos)
elif turn != 0:
new_x = grid_pos[0]
new_y = grid_pos[1]
new_angle = grid_pos[2] + turn * np.pi/2
grid_pos = (new_x, new_y, new_angle)
print(grid_pos)
self.target = np.array((0.25*grid_pos[0], 0.25*grid_pos[1], grid_pos[2]))
if self.controlling:
if self.mpc:
x_pred = self.get_measurement_prediction()
tmpc_start = time.time()
error_pos = np.linalg.norm(x_pred[0:2] - self.target[0:2])
error_ang = np.abs(x_pred[2] - self.target[2])
print("error pos = ", error_pos, " error_ang = ", error_ang)
turning = turn != 0
if error_pos > 0.1 or error_ang > 0.4:
# solve mpc open loop problem
res = self.ols.solve(x_pred, self.target, [], turning)
#us1 = res[0]
#us2 = res[1]
us1 = res[0] * control_scaling
us2 = res[1] * control_scaling
#print("u = {}", (us1, us2))
# save open loop trajectories for plotting
self.mutex.acquire()
try:
self.ol_x = res[2]
self.ol_y = res[3]
finally:
self.mutex.release()
tmpc_end = time.time()
#print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
#print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
if self.rc_socket:
self.rc_socket.send('({},{})\n'.format(u1, u2))
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
if self.pid:
x_pred = self.get_measurement()
# compute angle difference
d_angle = x_pred[2] - self.target[2]
dt = time.time() - self.t
integ += d_angle * dt
#print(d_angle)
K = 0.2
I = 0.15
pp = d_angle * K
ii = integ * I
u1 = pp + ii
u2 = -u1
print("e = {}, dt = {}, P = {}, I = {}, u = {}".format(d_angle, dt, pp, ii, (u1,u2)))
self.t = time.time()
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(0.025)
#self.rc_socket.send('({},{})\n'.format(0, 0))
#time.sleep(0.1)
#self.pid = False
def get_measurement_prediction(self):
# get measurement
self.mutex.acquire()
try:
window = 3
last_measurement = copy.deepcopy(self.xms[-window:])
#print("last_measurements = {}".format(last_measurement))
#print("mean = {}".format(np.mean(last_measurement, axis=0)))
last_measurement = np.mean(last_measurement, axis=0)
last_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
# prediction of state at time the mpc will terminate
self.r.set_f_params(np.array([self.u1 * self.omega_max, self.u2 * self.omega_max]))
self.r.set_initial_value(last_measurement, last_time)
x_pred = self.r.integrate(self.r.t + self.dt)
return x_pred
def get_measurement(self):
self.mutex.acquire()
try:
last_measurement = copy.deepcopy(self.xms[-1:])
finally:
self.mutex.release()
return last_measurement[0]
def pos_getter(self):
while True:
x_pred = self.get_measurement_prediction()
print("pos = ", x_pred)
def main(args):
parser = ArgumentParser()
parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot')
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
marker_id = int(args.id)
ip = args.ip
rospy.init_node('controller_node', anonymous=True)
rc = RemoteController(marker_id, ip)
pygame.init()
screenheight = 480
screenwidth = 640
pygame.display.set_mode([screenwidth, screenheight])
# print("waiting until track is completely detected")
# while not rc.track.track_complete:
# pass
#threading.Thread(target=rc.input_handling).start()
controller_thread = threading.Thread(target=rc.controller)
controller_thread.start()
#time.sleep(10)
#rc.ani()
if __name__ == '__main__':
main(sys.argv)