Compare commits

...

7 Commits

3 changed files with 462 additions and 169 deletions

View File

@ -4,8 +4,9 @@ from machine import I2C, Pin
import d1motor import d1motor
import time import utime
import usocket import usocket
import uselect
import esp import esp
class Robot: class Robot:
@ -32,6 +33,12 @@ class Robot:
# setup socket for remote control # setup socket for remote control
self.addr = usocket.getaddrinfo(ip, 1234)[0][-1] self.addr = usocket.getaddrinfo(ip, 1234)[0][-1]
self.poller = uselect.poll()
self.poller_timeout = 2 # timeout in ms
self.control_queue = []
def remote_control(self): def remote_control(self):
while True: while True:
print("setting up socket communication ...") print("setting up socket communication ...")
@ -45,18 +52,69 @@ class Robot:
socket_setup_complete = True socket_setup_complete = True
except Exception as e: except Exception as e:
print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e)) print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e))
time.sleep(1.0) utime.sleep(1.0)
print("waiting for connections on {} ...".format(self.addr)) print("waiting for connections on {} ...".format(self.addr))
socket.listen(1) socket.listen(1)
res = socket.accept() # this blocks until someone connects to the socket res = socket.accept() # this blocks until someone connects to the socket
comm_socket = res[0] comm_socket = res[0]
self.poller.register(comm_socket, uselect.POLLIN)
print("connected!") print("connected!")
listening = True listening = True
duration_current = 0
t_current = utime.ticks_ms()
duration_next = None
stopped = True
timeouts = 0
while listening: while listening:
# expected data: '(u1, u2)'\n" elapsed = utime.ticks_ms()
remaining = duration_current - (elapsed-t_current)
if remaining >= 0:
timeouts = 0
print("start of loop\n I have {} ms until next control needs to be applied".format(remaining))
else:
# current control timed out -> applying next control
if len(self.control_queue) > 0:
print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current))
u_next = self.control_queue.pop(0)
#print("duration of previous control = {}".format((elapsed - t_current)/1000.0))
#print("applying new control (duration, u1, u2) = ({}, {}, {})".format(duration_next, u1_next, u2_next))
# if so, apply it
self.m1.speed(u_next[0])
self.m2.speed(u_next[1])
t_current = utime.ticks_ms()
duration_current = duration_next
stopped = False
elif not stopped:
print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current))
#print("duration of previous control = {}".format((elapsed - t_current)/1000.0))
# no new control available -> shutdown
print("no new control available -> stopping")
self.m1.speed(0)
self.m2.speed(0)
t_current = utime.ticks_ms()
duration_current = 0 # as soon as new control will become available we directly want to apply it immediately
stopped = True
#elif timeouts < 10:
# print("start of loop\n I have {} ms until next control needs to be applied, timeouts = {}".format(remaining, timeouts))
# timeouts = timeouts + 1
trecv_start = utime.ticks_ms()
# expected data: '(t, u1_0, u2_0, u1_1, u2_1, ...)'\n"
# where ui = control for motor i # where ui = control for motor i
# ui \in [-1.0, 1.0] # ui \in [-1.0, 1.0]
#print("poller waiting..")
poll_res = self.poller.poll(self.poller_timeout) # wait 100 milliseconds for socket data
if poll_res:
print("new data available")
try: try:
data = comm_socket.readline() data = comm_socket.readline()
data_str = data.decode() data_str = data.decode()
@ -64,36 +122,54 @@ class Robot:
#print("processing data = {}".format(data_str)) #print("processing data = {}".format(data_str))
l = data_str.strip('()\n').split(',') l = data_str.strip('()\n').split(',')
#print("l = {}".format(l)) #print("l = {}".format(l))
u1 = int(float(l[0])*100) duration_next = int(float(l[0])*1000)
#print("u1 = {}".format(u1)) #print("duration = {}".format(duration_next))
u2 = int(float(l[1])*100) self.control_queue = []
#print("u2 = {}".format(u2)) print("putting data into queue")
for i in range((len(l)-1)/2):
u1_next = int(float(l[2*i+1])*100)
print("u1 = {}".format(u1_next))
u2_next = int(float(l[2*i+2])*100)
print("u2 = {}".format(u2_next))
self.control_queue.append((u1_next, u2_next))
except ValueError: except ValueError:
print("ValueError: Data has wrong format.") print("ValueError: Data has wrong format.")
print("Data received: {}".format(data_str)) print("Data received: {}".format(data_str))
print("Shutting down ...") print("Shutting down ...")
u1 = u2 = 0 self.control_queue = []
duration_current = 0
listening = False listening = False
except IndexError:
print("IndexError: Data has wrong format.")
print("Data received: {}".format(data_str))
print("Shutting down ...")
u1 = u2 = 0
listening = False
except Exception as e:
print("Some other error occured")
print("Exception: {}".format(e))
print("Shutting down ...")
u1 = u2 = 0
listening = False
finally:
self.m1.speed(u1)
self.m2.speed(u2)
comm_socket.close() comm_socket.close()
socket.close() socket.close()
del comm_socket del comm_socket
del socket del socket
print("disconnected!") print("disconnected!")
except IndexError:
print("IndexError: Data has wrong format.")
print("Data received: {}".format(data_str))
print("Shutting down ...")
self.control_queue = []
duration_current = 0
listening = False
comm_socket.close()
socket.close()
del comm_socket
del socket
print("disconnected!")
except Exception as e:
print("Some other error occured")
print("Exception: {}".format(e))
print("Shutting down ...")
self.control_queue = []
duration_current = 0
listening = False
comm_socket.close()
socket.close()
del comm_socket
del socket
print("disconnected!")
trecv_end = utime.ticks_ms()
print("communication (incl. polling) took {} ms".format(trecv_end - trecv_start))
wall_e = Robot() wall_e = Robot()
wall_e.remote_control() wall_e.remote_control()

View File

@ -1,13 +1,17 @@
from casadi import * from casadi import *
import time
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py # look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
class OpenLoopSolver: class OpenLoopSolver:
def __init__(self, N=60, T=6.0): def __init__(self, N=10, T=2.0):
self.T = T self.T = T
self.N = N self.N = N
def solve(self, x0): self.opti_x0 = None
self.opti_lam_g0 = None
def setup(self):
x = SX.sym('x') x = SX.sym('x')
y = SX.sym('y') y = SX.sym('y')
theta = SX.sym('theta') theta = SX.sym('theta')
@ -15,29 +19,32 @@ class OpenLoopSolver:
r = 0.03 r = 0.03
R = 0.05 R = 0.05
d = 0.02 d = 0.02
#omega_max = 13.32
omega_max = 10.0
omegar = SX.sym('omegar') omegar = SX.sym('omegar')
omegal = SX.sym('omegal') omegal = SX.sym('omegal')
control = vertcat(omegar, omegal) control = vertcat(omegar, omegal)
# model equation # model equation
f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar + (r / 2 * cos(theta) + r * d / (2 * R) * sin( f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar * omega_max + (r / 2 * cos(theta) + r * d / (2 * R) * sin(
theta)) * omegal theta)) * omegal * omega_max
f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar + (r / 2 * sin(theta) - r * d / (2 * R) * cos( f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar * omega_max + (r / 2 * sin(theta) - r * d / (2 * R) * cos(
theta)) * omegal theta)) * omegal * omega_max
f3 = r / (2 * R) * omegar - r / (2 * R) * omegal f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
xdot = vertcat(f1, f2, f3) xdot = vertcat(f1, f2, f3)
f = Function('f', [x, y, theta, omegar, omegal], [f1, f2, f3]) f = Function('f', [x, y, theta, omegar, omegal], [f1, f2, f3])
print("f = {}".format(f)) print("f = {}".format(f))
# cost functional # cost functional
L = x ** 2 + y ** 2 + 1e-2 * theta ** 2 + 1e-4 * (omegar ** 2 + omegal ** 2) target = (-0.0, 0.0)
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
# Fixed step Runge-Kutta 4 integrator # Fixed step Runge-Kutta 4 integrator
M = 4 # RK4 steps per interval M = 4 # RK4 steps per interval
DT = self.T / self.N / M DT = self.T / self.N / M
print("DT = {}".format(DT)) print("DT = {}".format(DT))
f = Function('f', [state, control], [xdot, L]) self.f = Function('f', [state, control], [xdot, L])
X0 = MX.sym('X0', 3) X0 = MX.sym('X0', 3)
U = MX.sym('U', 2) U = MX.sym('U', 2)
X = X0 X = X0
@ -45,10 +52,10 @@ class OpenLoopSolver:
runge_kutta = True runge_kutta = True
if runge_kutta: if runge_kutta:
for j in range(M): for j in range(M):
k1, k1_q = f(X, U) k1, k1_q = self.f(X, U)
k2, k2_q = f(X + DT / 2 * k1, U) k2, k2_q = self.f(X + DT / 2 * k1, U)
k3, k3_q = f(X + DT / 2 * k2, U) k3, k3_q = self.f(X + DT / 2 * k2, U)
k4, k4_q = f(X + DT * k3, U) k4, k4_q = self.f(X + DT * k3, U)
X = X + DT / 6 * (k1 + 2 * k2 + 2 * k3 + k4) X = X + DT / 6 * (k1 + 2 * k2 + 2 * k3 + k4)
Q = Q + DT / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q) Q = Q + DT / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q)
else: else:
@ -75,37 +82,38 @@ class OpenLoopSolver:
ubg = [] ubg = []
# Formulate the NLP # Formulate the NLP
Xk = MX(x0) # Xk = MX(x0)
for k in range(self.N): # for k in range(self.N):
# New NLP variable for the control # # New NLP variable for the control
U1k = MX.sym('U1_' + str(k), 2) # U1k = MX.sym('U1_' + str(k), 2)
# U2k = MX.sym('U2_' + str(k)) # # U2k = MX.sym('U2_' + str(k))
w += [U1k] # w += [U1k]
lbw += [-10, -10] # lbw += [-0.5, -0.5]
ubw += [10, 10] # ubw += [0.5, 0.5]
w0 += [0, 0] # w0 += [0, 0]
#
# Integrate till the end of the interval # # Integrate till the end of the interval
Fk = F(x0=Xk, p=U1k) # Fk = F(x0=Xk, p=U1k)
Xk = Fk['xf'] # Xk = Fk['xf']
J = J + Fk['qf'] # J = J + Fk['qf']
#
# Add inequality constraint # # Add inequality constraint
# g += [Xk[1]] # # g += [Xk[1]]
# lbg += [-.0] # # lbg += [-.0]
# ubg += [inf] # # ubg += [inf]
#
# Create an NLP solver # # Create an NLP solver
prob = {'f': J, 'x': vertcat(*w), 'g': vertcat(*g)} # prob = {'f': J, 'x': vertcat(*w), 'g': vertcat(*g)}
self.solver = nlpsol('solver', 'ipopt', prob) # self.solver = nlpsol('solver', 'ipopt', prob)
# Solve the NLP # Solve the NLP
if False:
sol = self.solver(x0=w0, lbx=lbw, ubx=ubw, lbg=lbg, ubg=ubg) sol = self.solver(x0=w0, lbx=lbw, ubx=ubw, lbg=lbg, ubg=ubg)
w_opt = sol['x'] w_opt = sol['x']
# Plot the solution # Plot the solution
u_opt = w_opt u_opt = w_opt
x_opt = [x0] x_opt = [self.x0]
for k in range(self.N): for k in range(self.N):
Fk = F(x0=x_opt[-1], p=u_opt[2*k:2*k+2]) Fk = F(x0=x_opt[-1], p=u_opt[2*k:2*k+2])
x_opt += [Fk['xf'].full()] x_opt += [Fk['xf'].full()]
@ -114,88 +122,145 @@ class OpenLoopSolver:
x3_opt = [r[2] for r in x_opt] x3_opt = [r[2] for r in x_opt]
tgrid = [self.T/self.N*k for k in range(self.N+1)] tgrid = [self.T/self.N*k for k in range(self.N+1)]
import matplotlib.pyplot as plt #import matplotlib.pyplot as plt
plt.figure(2) #plt.figure(2)
plt.clf() #plt.clf()
plt.plot(tgrid, x1_opt, '--') #plt.plot(tgrid, x1_opt, '--')
plt.plot(tgrid, x2_opt, '-') #plt.plot(tgrid, x2_opt, '-')
plt.plot(tgrid, x3_opt, '*') #plt.plot(tgrid, x3_opt, '*')
#plt.step(tgrid, vertcat(DM.nan(1), u_opt), '-.') #plt.step(tgrid, vertcat(DM.nan(1), u_opt), '-.')
plt.xlabel('t') #plt.xlabel('t')
plt.legend(['x1','x2','x3','u']) #plt.legend(['x1','x2','x3','u'])
plt.grid() #plt.grid()
#plt.show() #plt.show()
#return #return
# alternative solution using multiple shooting (way faster!) # alternative solution using multiple shooting (way faster!)
opti = Opti() # Optimization problem self.opti = Opti() # Optimization problem
# ---- decision variables --------- # ---- decision variables ---------
X = opti.variable(3,self.N+1) # state trajectory self.X = self.opti.variable(3,self.N+1) # state trajectory
Q = opti.variable(1,self.N+1) # state trajectory self.Q = self.opti.variable(1,self.N+1) # state trajectory
posx = X[0,:]
posy = X[1,:] self.U = self.opti.variable(2,self.N) # control trajectory (throttle)
angle = X[2,:] #T = self.opti.variable() # final time
U = opti.variable(2,self.N) # control trajectory (throttle)
#T = opti.variable() # final time
# ---- objective --------- # ---- objective ---------
#opti.minimize(T) # race in minimal time #self.opti.minimize(T) # race in minimal time
# ---- dynamic constraints -------- # ---- dynamic constraints --------
#f = lambda x,u: vertcat(f1, f2, f3) # dx/dt = f(x,u) #f = lambda x,u: vertcat(f1, f2, f3) # dx/dt = f(x,u)
# ---- initial values for solver ---
#self.opti.set_initial(speed, 1)
#self.opti.set_initial(T, 1)
def solve(self, x0, target):
tstart = time.time()
x = SX.sym('x')
y = SX.sym('y')
theta = SX.sym('theta')
state = vertcat(x, y, theta)
r = 0.03
R = 0.05
d = 0.02
omega_max = 13.32
omegar = SX.sym('omegar')
omegal = SX.sym('omegal')
control = vertcat(omegar, omegal)
# model equation
f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar * omega_max + (r / 2 * cos(theta) + r * d / (
2 * R) * sin(
theta)) * omegal * omega_max
f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar * omega_max + (r / 2 * sin(theta) - r * d / (
2 * R) * cos(
theta)) * omegal * omega_max
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
xdot = vertcat(f1, f2, f3)
L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
self.f = Function('f', [state, control], [xdot, L])
# ---- solve NLP ------
# set numerical backend
# delete constraints
self.opti.subject_to()
# add new constraints
dt = self.T / self.N # length of a control interval dt = self.T / self.N # length of a control interval
for k in range(self.N): # loop over control intervals for k in range(self.N): # loop over control intervals
# Runge-Kutta 4 integration # Runge-Kutta 4 integration
k1, k1_q = f(X[:,k], U[:,k]) k1, k1_q = self.f(self.X[:, k], self.U[:, k])
k2, k2_q = f(X[:,k]+dt/2*k1, U[:,k]) k2, k2_q = self.f(self.X[:, k] + dt / 2 * k1, self.U[:, k])
k3, k3_q = f(X[:,k]+dt/2*k2, U[:,k]) k3, k3_q = self.f(self.X[:, k] + dt / 2 * k2, self.U[:, k])
k4, k4_q = f(X[:,k]+dt*k3, U[:,k]) k4, k4_q = self.f(self.X[:, k] + dt * k3, self.U[:, k])
x_next = X[:,k] + dt/6*(k1+2*k2+2*k3+k4) x_next = self.X[:, k] + dt / 6 * (k1 + 2 * k2 + 2 * k3 + k4)
q_next = Q[:,k] + dt/6*(k1_q + 2 * k2_q + 2 * k3_q + k4_q) q_next = self.Q[:, k] + dt / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q)
opti.subject_to(X[:,k+1]==x_next) # close the gaps self.opti.subject_to(self.X[:, k + 1] == x_next) # close the gaps
opti.subject_to(Q[:,k+1]==q_next) # close the gaps self.opti.subject_to(self.Q[:, k + 1] == q_next) # close the gaps
opti.minimize(Q[:,self.N]) self.opti.minimize(self.Q[:, self.N])
# ---- path constraints ----------- # ---- path constraints -----------
# limit = lambda pos: 1-sin(2*pi*pos)/2 # limit = lambda pos: 1-sin(2*pi*pos)/2
#opti.subject_to(speed<=limit(pos)) # track speed limit # self.opti.subject_to(speed<=limit(pos)) # track speed limit
opti.subject_to(opti.bounded(-10,U,10)) # control is limited maxcontrol = 0.5
self.opti.subject_to(self.opti.bounded(-maxcontrol, self.U, maxcontrol)) # control is limited
# ---- boundary conditions -------- # ---- boundary conditions --------
opti.subject_to(posx[0]==x0[0]) # start at position 0 ...
opti.subject_to(posy[0]==x0[1]) # ... from stand-still # self.opti.subject_to(speed[-1]==0) # .. with speed 0
opti.subject_to(angle[0]==x0[2]) # finish line at position 1 self.opti.subject_to(self.Q[:, 0] == 0.0)
#opti.subject_to(speed[-1]==0) # .. with speed 0
opti.subject_to(Q[:,0]==0.0)
solver = self.opti.solver("ipopt", {}, {"print_level": 0})
# ---- misc. constraints ---------- # ---- misc. constraints ----------
#opti.subject_to(X[1,:]>=0) # Time must be positive # self.opti.subject_to(X[1,:]>=0) # Time must be positive
#opti.subject_to(X[2,:]<=4) # Time must be positive # self.opti.subject_to(X[2,:]<=4) # Time must be positive
#opti.subject_to(X[2,:]>=-2) # Time must be positive # self.opti.subject_to(X[2,:]>=-2) # Time must be positive
# avoid obstacle # avoid obstacle
# r = 0.25 # r = 0.25
# p = (0.5, 0.5) # p = (0.5, 0.5)
# for k in range(self.N): # for k in range(self.N):
# opti.subject_to((X[0,k]-p[0])**2 + (X[1,k]-p[1])**2 > r**2) # self.opti.subject_to((X[0,k]-p[0])**2 + (X[1,k]-p[1])**2 > r**2)
# pass # pass
posx = self.X[0, :]
posy = self.X[1, :]
angle = self.X[2, :]
self.opti.subject_to(posx[0] == x0[0]) # start at position 0 ...
self.opti.subject_to(posy[0] == x0[1]) # ... from stand-still
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
tend = time.time()
# ---- initial values for solver --- print("setting up problem took {} seconds".format(tend - tstart))
#opti.set_initial(speed, 1)
#opti.set_initial(T, 1)
# ---- solve NLP ------ if self.opti_x0 is not None:
opti.solver("ipopt") # set numerical backend self.opti.set_initial(self.opti.lam_g, self.opti_lam_g0)
sol = opti.solve() # actual solve self.opti.set_initial(self.opti.x, self.opti_x0)
sol = self.opti.solve() # actual solve
#x0 = sol.value(opti.x) self.opti_x0 = sol.value(self.opti.x)
#lam_g0 = sol.value(opti.lam_g) self.opti_lam_g0 = sol.value(self.opti.lam_g)
#opti.set_initial(opti.lam_g, lam_g0)
#opti.set_initial(opti.x, x0) #u_opt_1 = map(lambda x: float(x), [u_opt[i * 2] for i in range(0, 60)])
#opti.solve() #u_opt_2 = map(lambda x: float(x), [u_opt[i * 2 + 1] for i in range(0, 60)])
u_opt_1 = sol.value(self.U[0,:])
u_opt_2 = sol.value(self.U[1,:])
return (u_opt_1, u_opt_2)
#lam_g0 = sol.value(self.opti.lam_g)
#self.opti.solve()
from pylab import plot, step, figure, legend, show, spy from pylab import plot, step, figure, legend, show, spy
@ -206,8 +271,8 @@ class OpenLoopSolver:
plt.figure(3) plt.figure(3)
plot(sol.value(posx), sol.value(posy)) plot(sol.value(posx), sol.value(posy))
ax = plt.gca() ax = plt.gca()
circle = plt.Circle(p, r) #circle = plt.Circle(p, r)
ax.add_artist(circle) #ax.add_artist(circle)
#plot(limit(sol.value(pos)),'r--',label="speed limit") #plot(limit(sol.value(pos)),'r--',label="speed limit")
#step(range(N),sol.value(U),'k',label="throttle") #step(range(N),sol.value(U),'k',label="throttle")
legend(loc="upper left") legend(loc="upper left")

View File

@ -11,6 +11,7 @@ import pygame
import numpy as np import numpy as np
import socket import socket
import scipy.integrate import scipy.integrate
import copy
import threading import threading
from copy import deepcopy from copy import deepcopy
@ -118,8 +119,12 @@ class RemoteController:
self.dirs, = self.ax.plot([], []) self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position') plt.xlabel('x-position')
plt.ylabel('y-position') plt.ylabel('y-position')
plt.grid()
self.ols = OpenLoopSolver() self.ols = OpenLoopSolver()
self.ols.setup()
self.target = (0.0, 0.0, 0.0)
def ani(self): def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
@ -207,6 +212,21 @@ class RemoteController:
self.mutex.release() self.mutex.release()
def controller(self): def controller(self):
tgrid = None
us1 = None
us2 = None
u1 = -0.0
u2 = 0.0
r = scipy.integrate.ode(f_ode)
omega_max = 5.0
init_pos = None
init_time = None
final_pos = None
final_time = None
forward = True
print("starting control") print("starting control")
while True: while True:
@ -217,7 +237,7 @@ class RemoteController:
if keyboard_control: # keyboard controller if keyboard_control: # keyboard controller
events = pygame.event.get() events = pygame.event.get()
speed = 0.5 speed = 1.0
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
@ -236,17 +256,17 @@ class RemoteController:
self.u1 = -speed self.u1 = -speed
self.u2 = -speed self.u2 = -speed
#print("forward: ({},{})".format(u1, u2)) #print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) self.rc_socket.send('({},{},{})\n'.format(0.1, self.u1, self.u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
self.u1 = 0 self.u1 = 0
self.u2 = 0 self.u2 = 0
#print("key released, resetting: ({},{})".format(u1, u2)) #print("key released, resetting: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) self.rc_socket.send('({}, {},{})\n'.format(0.1, self.u1, self.u2))
tnew = time.time() tnew = time.time()
dt = tnew - self.t dt = tnew - self.t
r = scipy.integrate.ode(f_ode) r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) r.set_f_params(np.array([self.u1 * omega_max, self.u2 * omega_max]))
#print(self.x0) #print(self.x0)
if self.x0 is None: if self.x0 is None:
@ -274,18 +294,59 @@ class RemoteController:
events = pygame.event.get() events = pygame.event.get()
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_1:
self.speed = self.speed / np.sqrt(np.sqrt(np.sqrt(10.0))) self.controlling = True
elif event.key == pygame.K_RIGHT: forward = True
self.speed = self.speed * np.sqrt(np.sqrt(np.sqrt(10.0))) print("starting test")
elif event.key == pygame.K_UP: self.mutex.acquire()
u1 = self.speed try:
u2 = -self.speed init_pos = copy.deepcopy(self.xms[-1])
elif event.key == pygame.K_DOWN: init_time = copy.deepcopy(self.tms[-1])
u1 = 0.0 finally:
u2 = 0.0 self.mutex.release()
print("speed = {}".format(self.speed)) if event.key == pygame.K_2:
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.controlling = True
forward = False
print("starting test")
self.mutex.acquire()
try:
init_pos = copy.deepcopy(self.xms[-1])
init_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
elif event.key == pygame.K_3:
self.controlling = False
print("stopping test")
self.rc_socket.send('(0.1, 0.0,0.0)\n')
self.mutex.acquire()
try:
final_pos = copy.deepcopy(self.xms[-1])
final_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("init_pos = {}".format(init_pos))
print("final_pos = {}".format(final_pos))
print("distance = {}".format(np.linalg.norm(init_pos[0:2]-final_pos[0:2])))
print("dt = {}".format(final_time - init_time))
d = np.linalg.norm(init_pos[0:2]-final_pos[0:2])
t = final_time - init_time
r = 0.03
angular_velocity = d/r/t
print("average angular velocity = {}".format(angular_velocity))
if self.controlling:
if forward:
self.rc_socket.send('(0.1, 1.0,1.0)\n')
else:
self.rc_socket.send('(0.1, -1.0,-1.0)\n')
time.sleep(0.1)
#print("speed = {}".format(self.speed))
elif pid: elif pid:
# pid controller # pid controller
@ -342,7 +403,98 @@ class RemoteController:
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP: if event.key == pygame.K_UP:
self.ols.solve(self.xms[-1]) self.controlling = True
self.t = time.time()
elif event.key == pygame.K_DOWN:
self.controlling = False
self.rc_socket.send('(0.1, 0.0,0.0)\n')
elif event.key == pygame.K_0:
self.target = (0.0, 0.0, 0.0)
elif event.key == pygame.K_1:
self.target = (0.5,0.5, -np.pi/2.0)
elif event.key == pygame.K_2:
self.target = (0.5, -0.5, 0.0)
elif event.key == pygame.K_3:
self.target = (-0.5,-0.5, np.pi/2.0)
elif event.key == pygame.K_4:
self.target = (-0.5,0.5, 0.0)
if self.controlling:
tmpc_start = time.time()
# get measurement
self.mutex.acquire()
try:
last_measurement = copy.deepcopy(self.xms[-1])
last_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("current measurement (t, x) = ({}, {})".format(last_time, last_measurement))
print("current control (u1, u2) = ({}, {})".format(u1, u2))
# prediction of state at time the mpc will terminate
r.set_f_params(np.array([u1 * omega_max, u2 * omega_max]))
r.set_initial_value(last_measurement, last_time)
dt = self.ols.T/self.ols.N
print("integrating for {} seconds".format((dt)))
x_pred = r.integrate(r.t + (dt))
print("predicted initial state x_pred = ({})".format(x_pred))
res = self.ols.solve(x_pred, self.target)
#tgrid = res[0]
us1 = res[0]
us2 = res[1]
# tt = 0
# x = last_measurement
# t_ol = np.array([tt])
# x_ol = np.array([x])
# # compute open loop prediction
# for i in range(len(us1)):
# r = scipy.integrate.ode(f_ode)
# r.set_f_params(np.array([us1[i] * 13.32, us2[i] * 13.32]))
# r.set_initial_value(x, tt)
#
# tt = tt + 0.1
# x = r.integrate(tt)
#
# t_ol = np.vstack((t_ol, tt))
# x_ol = np.vstack((x_ol, x))
#plt.figure(4)
#plt.plot(x_ol[:,0], x_ol[:,1])
#if event.key == pygame.K_DOWN:
# if tgrid is not None:
tmpc_end = time.time()
print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < dt:
print("sleeping for {} seconds...".format(dt - dt_mpc))
time.sleep(dt - dt_mpc)
self.mutex.acquire()
try:
second_measurement = copy.deepcopy(self.xms[-1])
second_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("(last_time, second_time, dt) = ({}, {}, {})".format(last_time, second_time, second_time - last_time))
print("mismatch between predicted state and measured state: {}\n\n".format(second_measurement - last_measurement))
for i in range(0, 1):
u1 = us1[i]
u2 = us2[i]
self.rc_socket.send('({},{},{})\n'.format(dt,u1, u2))
self.t = time.time()
#time.sleep(0.2)
#
pass
def main(args): def main(args):
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)