rewrote position controller script. now animated plot works for keyboard control and pid control

This commit is contained in:
Simon Pirkelmann 2019-06-11 16:26:53 +02:00
parent 9595a68494
commit 480e1f523c

View File

@ -1,14 +1,14 @@
# startup: # startup:
# roscore # roscore -> start ros
# rosparam set cv_camera/device_id 0 # rosparam set cv_camera/device_id 0 -> set appropriate camera device
# rosrun cv_camera cv_camera_node # rosrun cv_camera cv_camera_node -> start the camera
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys import sys
import rospy import rospy
import pygame import pygame
import numpy as np import numpy as np
import cv2
import cv2.aruco as aruco
import socket import socket
import scipy.integrate import scipy.integrate
@ -20,59 +20,7 @@ import matplotlib.animation as anim
import time import time
from sensor_msgs.msg import Image from marker_pos_angle.msg import id_pos_angle
from sensor_msgs.msg import CompressedImage
from cv_bridge import CvBridge, CvBridgeError
import math
pygame.init()
pygame.font.init()
#pygame.joystick.init()
myfont = pygame.font.SysFont('Comic Sans MS', 30)
pygame.display.set_caption("ROS camera stream on Pygame")
screenheight = 1024
screenwidth = 1280 #4*screenheight//3
screen = pygame.display.set_mode([screenwidth, screenheight])
red = (255, 0, 0)
teal = (0, 255, 255)
# ros setup
camera_stream = "/cv_camera/image_raw"
#camera_stream = "/image_raw"
compression = False
# taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/
# Checks if a matrix is a valid rotation matrix.
def isRotationMatrix(R):
Rt = np.transpose(R)
shouldBeIdentity = np.dot(Rt, R)
I = np.identity(3, dtype=R.dtype)
n = np.linalg.norm(I - shouldBeIdentity)
return n < 1e-6
# Calculates rotation matrix to euler angles
# The result is the same as MATLAB except the order
# of the euler angles ( x and z are swapped ).
def rotationMatrixToEulerAngles(R):
assert (isRotationMatrix(R))
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0
return np.array([x, y, z])
class Robot: class Robot:
def __init__(self, id, ip=None): def __init__(self, id, ip=None):
@ -110,37 +58,40 @@ def f_ode(t, x, u):
class RemoteController: class RemoteController:
def __init__(self): def __init__(self):
#self.cam = cv2.VideoCapture(1)
#self.image_pub = rospy.Publisher("pygame_image", Image) self.robots = [Robot(3)]
self.bridge = CvBridge() self.robot_ids = {}
for r in self.robots:
#self.cv_image = np.zeros((1, 1, 3), np.uint8) self.robot_ids[r.id] = r
self.cv_image = None
self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)]
self.robot_ids = [r.id for r in self.robots]
screen.fill([0, 0, 0])
cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ)
self.camera_matrix = cv_file.getNode("camera_matrix").mat()
self.dist_matrix = cv_file.getNode("dist_coeff").mat()
self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
self.parameters = aruco.DetectorParameters_create()
# connect to robot
self.rc_socket = socket.socket() self.rc_socket = socket.socket()
try: try:
pass pass
self.rc_socket.connect(('192.168.4.1', 1234)) # connect to robot self.rc_socket.connect(('192.168.1.101', 1234)) # connect to robot
except socket.error: except socket.error:
print("could not connect to socket") print("could not connect to socket")
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
self.mutex = threading.Lock() self.mutex = threading.Lock()
marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback)
# pid parameters
self.k = 0 self.k = 0
self.ii = 0.1 self.ii = 0.1
self.pp = 0.4 self.pp = 0.4
@ -155,51 +106,31 @@ class RemoteController:
self.u1 = 0.0 self.u1 = 0.0
self.u2 = 0.0 self.u2 = 0.0
self.t = time.time() # animation
self.x0 = np.zeros(3)
self.ts = np.array([0.0])
self.xs = np.array([[0.0, 0.0, 0.0]])
self.ys = []
self.omegas = []
self.tms_0 = None
self.tms = None #np.array([0.0])
self.xm_0 = None
self.xms = None #np.array([[0.0, 0.0, 0.0]])
self.alpha_0 = None
self.alphas = []
self.pos_0 = None
self.possx = []
self.possy = []
if compression:
self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback)
else:
self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback)
self.fig = plt.figure() self.fig = plt.figure()
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
self.ax = self.fig.add_subplot(1,1,1) self.ax = self.fig.add_subplot(1,1,1)
self.xdata, self.ydata = [], [] self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[]) self.line, = self.ax.plot([],[])
self.line_sim, = self.ax.plot([], []) self.line_sim, = self.ax.plot([], [])
self.dir, = self.ax.plot([], []) self.dirm, = self.ax.plot([], [])
self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position') plt.xlabel('x-position')
plt.ylabel('y-position') plt.ylabel('y-position')
def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
plt.show(block=True)
def ani_init(self): def ani_init(self):
self.ax.set_xlim(-2, 2) self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2) self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box') self.ax.set_aspect('equal', adjustable='box')
return self.line, self.line_sim, self.dir, return self.line, self.line_sim, self.dirm, self.dirs,
def ani_update(self, frame): def ani_update(self, frame):
#print("plotting")
self.mutex.acquire() self.mutex.acquire()
try: try:
# copy data for plot from global arrays # copy data for plot from global arrays
@ -207,149 +138,98 @@ class RemoteController:
tm_local = deepcopy(self.tms) tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms) xm_local = deepcopy(self.xms)
#print(len(tm_local))
if len(tm_local) > 0: if len(tm_local) > 0:
# plot path of the robot
self.line.set_data(xm_local[:,0], xm_local[:,1]) self.line.set_data(xm_local[:,0], xm_local[:,1])
# compute and plot direction the robot is facing
a = xm_local[-1, 0] a = xm_local[-1, 0]
b = xm_local[-1, 0] b = xm_local[-1, 1]
a2 = a + np.cos(xm_local[-1, 2]) * 1.0 a2 = a + np.cos(xm_local[-1, 2]) * 1.0
b2 = b + np.sin(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0
self.dir.set_data(np.array([a, a2]), np.array([b, b2])) self.dirm.set_data(np.array([a, a2]), np.array([b, b2]))
ts_local = deepcopy(self.ts) ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs) xs_local = deepcopy(self.xs)
if len(ts_local) > 0: if len(ts_local) > 0:
# plot simulated path of the robot
self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
# compute and plot direction the robot is facing
a = xs_local[-1, 0]
b = xs_local[-1, 1]
a2 = a + np.cos(xs_local[-1, 2]) * 1.0
b2 = b + np.sin(xs_local[-1, 2]) * 1.0
self.dirs.set_data(np.array([a, a2]), np.array([b, b2]))
finally: finally:
self.mutex.release() self.mutex.release()
return self.line, self.line_sim, self.dir, return self.line, self.line_sim, self.dirm, self.dirs,
def callback(self, data): def measurement_callback(self, data):
#print("data = {}".format(data))
if data.id in self.robot_ids:
r = self.robot_ids[data.id]
#self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary? r.pos = (data.x, data.y) # only x and y component are important for us
r.euler = data.angle
# marker detection #print("r.pos = {}".format(r.pos))
#gray = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2GRAY) #print("r.angle = {}".format(r.euler))
# save measured position and angle for plotting
measurement = np.array([r.pos[0], r.pos[1], r.euler])
if self.tms_0 is None:
self.tms_0 = time.time()
self.xm_0 = measurement
#print("robot {} pos = {}".format(r.id, r.pos)) self.mutex.acquire()
try:
#ret_val, self.cv_image = self.cam.read() self.tms = np.array([0.0])
try: self.xms = measurement
if compression: finally:
self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data) self.mutex.release()
else: else:
self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8") self.mutex.acquire()
except CvBridgeError as e: try:
print(e) self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement))
finally:
self.mutex.release()
corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters) def controller(self):
marker_found = len(corners) > 0 print("starting control")
if marker_found:
markers = zip(corners, ids)
#print("found!")
# filter markers with unknown ids
#print("detected markers = {}".format(markers))
markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers))
#print("filtered markers = {}".format(markers_filtered))
if len(markers_filtered) > 0:
filtered_corners, filtered_ids = zip(*markers_filtered)
#print("filtered corners = {}".format(filtered_corners[0]))
rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix,
self.dist_matrix)
aruco.drawDetectedMarkers(self.cv_image, filtered_corners)
for i in range(len(filtered_corners)):
aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i],
0.1)
for r in self.robots:
if r.id == filtered_ids[i]:
r.pos = tvec[i][0] # only x and y component are important for us
r.orient = rvec[i][0]
r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient)
r.euler = rotationMatrixToEulerAngles(r.rot_mat)
# save measured position and angle for plotting
measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0])
if self.xm_0 is None:
self.tms_0 = time.time()
self.xm_0 = deepcopy(measurement)
self.xm_0[2] = 0.0
self.tms = np.array([self.tms_0])
self.xms = measurement - self.xm_0
else:
self.mutex.acquire()
try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement - self.xm_0))
if len(self.tms) == 50:
self.alpha_0 = np.mean(self.xms[:,2])
finally:
self.mutex.release()
def show_display(self):
while self.alpha_0 is None:
pass
self.x0[2] = self.alpha_0
print("alpha_0 = {}".format(self.alpha_0))
print("show_display")
while True: while True:
#self.capture()
# show ros camera image on the pygame screen keyboard_control = False
if self.cv_image is not None:
image = cv2.resize(self.cv_image,(screenwidth,screenheight))
frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
frame = np.rot90(frame)
frame = pygame.surfarray.make_surface(frame)
screen.blit(frame, (0, 0))
# plot robot positions
for r in self.robots:
if r.euler is not None:
#print("r.pos = {}".format(r.pos))
#print("r.euler = {}".format(r.euler[2]))
#print("drawing at {}".format(r.pos))
#pygame.draw.circle(screen, (255, 0, 0), r.pos, 10)
pass
pygame.display.update()
keyboard_control = True
keyboard_control_speed_test = False keyboard_control_speed_test = False
pid = False pid = True
if keyboard_control: if keyboard_control: # keyboard controller
events = pygame.event.get() events = pygame.event.get()
speed = 1.0 speed = 0.5
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
self.u1 = speed self.u1 = -speed
self.u2 = -speed self.u2 = speed
#print("turn left: ({},{})".format(u1, u2)) #print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT: elif event.key == pygame.K_RIGHT:
self.u1 = -speed self.u1 = speed
self.u2 = speed self.u2 = -speed
#print("turn right: ({},{})".format(u1, u2)) #print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
self.u1 = -speed
self.u2 = -speed
print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = speed self.u1 = speed
self.u2 = speed self.u2 = speed
#print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = -speed
self.u2 = -speed
#print("forward: ({},{})".format(u1, u2)) #print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
@ -363,6 +243,13 @@ class RemoteController:
r = scipy.integrate.ode(f_ode) r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32]))
#print(self.x0)
if self.x0 is None:
if self.xm_0 is not None:
self.x0 = self.xm_0
self.xs = self.x0
else:
print("error: no measurement available to initialize simulation")
x = self.x0 x = self.x0
r.set_initial_value(x, self.t) r.set_initial_value(x, self.t)
xnew = r.integrate(r.t + dt) xnew = r.integrate(r.t + dt)
@ -372,30 +259,10 @@ class RemoteController:
self.mutex.acquire() self.mutex.acquire()
try: try:
self.ts = np.vstack((self.ts, tnew)) self.ts = np.append(self.ts, tnew)
self.xs = np.vstack((self.xs, xnew)) self.xs = np.vstack((self.xs, xnew))
#self.ys.append(xnew[1])
#self.omegas.append(xnew[2])
finally: finally:
self.mutex.release() self.mutex.release()
# for r in self.robots:
# if r.euler is not None:
# if self.alpha_0 is not None:
# self.alphas.append(r.euler[2]-self.alpha_0)
# else:
# self.alpha_0 = r.euler[2]
# self.alphas.append(0.0)
# if r.pos is not None:
# if self.pos_0 is not None:
# self.possx.append(r.pos[0] - self.pos_0[0])
# self.possy.append(r.pos[1] - self.pos_0[1])
# else:
# self.pos_0 = r.pos[0:2]
# self.possx.append(0.0)
# self.possy.append(0.0)
#print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2]))
#time.sleep(0.1)
elif keyboard_control_speed_test: elif keyboard_control_speed_test:
@ -416,6 +283,8 @@ class RemoteController:
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
elif pid: elif pid:
# pid controller
events = pygame.event.get() events = pygame.event.get()
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
@ -431,57 +300,35 @@ class RemoteController:
self.controlling = False self.controlling = False
self.rc_socket.send('({},{})\n'.format(0, 0)) self.rc_socket.send('({},{})\n'.format(0, 0))
dt = 0.1 dt = 0.05
#i = 0.0 # 0.001
if self.controlling: if self.controlling:
# test: turn robot such that angle is zero # test: turn robot such that angle is zero
for r in self.robots: for r in self.robots:
if r.euler is not None: if r.euler is not None:
self.k = self.k + 1 self.k = self.k + 1
alpha = r.euler[2] alpha = r.euler
self.alphas.append(alpha) self.alphas.append(alpha)
# compute error
e = alpha - 0 e = alpha - 0
p = self.pp * e
# compute integral of error (approximately)
self.inc += e * dt self.inc += e * dt
# PID
p = self.pp * e
i = self.ii * self.inc
d = 0.0 d = 0.0
u1 = p + self.ii * self.inc + d # compute controls for robot from PID
u2 = - p - self.ii * self.inc - d u1 = p + i + d
u2 = - p - i - d
print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) print("alpha = {}, u = ({}, {})".format(alpha, u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(dt) time.sleep(dt)
# elif self.k == kmax:
# u1 = u2 = 0.0
# self.rc_socket.send('({},{})\n'.format(u1, u2))
# self.k = self.k + 1
#
# plt.plot(np.array(self.alphas))
# plt.show()
pass
def calibration_sequence(self):
speed = 1.0
u1 = speed
u2 = speed
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(4.0)
u1 = u2 = 0
self.rc_socket.send('({},{})\n'.format(u1, u2))
self.rc_socket.send('\n')
# test:
# -> 1.6 m in 4 seconds
# angular velocity: angle/second
# 1.6 / (2 * pi * 0.03)
# l = 1.6
# r = 0.03
# t = 4.0
# -> number of rotations n = l / (2 * pi * r)
# -> angular velocity = 2 * pi * n / t = l / (r * t)
# result: maximum angular velocity: omega_max = 13.32 rad/sec
def main(args): def main(args):
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)
@ -489,25 +336,14 @@ def main(args):
rc = RemoteController() rc = RemoteController()
pygame.init() pygame.init()
pygame.display.set_mode((640, 480))
#p = multiprocessing.Process(target=rc.show_display) screenheight = 480
threading.Thread(target=rc.show_display).start() screenwidth = 640
#p.start() screen = pygame.display.set_mode([screenwidth, screenheight])
plt.ion()
plt.pause(0.01)
#p.join()
pass
#rc.show_display() threading.Thread(target=rc.controller).start()
#rc.calibration_sequence()
#game.loop()
# try: rc.ani()
# rospy.spin()
# except KeyboardInterrupt:
# print("Shutting down")
# cv2.destroyAllWindows()
if __name__ == '__main__': if __name__ == '__main__':