Compare commits
6 Commits
67405fbd3f
...
15da18f949
Author | SHA1 | Date | |
---|---|---|---|
15da18f949 | |||
d985b830b9 | |||
c6582c9e6c | |||
cfa669e498 | |||
24cf87e9dd | |||
9c222b1099 |
micropython_firmware
remote_control
|
@ -2,7 +2,7 @@ import machine
|
|||
import sys
|
||||
from machine import I2C, Pin
|
||||
|
||||
i2c = False
|
||||
i2c = True
|
||||
if i2c:
|
||||
import d1motor
|
||||
else:
|
||||
|
@ -13,14 +13,13 @@ import usocket
|
|||
import esp
|
||||
|
||||
class Robot:
|
||||
def __init__(self):
|
||||
def __init__(self, i2c=True):
|
||||
if i2c:
|
||||
print("setting up I2C ...")
|
||||
#d1 = Pin(5)
|
||||
#d2 = Pin(4)
|
||||
#i2c = I2C(scl=d1, sda=d2)
|
||||
#i2c_scan = i2c.scan()
|
||||
i2c_scan = []
|
||||
d1 = Pin(5)
|
||||
d2 = Pin(4)
|
||||
i2c = I2C(scl=d1, sda=d2)
|
||||
i2c_scan = i2c.scan()
|
||||
if len(i2c_scan) > 0:
|
||||
i2c_addr = i2c_scan[0]
|
||||
print("i2c scan = {}".format(i2c_addr))
|
||||
|
|
|
@ -138,7 +138,7 @@ class OpenLoopSolver:
|
|||
#return
|
||||
|
||||
|
||||
def solve(self, x0, target, obstacles):
|
||||
def solve(self, x0, target, obstacles, track):
|
||||
tstart = time.time()
|
||||
# alternative solution using multiple shooting (way faster!)
|
||||
self.opti = Opti() # Optimization problem
|
||||
|
@ -213,7 +213,7 @@ class OpenLoopSolver:
|
|||
# ---- path constraints -----------
|
||||
# limit = lambda pos: 1-sin(2*pi*pos)/2
|
||||
# self.opti.subject_to(speed<=limit(pos)) # track speed limit
|
||||
maxcontrol = 0.95
|
||||
maxcontrol = 0.9
|
||||
self.opti.subject_to(self.opti.bounded(-maxcontrol, self.U, maxcontrol)) # control is limited
|
||||
|
||||
# ---- boundary conditions --------
|
||||
|
@ -236,7 +236,17 @@ class OpenLoopSolver:
|
|||
if p is not None:
|
||||
for k in range(1,self.N):
|
||||
self.opti.subject_to((self.X[0,k]-p[0])**2 + (self.X[1,k]-p[1])**2 + self.slack > r**2)
|
||||
# pass
|
||||
# keep inside track
|
||||
# TODO
|
||||
# track_ids = track.inner.keys()
|
||||
# a = track.outer[track_ids[0]]
|
||||
# b = track.outer[track_ids[1]]
|
||||
# c = track.outer[track_ids[2]]
|
||||
# d = track.outer[track_ids[3]]
|
||||
#
|
||||
# for k in range(1, self.N):
|
||||
# self.opti.subject_to(self.opti.subject_to(self.X))
|
||||
|
||||
posx = self.X[0, :]
|
||||
posy = self.X[1, :]
|
||||
angle = self.X[2, :]
|
||||
|
@ -250,8 +260,11 @@ class OpenLoopSolver:
|
|||
|
||||
tstart = time.time()
|
||||
if self.use_warmstart and self.opti_x0 is not None:
|
||||
try:
|
||||
self.opti.set_initial(self.opti.lam_g, self.opti_lam_g0)
|
||||
self.opti.set_initial(self.opti.x, self.opti_x0)
|
||||
except RuntimeError:
|
||||
print("could not set warmstart")
|
||||
sol = self.opti.solve() # actual solve
|
||||
tend = time.time()
|
||||
print("solving the problem took {} seconds".format(tend - tstart))
|
||||
|
|
|
@ -13,15 +13,34 @@ class robot: # we have 4 arg for this class, because joysticks get the same (va
|
|||
self.robot0_stopped_3 = True
|
||||
self.robot0_stopped_4 = True
|
||||
self.rc_socket = socket.socket()
|
||||
self.u1 = 0
|
||||
self.u2 = 0
|
||||
try:
|
||||
self.rc_socket.connect((self.ip, self.port))
|
||||
except socket.error():
|
||||
print("couldn't connect to socket")
|
||||
|
||||
self.check_joystick = False
|
||||
def joystick_init(self): # Joystick's initialisation
|
||||
joystick_count = pygame.joystick.get_count()
|
||||
for count in range(joystick_count):
|
||||
if joystick_count == 2:
|
||||
self.joystick = pygame.joystick.Joystick(self.joy)
|
||||
print("{}-->joystick count".format(joystick_count))
|
||||
self.joystick.init()
|
||||
self.axes = self.joystick.get_numaxes()
|
||||
self.check_joystick = True
|
||||
elif joystick_count == 1:
|
||||
joystick = pygame.joystick.Joystick(0)
|
||||
joystick.init()
|
||||
print("connected only 1 joystick - {}".format(joystick))
|
||||
self.check_joystick = False
|
||||
elif not joystick_count:
|
||||
print("no joysticks connected")
|
||||
self.check_joystick = False
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def control(self, event): # the control of two robots with joysticks
|
||||
joy = event.joy
|
||||
|
@ -74,55 +93,56 @@ class robot: # we have 4 arg for this class, because joysticks get the same (va
|
|||
self.robot0_stopped_4 = True
|
||||
|
||||
def control_keyboard(self, event): # keyboard control for robot1
|
||||
command_received = False
|
||||
if event.key == pygame.K_LEFT:
|
||||
|
||||
if pressed[pygame.K_LEFT]:
|
||||
u1 = -1.0
|
||||
u2 = 1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_RIGHT:
|
||||
u1 = 1.0
|
||||
u2 = -1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_UP:
|
||||
u1 = -1.0
|
||||
u2 = -1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_DOWN:
|
||||
u1 = 1.0
|
||||
u2 = 1.0
|
||||
command_received = True
|
||||
if command_received:
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
def control_keyboard_2(self, event): # keyboard control for robot2
|
||||
|
||||
command_received = False
|
||||
if event.key == pygame.K_a:
|
||||
u1 = -1.0
|
||||
u2 = 1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_d:
|
||||
elif pressed[pygame.K_RIGHT]:
|
||||
u1 = 1.0
|
||||
u2 = -1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_w:
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif pressed[pygame.K_UP]:
|
||||
u1 = -1.0
|
||||
u2 = -1.0
|
||||
command_received = True
|
||||
elif event.key == pygame.K_s:
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif pressed[pygame.K_DOWN]:
|
||||
u1 = 1.0
|
||||
u2 = 1.0
|
||||
command_received = True
|
||||
if command_received:
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
def control_keyboard_stop(self): # stop for both robot
|
||||
elif event.type == pygame.KEYUP:
|
||||
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2).encode())
|
||||
|
||||
u1 = 0
|
||||
u2 = 0
|
||||
print("key released, resetting: ({},{})".format(u1, u2))
|
||||
def control_keyboard_2(self, event): # keyboard control for robot1
|
||||
|
||||
if pressed[pygame.K_a]:
|
||||
u1 = -1.0
|
||||
u2 = 1.0
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif pressed[pygame.K_d]:
|
||||
u1 = 1.0
|
||||
u2 = -1.0
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif pressed[pygame.K_w]:
|
||||
u1 = -1.0
|
||||
u2 = -1.0
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif pressed[pygame.K_s]:
|
||||
u1 = 1.0
|
||||
u2 = 1.0
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2).encode())
|
||||
|
||||
elif event.type == pygame.KEYUP:
|
||||
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2).encode())
|
||||
|
||||
|
||||
def main():
|
||||
pygame.init()
|
||||
pygame.display.set_mode((640, 480))
|
||||
|
@ -131,19 +151,18 @@ def main():
|
|||
robot_1.joystick_init()
|
||||
robot_2 = robot(1, '192.168.1.103', 1234)
|
||||
robot_2.joystick_init()
|
||||
|
||||
while True:
|
||||
events = pygame.event.get()
|
||||
global pressed
|
||||
pressed = pygame.key.get_pressed()
|
||||
for event in events:
|
||||
if event.type == pygame.JOYAXISMOTION:
|
||||
robot_1.control(event)
|
||||
robot_2.control(event)
|
||||
elif event.type == pygame.KEYDOWN:
|
||||
else:
|
||||
robot_1.control_keyboard(event)
|
||||
robot_2.control_keyboard_2(event)
|
||||
elif event.type == pygame.KEYUP:
|
||||
robot_1.control_keyboard_stop()
|
||||
robot_2.control_keyboard_stop()
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -20,12 +20,16 @@ import matplotlib.pyplot as plt
|
|||
import matplotlib.animation as anim
|
||||
import matplotlib.patches as patch
|
||||
|
||||
from shapely.geometry import Polygon
|
||||
|
||||
import time
|
||||
|
||||
from casadi_opt import OpenLoopSolver
|
||||
|
||||
from marker_pos_angle.msg import id_pos_angle
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
class Robot:
|
||||
def __init__(self, id, ip=None):
|
||||
self.pos = None
|
||||
|
@ -44,6 +48,57 @@ class Obstacle:
|
|||
self.pos = None
|
||||
self.radius = radius
|
||||
|
||||
class Track:
|
||||
def __init__(self):
|
||||
# ids in clockwise direction
|
||||
self.inner = OrderedDict()
|
||||
self.inner[1] = None
|
||||
self.inner[2] = None
|
||||
self.inner[3] = None
|
||||
self.inner[4] = None
|
||||
|
||||
self.outer = OrderedDict()
|
||||
self.outer[5] = None
|
||||
self.outer[6] = None
|
||||
self.outer[7] = None
|
||||
self.outer[8] = None
|
||||
|
||||
self.track_complete = False
|
||||
|
||||
self.inner_poly = None
|
||||
self.outer_poly = None
|
||||
|
||||
def set_id(self, d):
|
||||
if not self.track_complete:
|
||||
if d.id in self.inner:
|
||||
print("Detected marker {} at pos {}".format(d.id, (d.x,d.y)))
|
||||
self.inner[d.id] = (d.x, d.y)
|
||||
elif d.id in self.outer:
|
||||
print("Detected marker {} at pos {}".format(d.id, (d.x, d.y)))
|
||||
self.outer[d.id] = (d.x, d.y)
|
||||
else:
|
||||
print("Unknown marker!")
|
||||
else:
|
||||
return
|
||||
|
||||
if not None in self.inner.values() and not None in self.outer.values():
|
||||
print("Track marker positions detected!")
|
||||
self.track_complete = True
|
||||
|
||||
self.inner_poly = Polygon(self.inner.values())
|
||||
self.outer_poly = Polygon(self.outer.values())
|
||||
|
||||
def plot_track(self):
|
||||
if self.track_complete:
|
||||
plt.figure(2)
|
||||
x_in, y_in = self.inner_poly.exterior.xy
|
||||
x_out, y_out = self.outer_poly.exterior.xy
|
||||
plt.plot(x_in, y_in)
|
||||
plt.plot(x_out, y_out)
|
||||
else:
|
||||
print("plot is not complete yet!")
|
||||
|
||||
|
||||
def f_ode(t, x, u):
|
||||
# dynamical model of the two-wheeled robot
|
||||
# TODO: find exact values for these parameters
|
||||
|
@ -69,18 +124,21 @@ def f_ode(t, x, u):
|
|||
class RemoteController:
|
||||
def __init__(self):
|
||||
|
||||
self.robots = [Robot(3, '192.168.1.103')]
|
||||
self.robots = [Robot(15, '192.168.1.103')]
|
||||
#self.robots = [Robot(14, '192.168.1.102')]
|
||||
|
||||
self.robot_ids = {}
|
||||
for r in self.robots:
|
||||
self.robot_ids[r.id] = r
|
||||
|
||||
obst = [Obstacle(6, 0.175), Obstacle(5, 0.175), Obstacle(8, 0.175)]
|
||||
obst = [Obstacle(12, 0.275), Obstacle(10, 0.275), Obstacle(13, 0.275), Obstacle(14, 0.275)]
|
||||
|
||||
self.obstacles = {}
|
||||
for r in obst:
|
||||
self.obstacles[r.id] = r
|
||||
|
||||
self.track = Track()
|
||||
|
||||
# connect to robot
|
||||
self.rc_socket = socket.socket()
|
||||
#self.rc_socket = None
|
||||
|
@ -89,7 +147,7 @@ class RemoteController:
|
|||
self.rc_socket.connect((r.ip, 1234)) # connect to robot
|
||||
except socket.error:
|
||||
print("could not connect to socket")
|
||||
self.rc_socket = None
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
self.t = time.time()
|
||||
|
@ -123,7 +181,10 @@ class RemoteController:
|
|||
|
||||
# animation
|
||||
self.fig = plt.figure()
|
||||
self.ax = self.fig.add_subplot(1,1,1)
|
||||
self.ax = self.fig.add_subplot(2,2,1)
|
||||
self.ax2 = self.fig.add_subplot(2, 2, 2)
|
||||
self.ax3 = self.fig.add_subplot(2, 2, 4)
|
||||
|
||||
self.xdata, self.ydata = [], []
|
||||
self.line, = self.ax.plot([],[], color='grey', linestyle=':')
|
||||
self.line_sim, = self.ax.plot([], [])
|
||||
|
@ -131,6 +192,12 @@ class RemoteController:
|
|||
self.dirm, = self.ax.plot([], [])
|
||||
self.dirs, = self.ax.plot([], [])
|
||||
|
||||
self.line_x, = self.ax2.plot([],[])
|
||||
self.line_y, = self.ax3.plot([], [])
|
||||
|
||||
self.track_line_inner, = self.ax.plot([], [])
|
||||
self.track_line_outer, = self.ax.plot([], [])
|
||||
|
||||
self.circles = []
|
||||
for o in self.obstacles:
|
||||
self.circles.append(patch.Circle((0.0, 0.0), radius=0.1, fill=False, color='red', linestyle='--'))
|
||||
|
@ -138,11 +205,20 @@ class RemoteController:
|
|||
for s in self.circles:
|
||||
self.ax.add_artist(s)
|
||||
|
||||
plt.xlabel('x-position')
|
||||
plt.ylabel('y-position')
|
||||
plt.grid()
|
||||
self.ax.set_xlabel('x-position')
|
||||
self.ax.set_ylabel('y-position')
|
||||
self.ax.grid()
|
||||
|
||||
self.ols = OpenLoopSolver()
|
||||
self.ax2.set_xlabel('Zeit t')
|
||||
self.ax2.set_ylabel('x-position')
|
||||
self.ax2.grid()
|
||||
|
||||
self.ax3.set_xlabel('Zeit t')
|
||||
self.ax3.set_ylabel('y-position')
|
||||
self.ax3.grid()
|
||||
|
||||
self.mstep = 2
|
||||
self.ols = OpenLoopSolver(N=20, T=4.0)
|
||||
self.ols.setup()
|
||||
self.dt = self.ols.T / self.ols.N
|
||||
|
||||
|
@ -151,9 +227,11 @@ class RemoteController:
|
|||
# integrator
|
||||
self.r = scipy.integrate.ode(f_ode)
|
||||
self.omega_max = 5.0
|
||||
#self.omega_max = 13.32
|
||||
|
||||
|
||||
def ani(self):
|
||||
print("starting animation")
|
||||
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
|
||||
plt.ion()
|
||||
plt.show(block=True)
|
||||
|
@ -163,7 +241,18 @@ class RemoteController:
|
|||
self.ax.set_ylim(-2, 2)
|
||||
self.ax.set_aspect('equal', adjustable='box')
|
||||
|
||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.circles[0], self.circles[1],self.circles[2],
|
||||
self.ax2.set_ylim(-2, 2)
|
||||
self.ax2.set_xlim(0, 10)
|
||||
|
||||
self.ax3.set_ylim(-2, 2)
|
||||
self.ax3.set_xlim(0, 10)
|
||||
|
||||
self.track_line_inner.set_data(self.track.inner_poly.exterior.xy)
|
||||
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
|
||||
|
||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
|
||||
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y, self.circles[0], self.circles[1],\
|
||||
self.circles[2], self.circles[3],
|
||||
|
||||
def ani_update(self, frame):
|
||||
#print("plotting")
|
||||
|
@ -187,6 +276,12 @@ class RemoteController:
|
|||
|
||||
self.dirm.set_data(np.array([a, a2]), np.array([b, b2]))
|
||||
|
||||
n_plot = 300
|
||||
if len(tm_local) > n_plot:
|
||||
# plot x and y coordinate
|
||||
self.line_x.set_data(tm_local[-n_plot:] - (tm_local[-1] - 10), xm_local[-n_plot:,0])
|
||||
self.line_y.set_data(tm_local[-n_plot:] - (tm_local[-1] - 10), xm_local[-n_plot:, 1])
|
||||
|
||||
ts_local = deepcopy(self.ts)
|
||||
xs_local = deepcopy(self.xs)
|
||||
|
||||
|
@ -224,9 +319,12 @@ class RemoteController:
|
|||
finally:
|
||||
self.mutex.release()
|
||||
|
||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.circles[0], self.circles[1],self.circles[2],
|
||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
|
||||
self.line_x, self.line_y, self.circles[0], self.circles[1],self.circles[2],self.circles[3],
|
||||
|
||||
def measurement_callback(self, data):
|
||||
#print(data)
|
||||
|
||||
# detect robots
|
||||
if data.id in self.robot_ids:
|
||||
r = self.robot_ids[data.id]
|
||||
|
@ -259,8 +357,26 @@ class RemoteController:
|
|||
obst = (data.x, data.y)
|
||||
self.obstacles[data.id].pos = obst
|
||||
|
||||
# detect track
|
||||
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
|
||||
self.track.set_id(data)
|
||||
|
||||
def controller(self):
|
||||
print("starting control")
|
||||
|
||||
targets = {}
|
||||
markers_in = self.track.inner.values()
|
||||
markers_out = self.track.outer.values()
|
||||
|
||||
# find targets:
|
||||
lamb = 0.3
|
||||
for i in range(0,4):
|
||||
p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
|
||||
targets[i] = (p[0],p[1], 0.0)
|
||||
|
||||
auto_control = False
|
||||
current_target = 0
|
||||
|
||||
while True:
|
||||
|
||||
# open loop controller
|
||||
|
@ -277,21 +393,37 @@ class RemoteController:
|
|||
elif event.key == pygame.K_0:
|
||||
self.target = (0.0, 0.0, 0.0)
|
||||
elif event.key == pygame.K_1:
|
||||
self.target = (0.5,0.5, -np.pi/2.0)
|
||||
#self.target = (0.5,0.5, -np.pi/2.0)
|
||||
self.target = targets[0]
|
||||
elif event.key == pygame.K_2:
|
||||
self.target = (0.5, -0.5, 0.0)
|
||||
#self.target = (0.5, -0.5, 0.0)
|
||||
self.target = targets[1]
|
||||
elif event.key == pygame.K_3:
|
||||
self.target = (-0.5,-0.5, np.pi/2.0)
|
||||
#self.target = (-0.5,-0.5, np.pi/2.0)
|
||||
self.target = targets[2]
|
||||
elif event.key == pygame.K_4:
|
||||
self.target = (-0.5,0.5, 0.0)
|
||||
#self.target = (-0.5,0.5, 0.0)
|
||||
self.target = targets[3]
|
||||
elif event.key == pygame.K_5:
|
||||
auto_control = not auto_control
|
||||
if auto_control:
|
||||
self.target = targets[current_target]
|
||||
|
||||
if self.controlling:
|
||||
|
||||
x_pred = self.get_measurement_prediction()
|
||||
|
||||
if auto_control:
|
||||
if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3:
|
||||
print("close to target!")
|
||||
current_target = (current_target + 1) % 4
|
||||
self.target = targets[current_target]
|
||||
print("new target = {}".format(current_target))
|
||||
|
||||
tmpc_start = time.time()
|
||||
|
||||
# solve mpc open loop problem
|
||||
res = self.ols.solve(x_pred, self.target, self.obstacles)
|
||||
res = self.ols.solve(x_pred, self.target, self.obstacles, self.track)
|
||||
|
||||
us1 = res[0]
|
||||
us2 = res[1]
|
||||
|
@ -312,18 +444,24 @@ class RemoteController:
|
|||
time.sleep(self.dt - dt_mpc)
|
||||
|
||||
# send controls to the robot
|
||||
for i in range(0, 1): # option to use multistep mpc if len(range) > 1
|
||||
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||
u1 = us1[i]
|
||||
u2 = us2[i]
|
||||
if self.rc_socket:
|
||||
self.rc_socket.send('({},{})\n'.format(u1, u2))
|
||||
if i < self.mstep:
|
||||
time.sleep(self.dt)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
|
||||
def get_measurement_prediction(self):
|
||||
# get measurement
|
||||
self.mutex.acquire()
|
||||
try:
|
||||
last_measurement = copy.deepcopy(self.xms[-1])
|
||||
window = 3
|
||||
last_measurement = copy.deepcopy(self.xms[-window:])
|
||||
#print("last_measurements = {}".format(last_measurement))
|
||||
#print("mean = {}".format(np.mean(last_measurement, axis=0)))
|
||||
last_measurement = np.mean(last_measurement, axis=0)
|
||||
last_time = copy.deepcopy(self.tms[-1])
|
||||
finally:
|
||||
self.mutex.release()
|
||||
|
@ -349,6 +487,10 @@ def main(args):
|
|||
screenwidth = 640
|
||||
pygame.display.set_mode([screenwidth, screenheight])
|
||||
|
||||
print("waiting until track is completely detected")
|
||||
while not rc.track.track_complete:
|
||||
pass
|
||||
|
||||
#threading.Thread(target=rc.input_handling).start()
|
||||
threading.Thread(target=rc.controller).start()
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user