Compare commits

..

No commits in common. "0d14738671812bad79b79dbf62db690a0dd0513e" and "9595a684948f0ccbd96664815b8b4880edb055ea" have entirely different histories.

2 changed files with 296 additions and 134 deletions

View File

@ -6,10 +6,8 @@ pygame.display.set_mode((640, 480))
rc_socket = socket.socket() rc_socket = socket.socket()
try: try:
#rc_socket.connect(('192.168.4.1', 1234)) # connect to robot rc_socket.connect(('192.168.4.1', 1234)) # connect to robot
rc_socket.connect(('192.168.1.101', 1234)) # connect to robot #rc_socket.connect(('192.168.1.101', 1234)) # connect to robot
#rc_socket.connect(('192.168.1.102', 1234)) # connect to robot
#rc_socket.connect(('192.168.1.103', 1234)) # connect to robot
except socket.error: except socket.error:
print("could not connect to socket") print("could not connect to socket")
@ -22,21 +20,21 @@ while True:
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT:
u1 = vmax u1 = vmax
u2 = -vmax u2 = -vmax
print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT:
u1 = -vmax
u2 = vmax
print("turn right: ({},{})".format(u1, u2)) print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
u1 = -vmax
u2 = -vmax
print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN:
u1 = vmax u1 = vmax
u2 = vmax u2 = vmax
print("forward: ({},{})".format(u1, u2)) print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print("backward: ({},{})".format(u1, u2))
rc_socket.send('({},{})\n'.format(u1, u2)) rc_socket.send('({},{})\n'.format(u1, u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2)) print("key released, resetting: ({},{})".format(u1, u2))

View File

@ -1,14 +1,14 @@
# startup: # startup:
# roscore -> start ros # roscore
# rosparam set cv_camera/device_id 0 -> set appropriate camera device # rosparam set cv_camera/device_id 0
# rosrun cv_camera cv_camera_node -> start the camera # rosrun cv_camera cv_camera_node
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys import sys
import rospy import rospy
import pygame import pygame
import numpy as np import numpy as np
import cv2
import cv2.aruco as aruco
import socket import socket
import scipy.integrate import scipy.integrate
@ -20,7 +20,59 @@ import matplotlib.animation as anim
import time import time
from marker_pos_angle.msg import id_pos_angle from sensor_msgs.msg import Image
from sensor_msgs.msg import CompressedImage
from cv_bridge import CvBridge, CvBridgeError
import math
pygame.init()
pygame.font.init()
#pygame.joystick.init()
myfont = pygame.font.SysFont('Comic Sans MS', 30)
pygame.display.set_caption("ROS camera stream on Pygame")
screenheight = 1024
screenwidth = 1280 #4*screenheight//3
screen = pygame.display.set_mode([screenwidth, screenheight])
red = (255, 0, 0)
teal = (0, 255, 255)
# ros setup
camera_stream = "/cv_camera/image_raw"
#camera_stream = "/image_raw"
compression = False
# taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/
# Checks if a matrix is a valid rotation matrix.
def isRotationMatrix(R):
Rt = np.transpose(R)
shouldBeIdentity = np.dot(Rt, R)
I = np.identity(3, dtype=R.dtype)
n = np.linalg.norm(I - shouldBeIdentity)
return n < 1e-6
# Calculates rotation matrix to euler angles
# The result is the same as MATLAB except the order
# of the euler angles ( x and z are swapped ).
def rotationMatrixToEulerAngles(R):
assert (isRotationMatrix(R))
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0
return np.array([x, y, z])
class Robot: class Robot:
def __init__(self, id, ip=None): def __init__(self, id, ip=None):
@ -58,40 +110,37 @@ def f_ode(t, x, u):
class RemoteController: class RemoteController:
def __init__(self): def __init__(self):
#self.cam = cv2.VideoCapture(1)
self.robots = [Robot(3)] #self.image_pub = rospy.Publisher("pygame_image", Image)
self.robot_ids = {} self.bridge = CvBridge()
for r in self.robots:
self.robot_ids[r.id] = r #self.cv_image = np.zeros((1, 1, 3), np.uint8)
self.cv_image = None
self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)]
self.robot_ids = [r.id for r in self.robots]
screen.fill([0, 0, 0])
cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ)
self.camera_matrix = cv_file.getNode("camera_matrix").mat()
self.dist_matrix = cv_file.getNode("dist_coeff").mat()
self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
self.parameters = aruco.DetectorParameters_create()
# connect to robot
self.rc_socket = socket.socket() self.rc_socket = socket.socket()
try: try:
pass pass
self.rc_socket.connect(('192.168.1.101', 1234)) # connect to robot self.rc_socket.connect(('192.168.4.1', 1234)) # connect to robot
except socket.error: except socket.error:
print("could not connect to socket") print("could not connect to socket")
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
self.mutex = threading.Lock() self.mutex = threading.Lock()
marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback)
# pid parameters
self.k = 0 self.k = 0
self.ii = 0.1 self.ii = 0.1
self.pp = 0.4 self.pp = 0.4
@ -106,31 +155,51 @@ class RemoteController:
self.u1 = 0.0 self.u1 = 0.0
self.u2 = 0.0 self.u2 = 0.0
# animation self.t = time.time()
self.x0 = np.zeros(3)
self.ts = np.array([0.0])
self.xs = np.array([[0.0, 0.0, 0.0]])
self.ys = []
self.omegas = []
self.tms_0 = None
self.tms = None #np.array([0.0])
self.xm_0 = None
self.xms = None #np.array([[0.0, 0.0, 0.0]])
self.alpha_0 = None
self.alphas = []
self.pos_0 = None
self.possx = []
self.possy = []
if compression:
self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback)
else:
self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback)
self.fig = plt.figure() self.fig = plt.figure()
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
self.ax = self.fig.add_subplot(1,1,1) self.ax = self.fig.add_subplot(1,1,1)
self.xdata, self.ydata = [], [] self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[]) self.line, = self.ax.plot([],[])
self.line_sim, = self.ax.plot([], []) self.line_sim, = self.ax.plot([], [])
self.dirm, = self.ax.plot([], []) self.dir, = self.ax.plot([], [])
self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position') plt.xlabel('x-position')
plt.ylabel('y-position') plt.ylabel('y-position')
def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
plt.show(block=True)
def ani_init(self): def ani_init(self):
self.ax.set_xlim(-2, 2) self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2) self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box') self.ax.set_aspect('equal', adjustable='box')
return self.line, self.line_sim, self.dirm, self.dirs, return self.line, self.line_sim, self.dir,
def ani_update(self, frame): def ani_update(self, frame):
#print("plotting")
self.mutex.acquire() self.mutex.acquire()
try: try:
# copy data for plot from global arrays # copy data for plot from global arrays
@ -138,98 +207,149 @@ class RemoteController:
tm_local = deepcopy(self.tms) tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms) xm_local = deepcopy(self.xms)
#print(len(tm_local))
if len(tm_local) > 0: if len(tm_local) > 0:
# plot path of the robot
self.line.set_data(xm_local[:,0], xm_local[:,1]) self.line.set_data(xm_local[:,0], xm_local[:,1])
# compute and plot direction the robot is facing
a = xm_local[-1, 0] a = xm_local[-1, 0]
b = xm_local[-1, 1] b = xm_local[-1, 0]
a2 = a + np.cos(xm_local[-1, 2]) * 1.0 a2 = a + np.cos(xm_local[-1, 2]) * 1.0
b2 = b + np.sin(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0
self.dirm.set_data(np.array([a, a2]), np.array([b, b2])) self.dir.set_data(np.array([a, a2]), np.array([b, b2]))
ts_local = deepcopy(self.ts) ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs) xs_local = deepcopy(self.xs)
if len(ts_local) > 0: if len(ts_local) > 0:
# plot simulated path of the robot
self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
# compute and plot direction the robot is facing
a = xs_local[-1, 0]
b = xs_local[-1, 1]
a2 = a + np.cos(xs_local[-1, 2]) * 1.0
b2 = b + np.sin(xs_local[-1, 2]) * 1.0
self.dirs.set_data(np.array([a, a2]), np.array([b, b2]))
finally: finally:
self.mutex.release() self.mutex.release()
return self.line, self.line_sim, self.dirm, self.dirs, return self.line, self.line_sim, self.dir,
def measurement_callback(self, data): def callback(self, data):
#print("data = {}".format(data))
if data.id in self.robot_ids:
r = self.robot_ids[data.id]
r.pos = (data.x, data.y) # only x and y component are important for us #self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary?
r.euler = data.angle
#print("r.pos = {}".format(r.pos)) # marker detection
#print("r.angle = {}".format(r.euler)) #gray = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2GRAY)
# save measured position and angle for plotting
measurement = np.array([r.pos[0], r.pos[1], r.euler])
if self.tms_0 is None:
self.tms_0 = time.time()
self.xm_0 = measurement
self.mutex.acquire() #print("robot {} pos = {}".format(r.id, r.pos))
try:
self.tms = np.array([0.0]) #ret_val, self.cv_image = self.cam.read()
self.xms = measurement try:
finally: if compression:
self.mutex.release() self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data)
else: else:
self.mutex.acquire() self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
try: except CvBridgeError as e:
self.tms = np.vstack((self.tms, time.time() - self.tms_0)) print(e)
self.xms = np.vstack((self.xms, measurement))
finally:
self.mutex.release()
def controller(self): corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters)
print("starting control") marker_found = len(corners) > 0
if marker_found:
markers = zip(corners, ids)
#print("found!")
# filter markers with unknown ids
#print("detected markers = {}".format(markers))
markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers))
#print("filtered markers = {}".format(markers_filtered))
if len(markers_filtered) > 0:
filtered_corners, filtered_ids = zip(*markers_filtered)
#print("filtered corners = {}".format(filtered_corners[0]))
rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix,
self.dist_matrix)
aruco.drawDetectedMarkers(self.cv_image, filtered_corners)
for i in range(len(filtered_corners)):
aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i],
0.1)
for r in self.robots:
if r.id == filtered_ids[i]:
r.pos = tvec[i][0] # only x and y component are important for us
r.orient = rvec[i][0]
r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient)
r.euler = rotationMatrixToEulerAngles(r.rot_mat)
# save measured position and angle for plotting
measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0])
if self.xm_0 is None:
self.tms_0 = time.time()
self.xm_0 = deepcopy(measurement)
self.xm_0[2] = 0.0
self.tms = np.array([self.tms_0])
self.xms = measurement - self.xm_0
else:
self.mutex.acquire()
try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement - self.xm_0))
if len(self.tms) == 50:
self.alpha_0 = np.mean(self.xms[:,2])
finally:
self.mutex.release()
def show_display(self):
while self.alpha_0 is None:
pass
self.x0[2] = self.alpha_0
print("alpha_0 = {}".format(self.alpha_0))
print("show_display")
while True: while True:
#self.capture()
keyboard_control = False # show ros camera image on the pygame screen
if self.cv_image is not None:
image = cv2.resize(self.cv_image,(screenwidth,screenheight))
frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
frame = np.rot90(frame)
frame = pygame.surfarray.make_surface(frame)
screen.blit(frame, (0, 0))
# plot robot positions
for r in self.robots:
if r.euler is not None:
#print("r.pos = {}".format(r.pos))
#print("r.euler = {}".format(r.euler[2]))
#print("drawing at {}".format(r.pos))
#pygame.draw.circle(screen, (255, 0, 0), r.pos, 10)
pass
pygame.display.update()
keyboard_control = True
keyboard_control_speed_test = False keyboard_control_speed_test = False
pid = True pid = False
if keyboard_control: # keyboard controller if keyboard_control:
events = pygame.event.get() events = pygame.event.get()
speed = 0.5 speed = 1.0
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
self.u1 = -speed self.u1 = speed
self.u2 = speed self.u2 = -speed
#print("turn left: ({},{})".format(u1, u2)) #print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT: elif event.key == pygame.K_RIGHT:
self.u1 = speed self.u1 = -speed
self.u2 = -speed self.u2 = speed
#print("turn right: ({},{})".format(u1, u2)) #print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
self.u1 = speed
self.u2 = speed
#print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = -speed self.u1 = -speed
self.u2 = -speed self.u2 = -speed
print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = speed
self.u2 = speed
#print("forward: ({},{})".format(u1, u2)) #print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
@ -243,13 +363,6 @@ class RemoteController:
r = scipy.integrate.ode(f_ode) r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32]))
#print(self.x0)
if self.x0 is None:
if self.xm_0 is not None:
self.x0 = self.xm_0
self.xs = self.x0
else:
print("error: no measurement available to initialize simulation")
x = self.x0 x = self.x0
r.set_initial_value(x, self.t) r.set_initial_value(x, self.t)
xnew = r.integrate(r.t + dt) xnew = r.integrate(r.t + dt)
@ -259,10 +372,30 @@ class RemoteController:
self.mutex.acquire() self.mutex.acquire()
try: try:
self.ts = np.append(self.ts, tnew) self.ts = np.vstack((self.ts, tnew))
self.xs = np.vstack((self.xs, xnew)) self.xs = np.vstack((self.xs, xnew))
#self.ys.append(xnew[1])
#self.omegas.append(xnew[2])
finally: finally:
self.mutex.release() self.mutex.release()
# for r in self.robots:
# if r.euler is not None:
# if self.alpha_0 is not None:
# self.alphas.append(r.euler[2]-self.alpha_0)
# else:
# self.alpha_0 = r.euler[2]
# self.alphas.append(0.0)
# if r.pos is not None:
# if self.pos_0 is not None:
# self.possx.append(r.pos[0] - self.pos_0[0])
# self.possy.append(r.pos[1] - self.pos_0[1])
# else:
# self.pos_0 = r.pos[0:2]
# self.possx.append(0.0)
# self.possy.append(0.0)
#print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2]))
#time.sleep(0.1)
elif keyboard_control_speed_test: elif keyboard_control_speed_test:
@ -283,8 +416,6 @@ class RemoteController:
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
elif pid: elif pid:
# pid controller
events = pygame.event.get() events = pygame.event.get()
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
@ -300,35 +431,57 @@ class RemoteController:
self.controlling = False self.controlling = False
self.rc_socket.send('({},{})\n'.format(0, 0)) self.rc_socket.send('({},{})\n'.format(0, 0))
dt = 0.05 dt = 0.1
#i = 0.0 # 0.001
if self.controlling: if self.controlling:
# test: turn robot such that angle is zero # test: turn robot such that angle is zero
for r in self.robots: for r in self.robots:
if r.euler is not None: if r.euler is not None:
self.k = self.k + 1 self.k = self.k + 1
alpha = r.euler alpha = r.euler[2]
self.alphas.append(alpha) self.alphas.append(alpha)
# compute error
e = alpha - 0 e = alpha - 0
# compute integral of error (approximately)
self.inc += e * dt
# PID
p = self.pp * e p = self.pp * e
i = self.ii * self.inc self.inc += e * dt
d = 0.0 d = 0.0
# compute controls for robot from PID u1 = p + self.ii * self.inc + d
u1 = p + i + d u2 = - p - self.ii * self.inc - d
u2 = - p - i - d
print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) print("alpha = {}, u = ({}, {})".format(alpha, u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(dt) time.sleep(dt)
# elif self.k == kmax:
# u1 = u2 = 0.0
# self.rc_socket.send('({},{})\n'.format(u1, u2))
# self.k = self.k + 1
#
# plt.plot(np.array(self.alphas))
# plt.show()
pass
def calibration_sequence(self):
speed = 1.0
u1 = speed
u2 = speed
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(4.0)
u1 = u2 = 0
self.rc_socket.send('({},{})\n'.format(u1, u2))
self.rc_socket.send('\n')
# test:
# -> 1.6 m in 4 seconds
# angular velocity: angle/second
# 1.6 / (2 * pi * 0.03)
# l = 1.6
# r = 0.03
# t = 4.0
# -> number of rotations n = l / (2 * pi * r)
# -> angular velocity = 2 * pi * n / t = l / (r * t)
# result: maximum angular velocity: omega_max = 13.32 rad/sec
def main(args): def main(args):
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)
@ -336,14 +489,25 @@ def main(args):
rc = RemoteController() rc = RemoteController()
pygame.init() pygame.init()
pygame.display.set_mode((640, 480))
screenheight = 480 #p = multiprocessing.Process(target=rc.show_display)
screenwidth = 640 threading.Thread(target=rc.show_display).start()
screen = pygame.display.set_mode([screenwidth, screenheight]) #p.start()
plt.ion()
plt.pause(0.01)
#p.join()
pass
threading.Thread(target=rc.controller).start() #rc.show_display()
#rc.calibration_sequence()
#game.loop()
rc.ani() # try:
# rospy.spin()
# except KeyboardInterrupt:
# print("Shutting down")
# cv2.destroyAllWindows()
if __name__ == '__main__': if __name__ == '__main__':