added simulation of robot

This commit is contained in:
Simon Pirkelmann 2019-05-24 09:21:54 -05:00
parent f3de1b173a
commit 530cef54b3

View File

@ -1,5 +1,6 @@
# startup:
# roscore
# rosparam set cv_camera/device_id 0
# rosrun cv_camera cv_camera_node
import sys
@ -9,10 +10,20 @@ import numpy as np
import cv2
import cv2.aruco as aruco
import socket
import scipy.integrate
import threading
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib.animation as anim
import time
from sensor_msgs.msg import Image
from sensor_msgs.msg import CompressedImage
from cv_bridge import CvBridge, CvBridgeError
import math
pygame.init()
pygame.font.init()
@ -20,8 +31,8 @@ pygame.font.init()
myfont = pygame.font.SysFont('Comic Sans MS', 30)
pygame.display.set_caption("ROS camera stream on Pygame")
screenheight = 480
screenwidth = 640 #4*screenheight//3
screenheight = 1024
screenwidth = 1280 #4*screenheight//3
screen = pygame.display.set_mode([screenwidth, screenheight])
red = (255, 0, 0)
teal = (0, 255, 255)
@ -31,33 +42,93 @@ camera_stream = "/cv_camera/image_raw"
#camera_stream = "/image_raw"
compression = False
# taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/
# Checks if a matrix is a valid rotation matrix.
def isRotationMatrix(R):
Rt = np.transpose(R)
shouldBeIdentity = np.dot(Rt, R)
I = np.identity(3, dtype=R.dtype)
n = np.linalg.norm(I - shouldBeIdentity)
return n < 1e-6
# Calculates rotation matrix to euler angles
# The result is the same as MATLAB except the order
# of the euler angles ( x and z are swapped ).
def rotationMatrixToEulerAngles(R):
assert (isRotationMatrix(R))
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0
return np.array([x, y, z])
class Robot:
def __init__(self, id, ip=None):
self.pos = None
self.orient = None
self.id = id
self.ip = None
self.pos = None
self.euler = None
self.ip = ip
def f_ode(t, x, u):
# dynamical model of the two-wheeled robot
# TODO: find exact values for these parameters
r = 0.03
R = 0.05
d = 0.02
theta = x[2]
omega_r = u[0]
omega_l = u[1]
dx = np.zeros(3)
dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \
+ (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l
dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \
+ (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l
dx[2] = -r/(2.0*R) * (omega_r - omega_l)
return dx
class RemoteController:
def __init__(self):
self.cam = cv2.VideoCapture(2)
#self.cam = cv2.VideoCapture(1)
#self.image_pub = rospy.Publisher("pygame_image", Image)
#self.bridge = CvBridge()
#if compression:
# self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback)
#else:
# self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback)
self.bridge = CvBridge()
#self.cv_image = np.zeros((1, 1, 3), np.uint8)
self.cv_image = None
self.robots = [Robot(1), Robot(2), Robot(512)]
self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)]
self.robot_ids = [r.id for r in self.robots]
screen.fill([0, 0, 0])
cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ)
self.camera_matrix = cv_file.getNode("camera_matrix").mat()
self.dist_matrix = cv_file.getNode("dist_coeff").mat()
self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
self.parameters = aruco.DetectorParameters_create()
@ -68,14 +139,99 @@ class RemoteController:
except socket.error:
print("could not connect to socket")
def callback(self, data):
self.mutex = threading.Lock()
self.k = 0
self.ii = 0.1
self.pp = 0.4
self.inc = 0.0
self.alphas = []
self.speed = 1.0
self.controlling = False
self.u1 = 0.0
self.u2 = 0.0
self.t = time.time()
self.x0 = np.zeros(3)
self.ts = np.array([0.0])
self.xs = np.array([[0.0, 0.0, 0.0]])
self.ys = []
self.omegas = []
self.tms_0 = None
self.tms = None #np.array([0.0])
self.xm_0 = None
self.xms = None #np.array([[0.0, 0.0, 0.0]])
self.alpha_0 = None
self.alphas = []
self.pos_0 = None
self.possx = []
self.possy = []
if compression:
self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback)
else:
self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback)
self.fig = plt.figure()
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
self.ax = self.fig.add_subplot(1,1,1)
self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[])
self.line_sim, = self.ax.plot([], [])
self.dir, = self.ax.plot([], [])
plt.xlabel('x-position')
plt.ylabel('y-position')
def ani_init(self):
self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box')
return self.line, self.line_sim, self.dir,
def ani_update(self, frame):
self.mutex.acquire()
try:
if compression:
self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data)
else:
self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
except CvBridgeError as e:
print(e)
# copy data for plot from global arrays
if self.tms is not None:
tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms)
#print(len(tm_local))
if len(tm_local) > 0:
self.line.set_data(xm_local[:,0], xm_local[:,1])
a = xm_local[-1, 0]
b = xm_local[-1, 0]
a2 = a + np.cos(xm_local[-1, 2]) * 1.0
b2 = b + np.sin(xm_local[-1, 2]) * 1.0
self.dir.set_data(np.array([a, a2]), np.array([b, b2]))
ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs)
if len(ts_local) > 0:
self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
finally:
self.mutex.release()
return self.line, self.line_sim, self.dir,
def callback(self, data):
#self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary?
# marker detection
@ -84,67 +240,247 @@ class RemoteController:
#print("robot {} pos = {}".format(r.id, r.pos))
def capture(self):
ret_val, self.cv_image = self.cam.read()
#ret_val, self.cv_image = self.cam.read()
try:
if compression:
self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data)
else:
self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
except CvBridgeError as e:
print(e)
corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters)
rvec, tvec, _ = aruco.estimatePoseSingleMarkers(corners[0], 0.05, mtx, dist)
marker_found = len(corners) > 0
if marker_found:
for i in range(len(corners)):
#print("id = {}".format(ids[i]))
pos = np.mean(corners[i][0], axis=0)
markers = zip(corners, ids)
#print("found!")
# filter markers with unknown ids
#print("detected markers = {}".format(markers))
markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers))
#print("filtered markers = {}".format(markers_filtered))
if len(markers_filtered) > 0:
filtered_corners, filtered_ids = zip(*markers_filtered)
#print("filtered corners = {}".format(filtered_corners[0]))
for r in self.robots:
if r.id == ids[i]:
pos_flipped = (self.cv_image.shape[1] - int(pos[0]), int(pos[1]))
r.pos = pos_flipped
rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix,
self.dist_matrix)
aruco.drawDetectedMarkers(self.cv_image, filtered_corners)
for i in range(len(filtered_corners)):
aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i],
0.1)
for r in self.robots:
if r.id == filtered_ids[i]:
r.pos = tvec[i][0] # only x and y component are important for us
r.orient = rvec[i][0]
r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient)
r.euler = rotationMatrixToEulerAngles(r.rot_mat)
# save measured position and angle for plotting
measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0])
if self.xm_0 is None:
self.tms_0 = time.time()
self.xm_0 = deepcopy(measurement)
self.xm_0[2] = 0.0
self.tms = np.array([self.tms_0])
self.xms = measurement - self.xm_0
else:
self.mutex.acquire()
try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement - self.xm_0))
if len(self.tms) == 50:
self.alpha_0 = np.mean(self.xms[:,2])
finally:
self.mutex.release()
def show_display(self):
while self.alpha_0 is None:
pass
self.x0[2] = self.alpha_0
print("alpha_0 = {}".format(self.alpha_0))
print("show_display")
while True:
self.capture()
#self.capture()
# show ros camera image on the pygame screen
#image = cv2.resize(self.cv_image,(screenwidth,screenheight))
frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
frame = np.rot90(frame)
frame = pygame.surfarray.make_surface(frame)
screen.blit(frame, (0, 0))
if self.cv_image is not None:
image = cv2.resize(self.cv_image,(screenwidth,screenheight))
frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
frame = np.rot90(frame)
frame = pygame.surfarray.make_surface(frame)
screen.blit(frame, (0, 0))
# plot robot positions
for r in self.robots:
if r.pos is not None:
if r.euler is not None:
#print("r.pos = {}".format(r.pos))
#print("r.euler = {}".format(r.euler[2]))
#print("drawing at {}".format(r.pos))
pygame.draw.circle(screen, (255, 0, 0), r.pos, 10)
#pygame.draw.circle(screen, (255, 0, 0), r.pos, 10)
pass
pygame.display.update()
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
u1 = 1.0
u2 = -1.0
#print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT:
u1 = -1.0
u2 = 1.0
#print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP:
u1 = -1.0
u2 = -1.0
#print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN:
u1 = 1.0
u2 = 1.0
#print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2))
elif event.type == pygame.KEYUP:
u1 = 0
u2 = 0
#print("key released, resetting: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2))
keyboard_control = True
keyboard_control_speed_test = False
pid = False
if keyboard_control:
events = pygame.event.get()
speed = 1.0
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.u1 = speed
self.u2 = -speed
#print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT:
self.u1 = -speed
self.u2 = speed
#print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP:
self.u1 = -speed
self.u2 = -speed
print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = speed
self.u2 = speed
#print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
elif event.type == pygame.KEYUP:
self.u1 = 0
self.u2 = 0
#print("key released, resetting: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
tnew = time.time()
dt = tnew - self.t
r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32]))
x = self.x0
r.set_initial_value(x, self.t)
xnew = r.integrate(r.t + dt)
self.t = tnew
self.x0 = xnew
self.mutex.acquire()
try:
self.ts = np.vstack((self.ts, tnew))
self.xs = np.vstack((self.xs, xnew))
#self.ys.append(xnew[1])
#self.omegas.append(xnew[2])
finally:
self.mutex.release()
# for r in self.robots:
# if r.euler is not None:
# if self.alpha_0 is not None:
# self.alphas.append(r.euler[2]-self.alpha_0)
# else:
# self.alpha_0 = r.euler[2]
# self.alphas.append(0.0)
# if r.pos is not None:
# if self.pos_0 is not None:
# self.possx.append(r.pos[0] - self.pos_0[0])
# self.possy.append(r.pos[1] - self.pos_0[1])
# else:
# self.pos_0 = r.pos[0:2]
# self.possx.append(0.0)
# self.possy.append(0.0)
#print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2]))
#time.sleep(0.1)
elif keyboard_control_speed_test:
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.speed = self.speed / np.sqrt(np.sqrt(np.sqrt(10.0)))
elif event.key == pygame.K_RIGHT:
self.speed = self.speed * np.sqrt(np.sqrt(np.sqrt(10.0)))
elif event.key == pygame.K_UP:
u1 = self.speed
u2 = -self.speed
elif event.key == pygame.K_DOWN:
u1 = 0.0
u2 = 0.0
print("speed = {}".format(self.speed))
self.rc_socket.send('({},{})\n'.format(u1, u2))
elif pid:
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
self.ii = self.ii / np.sqrt(np.sqrt(np.sqrt(10.0)))
print("ii = {}".format(self.pp))
elif event.key == pygame.K_RIGHT:
self.ii = self.ii * np.sqrt(np.sqrt(np.sqrt(10.0)))
print("ii = {}".format(self.pp))
elif event.key == pygame.K_UP:
self.controlling = True
elif event.key == pygame.K_DOWN:
self.controlling = False
self.rc_socket.send('({},{})\n'.format(0, 0))
dt = 0.1
#i = 0.0 # 0.001
if self.controlling:
# test: turn robot such that angle is zero
for r in self.robots:
if r.euler is not None:
self.k = self.k + 1
alpha = r.euler[2]
self.alphas.append(alpha)
e = alpha - 0
p = self.pp * e
self.inc += e * dt
d = 0.0
u1 = p + self.ii * self.inc + d
u2 = - p - self.ii * self.inc - d
print("alpha = {}, u = ({}, {})".format(alpha, u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(dt)
# elif self.k == kmax:
# u1 = u2 = 0.0
# self.rc_socket.send('({},{})\n'.format(u1, u2))
# self.k = self.k + 1
#
# plt.plot(np.array(self.alphas))
# plt.show()
pass
def calibration_sequence(self):
speed = 1.0
u1 = speed
u2 = speed
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(4.0)
u1 = u2 = 0
self.rc_socket.send('({},{})\n'.format(u1, u2))
self.rc_socket.send('\n')
# test:
# -> 1.6 m in 4 seconds
# angular velocity: angle/second
# 1.6 / (2 * pi * 0.03)
# l = 1.6
# r = 0.03
# t = 4.0
# -> number of rotations n = l / (2 * pi * r)
# -> angular velocity = 2 * pi * n / t = l / (r * t)
# result: maximum angular velocity: omega_max = 13.32 rad/sec
def main(args):
@ -155,7 +491,16 @@ def main(args):
pygame.init()
pygame.display.set_mode((640, 480))
rc.show_display()
#p = multiprocessing.Process(target=rc.show_display)
threading.Thread(target=rc.show_display).start()
#p.start()
plt.ion()
plt.pause(0.01)
#p.join()
pass
#rc.show_display()
#rc.calibration_sequence()
#game.loop()
# try: