Compare commits

..

No commits in common. "843b30f5d3d1651a2335066988307f470954cc29" and "f100f21162d02ae618aded11e5949cddd6e77731" have entirely different histories.

3 changed files with 169 additions and 462 deletions

View File

@ -4,9 +4,8 @@ from machine import I2C, Pin
import d1motor import d1motor
import utime import time
import usocket import usocket
import uselect
import esp import esp
class Robot: class Robot:
@ -33,12 +32,6 @@ class Robot:
# setup socket for remote control # setup socket for remote control
self.addr = usocket.getaddrinfo(ip, 1234)[0][-1] self.addr = usocket.getaddrinfo(ip, 1234)[0][-1]
self.poller = uselect.poll()
self.poller_timeout = 2 # timeout in ms
self.control_queue = []
def remote_control(self): def remote_control(self):
while True: while True:
print("setting up socket communication ...") print("setting up socket communication ...")
@ -52,69 +45,18 @@ class Robot:
socket_setup_complete = True socket_setup_complete = True
except Exception as e: except Exception as e:
print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e)) print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e))
utime.sleep(1.0) time.sleep(1.0)
print("waiting for connections on {} ...".format(self.addr)) print("waiting for connections on {} ...".format(self.addr))
socket.listen(1) socket.listen(1)
res = socket.accept() # this blocks until someone connects to the socket res = socket.accept() # this blocks until someone connects to the socket
comm_socket = res[0] comm_socket = res[0]
self.poller.register(comm_socket, uselect.POLLIN)
print("connected!") print("connected!")
listening = True listening = True
duration_current = 0
t_current = utime.ticks_ms()
duration_next = None
stopped = True
timeouts = 0
while listening: while listening:
elapsed = utime.ticks_ms() # expected data: '(u1, u2)'\n"
remaining = duration_current - (elapsed-t_current)
if remaining >= 0:
timeouts = 0
print("start of loop\n I have {} ms until next control needs to be applied".format(remaining))
else:
# current control timed out -> applying next control
if len(self.control_queue) > 0:
print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current))
u_next = self.control_queue.pop(0)
#print("duration of previous control = {}".format((elapsed - t_current)/1000.0))
#print("applying new control (duration, u1, u2) = ({}, {}, {})".format(duration_next, u1_next, u2_next))
# if so, apply it
self.m1.speed(u_next[0])
self.m2.speed(u_next[1])
t_current = utime.ticks_ms()
duration_current = duration_next
stopped = False
elif not stopped:
print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current))
#print("duration of previous control = {}".format((elapsed - t_current)/1000.0))
# no new control available -> shutdown
print("no new control available -> stopping")
self.m1.speed(0)
self.m2.speed(0)
t_current = utime.ticks_ms()
duration_current = 0 # as soon as new control will become available we directly want to apply it immediately
stopped = True
#elif timeouts < 10:
# print("start of loop\n I have {} ms until next control needs to be applied, timeouts = {}".format(remaining, timeouts))
# timeouts = timeouts + 1
trecv_start = utime.ticks_ms()
# expected data: '(t, u1_0, u2_0, u1_1, u2_1, ...)'\n"
# where ui = control for motor i # where ui = control for motor i
# ui \in [-1.0, 1.0] # ui \in [-1.0, 1.0]
#print("poller waiting..")
poll_res = self.poller.poll(self.poller_timeout) # wait 100 milliseconds for socket data
if poll_res:
print("new data available")
try: try:
data = comm_socket.readline() data = comm_socket.readline()
data_str = data.decode() data_str = data.decode()
@ -122,54 +64,36 @@ class Robot:
#print("processing data = {}".format(data_str)) #print("processing data = {}".format(data_str))
l = data_str.strip('()\n').split(',') l = data_str.strip('()\n').split(',')
#print("l = {}".format(l)) #print("l = {}".format(l))
duration_next = int(float(l[0])*1000) u1 = int(float(l[0])*100)
#print("duration = {}".format(duration_next)) #print("u1 = {}".format(u1))
self.control_queue = [] u2 = int(float(l[1])*100)
print("putting data into queue") #print("u2 = {}".format(u2))
for i in range((len(l)-1)/2):
u1_next = int(float(l[2*i+1])*100)
print("u1 = {}".format(u1_next))
u2_next = int(float(l[2*i+2])*100)
print("u2 = {}".format(u2_next))
self.control_queue.append((u1_next, u2_next))
except ValueError: except ValueError:
print("ValueError: Data has wrong format.") print("ValueError: Data has wrong format.")
print("Data received: {}".format(data_str)) print("Data received: {}".format(data_str))
print("Shutting down ...") print("Shutting down ...")
self.control_queue = [] u1 = u2 = 0
duration_current = 0
listening = False listening = False
comm_socket.close()
socket.close()
del comm_socket
del socket
print("disconnected!")
except IndexError: except IndexError:
print("IndexError: Data has wrong format.") print("IndexError: Data has wrong format.")
print("Data received: {}".format(data_str)) print("Data received: {}".format(data_str))
print("Shutting down ...") print("Shutting down ...")
self.control_queue = [] u1 = u2 = 0
duration_current = 0
listening = False listening = False
comm_socket.close()
socket.close()
del comm_socket
del socket
print("disconnected!")
except Exception as e: except Exception as e:
print("Some other error occured") print("Some other error occured")
print("Exception: {}".format(e)) print("Exception: {}".format(e))
print("Shutting down ...") print("Shutting down ...")
self.control_queue = [] u1 = u2 = 0
duration_current = 0
listening = False listening = False
finally:
self.m1.speed(u1)
self.m2.speed(u2)
comm_socket.close() comm_socket.close()
socket.close() socket.close()
del comm_socket del comm_socket
del socket del socket
print("disconnected!") print("disconnected!")
trecv_end = utime.ticks_ms()
print("communication (incl. polling) took {} ms".format(trecv_end - trecv_start))
wall_e = Robot() wall_e = Robot()
wall_e.remote_control() wall_e.remote_control()

View File

@ -1,17 +1,13 @@
from casadi import * from casadi import *
import time
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py # look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
class OpenLoopSolver: class OpenLoopSolver:
def __init__(self, N=10, T=2.0): def __init__(self, N=60, T=6.0):
self.T = T self.T = T
self.N = N self.N = N
self.opti_x0 = None def solve(self, x0):
self.opti_lam_g0 = None
def setup(self):
x = SX.sym('x') x = SX.sym('x')
y = SX.sym('y') y = SX.sym('y')
theta = SX.sym('theta') theta = SX.sym('theta')
@ -19,32 +15,29 @@ class OpenLoopSolver:
r = 0.03 r = 0.03
R = 0.05 R = 0.05
d = 0.02 d = 0.02
#omega_max = 13.32
omega_max = 10.0
omegar = SX.sym('omegar') omegar = SX.sym('omegar')
omegal = SX.sym('omegal') omegal = SX.sym('omegal')
control = vertcat(omegar, omegal) control = vertcat(omegar, omegal)
# model equation # model equation
f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar * omega_max + (r / 2 * cos(theta) + r * d / (2 * R) * sin( f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar + (r / 2 * cos(theta) + r * d / (2 * R) * sin(
theta)) * omegal * omega_max theta)) * omegal
f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar * omega_max + (r / 2 * sin(theta) - r * d / (2 * R) * cos( f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar + (r / 2 * sin(theta) - r * d / (2 * R) * cos(
theta)) * omegal * omega_max theta)) * omegal
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max f3 = r / (2 * R) * omegar - r / (2 * R) * omegal
xdot = vertcat(f1, f2, f3) xdot = vertcat(f1, f2, f3)
f = Function('f', [x, y, theta, omegar, omegal], [f1, f2, f3]) f = Function('f', [x, y, theta, omegar, omegal], [f1, f2, f3])
print("f = {}".format(f)) print("f = {}".format(f))
# cost functional # cost functional
target = (-0.0, 0.0) L = x ** 2 + y ** 2 + 1e-2 * theta ** 2 + 1e-4 * (omegar ** 2 + omegal ** 2)
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
# Fixed step Runge-Kutta 4 integrator # Fixed step Runge-Kutta 4 integrator
M = 4 # RK4 steps per interval M = 4 # RK4 steps per interval
DT = self.T / self.N / M DT = self.T / self.N / M
print("DT = {}".format(DT)) print("DT = {}".format(DT))
self.f = Function('f', [state, control], [xdot, L]) f = Function('f', [state, control], [xdot, L])
X0 = MX.sym('X0', 3) X0 = MX.sym('X0', 3)
U = MX.sym('U', 2) U = MX.sym('U', 2)
X = X0 X = X0
@ -52,10 +45,10 @@ class OpenLoopSolver:
runge_kutta = True runge_kutta = True
if runge_kutta: if runge_kutta:
for j in range(M): for j in range(M):
k1, k1_q = self.f(X, U) k1, k1_q = f(X, U)
k2, k2_q = self.f(X + DT / 2 * k1, U) k2, k2_q = f(X + DT / 2 * k1, U)
k3, k3_q = self.f(X + DT / 2 * k2, U) k3, k3_q = f(X + DT / 2 * k2, U)
k4, k4_q = self.f(X + DT * k3, U) k4, k4_q = f(X + DT * k3, U)
X = X + DT / 6 * (k1 + 2 * k2 + 2 * k3 + k4) X = X + DT / 6 * (k1 + 2 * k2 + 2 * k3 + k4)
Q = Q + DT / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q) Q = Q + DT / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q)
else: else:
@ -82,38 +75,37 @@ class OpenLoopSolver:
ubg = [] ubg = []
# Formulate the NLP # Formulate the NLP
# Xk = MX(x0) Xk = MX(x0)
# for k in range(self.N): for k in range(self.N):
# # New NLP variable for the control # New NLP variable for the control
# U1k = MX.sym('U1_' + str(k), 2) U1k = MX.sym('U1_' + str(k), 2)
# # U2k = MX.sym('U2_' + str(k)) # U2k = MX.sym('U2_' + str(k))
# w += [U1k] w += [U1k]
# lbw += [-0.5, -0.5] lbw += [-10, -10]
# ubw += [0.5, 0.5] ubw += [10, 10]
# w0 += [0, 0] w0 += [0, 0]
#
# # Integrate till the end of the interval # Integrate till the end of the interval
# Fk = F(x0=Xk, p=U1k) Fk = F(x0=Xk, p=U1k)
# Xk = Fk['xf'] Xk = Fk['xf']
# J = J + Fk['qf'] J = J + Fk['qf']
#
# # Add inequality constraint # Add inequality constraint
# # g += [Xk[1]] # g += [Xk[1]]
# # lbg += [-.0] # lbg += [-.0]
# # ubg += [inf] # ubg += [inf]
#
# # Create an NLP solver # Create an NLP solver
# prob = {'f': J, 'x': vertcat(*w), 'g': vertcat(*g)} prob = {'f': J, 'x': vertcat(*w), 'g': vertcat(*g)}
# self.solver = nlpsol('solver', 'ipopt', prob) self.solver = nlpsol('solver', 'ipopt', prob)
# Solve the NLP # Solve the NLP
if False:
sol = self.solver(x0=w0, lbx=lbw, ubx=ubw, lbg=lbg, ubg=ubg) sol = self.solver(x0=w0, lbx=lbw, ubx=ubw, lbg=lbg, ubg=ubg)
w_opt = sol['x'] w_opt = sol['x']
# Plot the solution # Plot the solution
u_opt = w_opt u_opt = w_opt
x_opt = [self.x0] x_opt = [x0]
for k in range(self.N): for k in range(self.N):
Fk = F(x0=x_opt[-1], p=u_opt[2*k:2*k+2]) Fk = F(x0=x_opt[-1], p=u_opt[2*k:2*k+2])
x_opt += [Fk['xf'].full()] x_opt += [Fk['xf'].full()]
@ -122,145 +114,88 @@ class OpenLoopSolver:
x3_opt = [r[2] for r in x_opt] x3_opt = [r[2] for r in x_opt]
tgrid = [self.T/self.N*k for k in range(self.N+1)] tgrid = [self.T/self.N*k for k in range(self.N+1)]
#import matplotlib.pyplot as plt import matplotlib.pyplot as plt
#plt.figure(2) plt.figure(2)
#plt.clf() plt.clf()
#plt.plot(tgrid, x1_opt, '--') plt.plot(tgrid, x1_opt, '--')
#plt.plot(tgrid, x2_opt, '-') plt.plot(tgrid, x2_opt, '-')
#plt.plot(tgrid, x3_opt, '*') plt.plot(tgrid, x3_opt, '*')
#plt.step(tgrid, vertcat(DM.nan(1), u_opt), '-.') #plt.step(tgrid, vertcat(DM.nan(1), u_opt), '-.')
#plt.xlabel('t') plt.xlabel('t')
#plt.legend(['x1','x2','x3','u']) plt.legend(['x1','x2','x3','u'])
#plt.grid() plt.grid()
#plt.show() #plt.show()
#return #return
# alternative solution using multiple shooting (way faster!) # alternative solution using multiple shooting (way faster!)
self.opti = Opti() # Optimization problem opti = Opti() # Optimization problem
# ---- decision variables --------- # ---- decision variables ---------
self.X = self.opti.variable(3,self.N+1) # state trajectory X = opti.variable(3,self.N+1) # state trajectory
self.Q = self.opti.variable(1,self.N+1) # state trajectory Q = opti.variable(1,self.N+1) # state trajectory
posx = X[0,:]
self.U = self.opti.variable(2,self.N) # control trajectory (throttle) posy = X[1,:]
#T = self.opti.variable() # final time angle = X[2,:]
U = opti.variable(2,self.N) # control trajectory (throttle)
#T = opti.variable() # final time
# ---- objective --------- # ---- objective ---------
#self.opti.minimize(T) # race in minimal time #opti.minimize(T) # race in minimal time
# ---- dynamic constraints -------- # ---- dynamic constraints --------
#f = lambda x,u: vertcat(f1, f2, f3) # dx/dt = f(x,u) #f = lambda x,u: vertcat(f1, f2, f3) # dx/dt = f(x,u)
# ---- initial values for solver ---
#self.opti.set_initial(speed, 1)
#self.opti.set_initial(T, 1)
def solve(self, x0, target):
tstart = time.time()
x = SX.sym('x')
y = SX.sym('y')
theta = SX.sym('theta')
state = vertcat(x, y, theta)
r = 0.03
R = 0.05
d = 0.02
omega_max = 13.32
omegar = SX.sym('omegar')
omegal = SX.sym('omegal')
control = vertcat(omegar, omegal)
# model equation
f1 = (r / 2 * cos(theta) - r * d / (2 * R) * sin(theta)) * omegar * omega_max + (r / 2 * cos(theta) + r * d / (
2 * R) * sin(
theta)) * omegal * omega_max
f2 = (r / 2 * sin(theta) + r * d / (2 * R) * cos(theta)) * omegar * omega_max + (r / 2 * sin(theta) - r * d / (
2 * R) * cos(
theta)) * omegal * omega_max
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
xdot = vertcat(f1, f2, f3)
L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
self.f = Function('f', [state, control], [xdot, L])
# ---- solve NLP ------
# set numerical backend
# delete constraints
self.opti.subject_to()
# add new constraints
dt = self.T/self.N # length of a control interval dt = self.T/self.N # length of a control interval
for k in range(self.N): # loop over control intervals for k in range(self.N): # loop over control intervals
# Runge-Kutta 4 integration # Runge-Kutta 4 integration
k1, k1_q = self.f(self.X[:, k], self.U[:, k]) k1, k1_q = f(X[:,k], U[:,k])
k2, k2_q = self.f(self.X[:, k] + dt / 2 * k1, self.U[:, k]) k2, k2_q = f(X[:,k]+dt/2*k1, U[:,k])
k3, k3_q = self.f(self.X[:, k] + dt / 2 * k2, self.U[:, k]) k3, k3_q = f(X[:,k]+dt/2*k2, U[:,k])
k4, k4_q = self.f(self.X[:, k] + dt * k3, self.U[:, k]) k4, k4_q = f(X[:,k]+dt*k3, U[:,k])
x_next = self.X[:, k] + dt / 6 * (k1 + 2 * k2 + 2 * k3 + k4) x_next = X[:,k] + dt/6*(k1+2*k2+2*k3+k4)
q_next = self.Q[:, k] + dt / 6 * (k1_q + 2 * k2_q + 2 * k3_q + k4_q) q_next = Q[:,k] + dt/6*(k1_q + 2 * k2_q + 2 * k3_q + k4_q)
self.opti.subject_to(self.X[:, k + 1] == x_next) # close the gaps opti.subject_to(X[:,k+1]==x_next) # close the gaps
self.opti.subject_to(self.Q[:, k + 1] == q_next) # close the gaps opti.subject_to(Q[:,k+1]==q_next) # close the gaps
self.opti.minimize(self.Q[:, self.N]) opti.minimize(Q[:,self.N])
# ---- path constraints ----------- # ---- path constraints -----------
#limit = lambda pos: 1-sin(2*pi*pos)/2 #limit = lambda pos: 1-sin(2*pi*pos)/2
# self.opti.subject_to(speed<=limit(pos)) # track speed limit #opti.subject_to(speed<=limit(pos)) # track speed limit
maxcontrol = 0.5 opti.subject_to(opti.bounded(-10,U,10)) # control is limited
self.opti.subject_to(self.opti.bounded(-maxcontrol, self.U, maxcontrol)) # control is limited
# ---- boundary conditions -------- # ---- boundary conditions --------
opti.subject_to(posx[0]==x0[0]) # start at position 0 ...
# self.opti.subject_to(speed[-1]==0) # .. with speed 0 opti.subject_to(posy[0]==x0[1]) # ... from stand-still
self.opti.subject_to(self.Q[:, 0] == 0.0) opti.subject_to(angle[0]==x0[2]) # finish line at position 1
#opti.subject_to(speed[-1]==0) # .. with speed 0
opti.subject_to(Q[:,0]==0.0)
solver = self.opti.solver("ipopt", {}, {"print_level": 0})
# ---- misc. constraints ---------- # ---- misc. constraints ----------
# self.opti.subject_to(X[1,:]>=0) # Time must be positive #opti.subject_to(X[1,:]>=0) # Time must be positive
# self.opti.subject_to(X[2,:]<=4) # Time must be positive #opti.subject_to(X[2,:]<=4) # Time must be positive
# self.opti.subject_to(X[2,:]>=-2) # Time must be positive #opti.subject_to(X[2,:]>=-2) # Time must be positive
# avoid obstacle # avoid obstacle
#r = 0.25 #r = 0.25
#p = (0.5, 0.5) #p = (0.5, 0.5)
#for k in range(self.N): #for k in range(self.N):
# self.opti.subject_to((X[0,k]-p[0])**2 + (X[1,k]-p[1])**2 > r**2) # opti.subject_to((X[0,k]-p[0])**2 + (X[1,k]-p[1])**2 > r**2)
# pass # pass
posx = self.X[0, :]
posy = self.X[1, :]
angle = self.X[2, :]
self.opti.subject_to(posx[0] == x0[0]) # start at position 0 ...
self.opti.subject_to(posy[0] == x0[1]) # ... from stand-still
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
tend = time.time()
print("setting up problem took {} seconds".format(tend - tstart)) # ---- initial values for solver ---
#opti.set_initial(speed, 1)
#opti.set_initial(T, 1)
if self.opti_x0 is not None: # ---- solve NLP ------
self.opti.set_initial(self.opti.lam_g, self.opti_lam_g0) opti.solver("ipopt") # set numerical backend
self.opti.set_initial(self.opti.x, self.opti_x0) sol = opti.solve() # actual solve
sol = self.opti.solve() # actual solve
self.opti_x0 = sol.value(self.opti.x) #x0 = sol.value(opti.x)
self.opti_lam_g0 = sol.value(self.opti.lam_g) #lam_g0 = sol.value(opti.lam_g)
#opti.set_initial(opti.lam_g, lam_g0)
#u_opt_1 = map(lambda x: float(x), [u_opt[i * 2] for i in range(0, 60)]) #opti.set_initial(opti.x, x0)
#u_opt_2 = map(lambda x: float(x), [u_opt[i * 2 + 1] for i in range(0, 60)]) #opti.solve()
u_opt_1 = sol.value(self.U[0,:])
u_opt_2 = sol.value(self.U[1,:])
return (u_opt_1, u_opt_2)
#lam_g0 = sol.value(self.opti.lam_g)
#self.opti.solve()
from pylab import plot, step, figure, legend, show, spy from pylab import plot, step, figure, legend, show, spy
@ -271,8 +206,8 @@ class OpenLoopSolver:
plt.figure(3) plt.figure(3)
plot(sol.value(posx), sol.value(posy)) plot(sol.value(posx), sol.value(posy))
ax = plt.gca() ax = plt.gca()
#circle = plt.Circle(p, r) circle = plt.Circle(p, r)
#ax.add_artist(circle) ax.add_artist(circle)
#plot(limit(sol.value(pos)),'r--',label="speed limit") #plot(limit(sol.value(pos)),'r--',label="speed limit")
#step(range(N),sol.value(U),'k',label="throttle") #step(range(N),sol.value(U),'k',label="throttle")
legend(loc="upper left") legend(loc="upper left")

View File

@ -11,7 +11,6 @@ import pygame
import numpy as np import numpy as np
import socket import socket
import scipy.integrate import scipy.integrate
import copy
import threading import threading
from copy import deepcopy from copy import deepcopy
@ -119,12 +118,8 @@ class RemoteController:
self.dirs, = self.ax.plot([], []) self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position') plt.xlabel('x-position')
plt.ylabel('y-position') plt.ylabel('y-position')
plt.grid()
self.ols = OpenLoopSolver() self.ols = OpenLoopSolver()
self.ols.setup()
self.target = (0.0, 0.0, 0.0)
def ani(self): def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
@ -212,21 +207,6 @@ class RemoteController:
self.mutex.release() self.mutex.release()
def controller(self): def controller(self):
tgrid = None
us1 = None
us2 = None
u1 = -0.0
u2 = 0.0
r = scipy.integrate.ode(f_ode)
omega_max = 5.0
init_pos = None
init_time = None
final_pos = None
final_time = None
forward = True
print("starting control") print("starting control")
while True: while True:
@ -237,7 +217,7 @@ class RemoteController:
if keyboard_control: # keyboard controller if keyboard_control: # keyboard controller
events = pygame.event.get() events = pygame.event.get()
speed = 1.0 speed = 0.5
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
@ -256,17 +236,17 @@ class RemoteController:
self.u1 = -speed self.u1 = -speed
self.u2 = -speed self.u2 = -speed
#print("forward: ({},{})".format(u1, u2)) #print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{},{})\n'.format(0.1, self.u1, self.u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
self.u1 = 0 self.u1 = 0
self.u2 = 0 self.u2 = 0
#print("key released, resetting: ({},{})".format(u1, u2)) #print("key released, resetting: ({},{})".format(u1, u2))
self.rc_socket.send('({}, {},{})\n'.format(0.1, self.u1, self.u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
tnew = time.time() tnew = time.time()
dt = tnew - self.t dt = tnew - self.t
r = scipy.integrate.ode(f_ode) r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * omega_max, self.u2 * omega_max])) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32]))
#print(self.x0) #print(self.x0)
if self.x0 is None: if self.x0 is None:
@ -294,59 +274,18 @@ class RemoteController:
events = pygame.event.get() events = pygame.event.get()
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_1: if event.key == pygame.K_LEFT:
self.controlling = True self.speed = self.speed / np.sqrt(np.sqrt(np.sqrt(10.0)))
forward = True elif event.key == pygame.K_RIGHT:
print("starting test") self.speed = self.speed * np.sqrt(np.sqrt(np.sqrt(10.0)))
self.mutex.acquire() elif event.key == pygame.K_UP:
try: u1 = self.speed
init_pos = copy.deepcopy(self.xms[-1]) u2 = -self.speed
init_time = copy.deepcopy(self.tms[-1]) elif event.key == pygame.K_DOWN:
finally: u1 = 0.0
self.mutex.release() u2 = 0.0
if event.key == pygame.K_2: print("speed = {}".format(self.speed))
self.controlling = True self.rc_socket.send('({},{})\n'.format(u1, u2))
forward = False
print("starting test")
self.mutex.acquire()
try:
init_pos = copy.deepcopy(self.xms[-1])
init_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
elif event.key == pygame.K_3:
self.controlling = False
print("stopping test")
self.rc_socket.send('(0.1, 0.0,0.0)\n')
self.mutex.acquire()
try:
final_pos = copy.deepcopy(self.xms[-1])
final_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("init_pos = {}".format(init_pos))
print("final_pos = {}".format(final_pos))
print("distance = {}".format(np.linalg.norm(init_pos[0:2]-final_pos[0:2])))
print("dt = {}".format(final_time - init_time))
d = np.linalg.norm(init_pos[0:2]-final_pos[0:2])
t = final_time - init_time
r = 0.03
angular_velocity = d/r/t
print("average angular velocity = {}".format(angular_velocity))
if self.controlling:
if forward:
self.rc_socket.send('(0.1, 1.0,1.0)\n')
else:
self.rc_socket.send('(0.1, -1.0,-1.0)\n')
time.sleep(0.1)
#print("speed = {}".format(self.speed))
elif pid: elif pid:
# pid controller # pid controller
@ -403,98 +342,7 @@ class RemoteController:
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP: if event.key == pygame.K_UP:
self.controlling = True self.ols.solve(self.xms[-1])
self.t = time.time()
elif event.key == pygame.K_DOWN:
self.controlling = False
self.rc_socket.send('(0.1, 0.0,0.0)\n')
elif event.key == pygame.K_0:
self.target = (0.0, 0.0, 0.0)
elif event.key == pygame.K_1:
self.target = (0.5,0.5, -np.pi/2.0)
elif event.key == pygame.K_2:
self.target = (0.5, -0.5, 0.0)
elif event.key == pygame.K_3:
self.target = (-0.5,-0.5, np.pi/2.0)
elif event.key == pygame.K_4:
self.target = (-0.5,0.5, 0.0)
if self.controlling:
tmpc_start = time.time()
# get measurement
self.mutex.acquire()
try:
last_measurement = copy.deepcopy(self.xms[-1])
last_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("current measurement (t, x) = ({}, {})".format(last_time, last_measurement))
print("current control (u1, u2) = ({}, {})".format(u1, u2))
# prediction of state at time the mpc will terminate
r.set_f_params(np.array([u1 * omega_max, u2 * omega_max]))
r.set_initial_value(last_measurement, last_time)
dt = self.ols.T/self.ols.N
print("integrating for {} seconds".format((dt)))
x_pred = r.integrate(r.t + (dt))
print("predicted initial state x_pred = ({})".format(x_pred))
res = self.ols.solve(x_pred, self.target)
#tgrid = res[0]
us1 = res[0]
us2 = res[1]
# tt = 0
# x = last_measurement
# t_ol = np.array([tt])
# x_ol = np.array([x])
# # compute open loop prediction
# for i in range(len(us1)):
# r = scipy.integrate.ode(f_ode)
# r.set_f_params(np.array([us1[i] * 13.32, us2[i] * 13.32]))
# r.set_initial_value(x, tt)
#
# tt = tt + 0.1
# x = r.integrate(tt)
#
# t_ol = np.vstack((t_ol, tt))
# x_ol = np.vstack((x_ol, x))
#plt.figure(4)
#plt.plot(x_ol[:,0], x_ol[:,1])
#if event.key == pygame.K_DOWN:
# if tgrid is not None:
tmpc_end = time.time()
print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < dt:
print("sleeping for {} seconds...".format(dt - dt_mpc))
time.sleep(dt - dt_mpc)
self.mutex.acquire()
try:
second_measurement = copy.deepcopy(self.xms[-1])
second_time = copy.deepcopy(self.tms[-1])
finally:
self.mutex.release()
print("(last_time, second_time, dt) = ({}, {}, {})".format(last_time, second_time, second_time - last_time))
print("mismatch between predicted state and measured state: {}\n\n".format(second_measurement - last_measurement))
for i in range(0, 1):
u1 = us1[i]
u2 = us2[i]
self.rc_socket.send('({},{},{})\n'.format(dt,u1, u2))
self.t = time.time()
#time.sleep(0.2)
#
pass
def main(args): def main(args):
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)