Compare commits

..

2 Commits

2 changed files with 134 additions and 296 deletions

View File

@ -6,8 +6,10 @@ pygame.display.set_mode((640, 480))
rc_socket = socket.socket() rc_socket = socket.socket()
try: try:
rc_socket.connect(('192.168.4.1', 1234)) # connect to robot #rc_socket.connect(('192.168.4.1', 1234)) # connect to robot
#rc_socket.connect(('192.168.1.101', 1234)) # connect to robot rc_socket.connect(('192.168.1.101', 1234)) # connect to robot
#rc_socket.connect(('192.168.1.102', 1234)) # connect to robot
#rc_socket.connect(('192.168.1.103', 1234)) # connect to robot
except socket.error: except socket.error:
print("could not connect to socket") print("could not connect to socket")
@ -20,21 +22,21 @@ while True:
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
u1 = vmax u1 = -vmax
u2 = -vmax u2 = vmax
print("turn left: ({},{})".format(u1, u2)) print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT: elif event.key == pygame.K_RIGHT:
u1 = -vmax u1 = vmax
u2 = vmax u2 = -vmax
print("turn right: ({},{})".format(u1, u2)) print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
u1 = -vmax
u2 = -vmax
print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN:
u1 = vmax u1 = vmax
u2 = vmax u2 = vmax
print("forward: ({},{})".format(u1, u2)) print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print("backward: ({},{})".format(u1, u2))
rc_socket.send('({},{})\n'.format(u1, u2)) rc_socket.send('({},{})\n'.format(u1, u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2)) print("key released, resetting: ({},{})".format(u1, u2))

View File

@ -1,14 +1,14 @@
# startup: # startup:
# roscore # roscore -> start ros
# rosparam set cv_camera/device_id 0 # rosparam set cv_camera/device_id 0 -> set appropriate camera device
# rosrun cv_camera cv_camera_node # rosrun cv_camera cv_camera_node -> start the camera
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys import sys
import rospy import rospy
import pygame import pygame
import numpy as np import numpy as np
import cv2
import cv2.aruco as aruco
import socket import socket
import scipy.integrate import scipy.integrate
@ -20,59 +20,7 @@ import matplotlib.animation as anim
import time import time
from sensor_msgs.msg import Image from marker_pos_angle.msg import id_pos_angle
from sensor_msgs.msg import CompressedImage
from cv_bridge import CvBridge, CvBridgeError
import math
pygame.init()
pygame.font.init()
#pygame.joystick.init()
myfont = pygame.font.SysFont('Comic Sans MS', 30)
pygame.display.set_caption("ROS camera stream on Pygame")
screenheight = 1024
screenwidth = 1280 #4*screenheight//3
screen = pygame.display.set_mode([screenwidth, screenheight])
red = (255, 0, 0)
teal = (0, 255, 255)
# ros setup
camera_stream = "/cv_camera/image_raw"
#camera_stream = "/image_raw"
compression = False
# taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/
# Checks if a matrix is a valid rotation matrix.
def isRotationMatrix(R):
Rt = np.transpose(R)
shouldBeIdentity = np.dot(Rt, R)
I = np.identity(3, dtype=R.dtype)
n = np.linalg.norm(I - shouldBeIdentity)
return n < 1e-6
# Calculates rotation matrix to euler angles
# The result is the same as MATLAB except the order
# of the euler angles ( x and z are swapped ).
def rotationMatrixToEulerAngles(R):
assert (isRotationMatrix(R))
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0
return np.array([x, y, z])
class Robot: class Robot:
def __init__(self, id, ip=None): def __init__(self, id, ip=None):
@ -110,37 +58,40 @@ def f_ode(t, x, u):
class RemoteController: class RemoteController:
def __init__(self): def __init__(self):
#self.cam = cv2.VideoCapture(1)
#self.image_pub = rospy.Publisher("pygame_image", Image) self.robots = [Robot(3)]
self.bridge = CvBridge() self.robot_ids = {}
for r in self.robots:
#self.cv_image = np.zeros((1, 1, 3), np.uint8) self.robot_ids[r.id] = r
self.cv_image = None
self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)]
self.robot_ids = [r.id for r in self.robots]
screen.fill([0, 0, 0])
cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ)
self.camera_matrix = cv_file.getNode("camera_matrix").mat()
self.dist_matrix = cv_file.getNode("dist_coeff").mat()
self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
self.parameters = aruco.DetectorParameters_create()
# connect to robot
self.rc_socket = socket.socket() self.rc_socket = socket.socket()
try: try:
pass pass
self.rc_socket.connect(('192.168.4.1', 1234)) # connect to robot self.rc_socket.connect(('192.168.1.101', 1234)) # connect to robot
except socket.error: except socket.error:
print("could not connect to socket") print("could not connect to socket")
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
self.mutex = threading.Lock() self.mutex = threading.Lock()
marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback)
# pid parameters
self.k = 0 self.k = 0
self.ii = 0.1 self.ii = 0.1
self.pp = 0.4 self.pp = 0.4
@ -155,51 +106,31 @@ class RemoteController:
self.u1 = 0.0 self.u1 = 0.0
self.u2 = 0.0 self.u2 = 0.0
self.t = time.time() # animation
self.x0 = np.zeros(3)
self.ts = np.array([0.0])
self.xs = np.array([[0.0, 0.0, 0.0]])
self.ys = []
self.omegas = []
self.tms_0 = None
self.tms = None #np.array([0.0])
self.xm_0 = None
self.xms = None #np.array([[0.0, 0.0, 0.0]])
self.alpha_0 = None
self.alphas = []
self.pos_0 = None
self.possx = []
self.possy = []
if compression:
self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback)
else:
self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback)
self.fig = plt.figure() self.fig = plt.figure()
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
self.ax = self.fig.add_subplot(1,1,1) self.ax = self.fig.add_subplot(1,1,1)
self.xdata, self.ydata = [], [] self.xdata, self.ydata = [], []
self.line, = self.ax.plot([],[]) self.line, = self.ax.plot([],[])
self.line_sim, = self.ax.plot([], []) self.line_sim, = self.ax.plot([], [])
self.dir, = self.ax.plot([], []) self.dirm, = self.ax.plot([], [])
self.dirs, = self.ax.plot([], [])
plt.xlabel('x-position') plt.xlabel('x-position')
plt.ylabel('y-position') plt.ylabel('y-position')
def ani(self):
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
plt.show(block=True)
def ani_init(self): def ani_init(self):
self.ax.set_xlim(-2, 2) self.ax.set_xlim(-2, 2)
self.ax.set_ylim(-2, 2) self.ax.set_ylim(-2, 2)
self.ax.set_aspect('equal', adjustable='box') self.ax.set_aspect('equal', adjustable='box')
return self.line, self.line_sim, self.dir, return self.line, self.line_sim, self.dirm, self.dirs,
def ani_update(self, frame): def ani_update(self, frame):
#print("plotting")
self.mutex.acquire() self.mutex.acquire()
try: try:
# copy data for plot from global arrays # copy data for plot from global arrays
@ -207,149 +138,98 @@ class RemoteController:
tm_local = deepcopy(self.tms) tm_local = deepcopy(self.tms)
xm_local = deepcopy(self.xms) xm_local = deepcopy(self.xms)
#print(len(tm_local))
if len(tm_local) > 0: if len(tm_local) > 0:
# plot path of the robot
self.line.set_data(xm_local[:,0], xm_local[:,1]) self.line.set_data(xm_local[:,0], xm_local[:,1])
# compute and plot direction the robot is facing
a = xm_local[-1, 0] a = xm_local[-1, 0]
b = xm_local[-1, 0] b = xm_local[-1, 1]
a2 = a + np.cos(xm_local[-1, 2]) * 1.0 a2 = a + np.cos(xm_local[-1, 2]) * 1.0
b2 = b + np.sin(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0
self.dir.set_data(np.array([a, a2]), np.array([b, b2])) self.dirm.set_data(np.array([a, a2]), np.array([b, b2]))
ts_local = deepcopy(self.ts) ts_local = deepcopy(self.ts)
xs_local = deepcopy(self.xs) xs_local = deepcopy(self.xs)
if len(ts_local) > 0: if len(ts_local) > 0:
# plot simulated path of the robot
self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) self.line_sim.set_data(xs_local[:,0], xs_local[:,1])
# compute and plot direction the robot is facing
a = xs_local[-1, 0]
b = xs_local[-1, 1]
a2 = a + np.cos(xs_local[-1, 2]) * 1.0
b2 = b + np.sin(xs_local[-1, 2]) * 1.0
self.dirs.set_data(np.array([a, a2]), np.array([b, b2]))
finally: finally:
self.mutex.release() self.mutex.release()
return self.line, self.line_sim, self.dir, return self.line, self.line_sim, self.dirm, self.dirs,
def callback(self, data): def measurement_callback(self, data):
#print("data = {}".format(data))
if data.id in self.robot_ids:
r = self.robot_ids[data.id]
#self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary? r.pos = (data.x, data.y) # only x and y component are important for us
r.euler = data.angle
# marker detection #print("r.pos = {}".format(r.pos))
#gray = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2GRAY) #print("r.angle = {}".format(r.euler))
#print("robot {} pos = {}".format(r.id, r.pos))
#ret_val, self.cv_image = self.cam.read()
try:
if compression:
self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data)
else:
self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
except CvBridgeError as e:
print(e)
corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters)
marker_found = len(corners) > 0
if marker_found:
markers = zip(corners, ids)
#print("found!")
# filter markers with unknown ids
#print("detected markers = {}".format(markers))
markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers))
#print("filtered markers = {}".format(markers_filtered))
if len(markers_filtered) > 0:
filtered_corners, filtered_ids = zip(*markers_filtered)
#print("filtered corners = {}".format(filtered_corners[0]))
rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix,
self.dist_matrix)
aruco.drawDetectedMarkers(self.cv_image, filtered_corners)
for i in range(len(filtered_corners)):
aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i],
0.1)
for r in self.robots:
if r.id == filtered_ids[i]:
r.pos = tvec[i][0] # only x and y component are important for us
r.orient = rvec[i][0]
r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient)
r.euler = rotationMatrixToEulerAngles(r.rot_mat)
# save measured position and angle for plotting # save measured position and angle for plotting
measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0]) measurement = np.array([r.pos[0], r.pos[1], r.euler])
if self.xm_0 is None: if self.tms_0 is None:
self.tms_0 = time.time() self.tms_0 = time.time()
self.xm_0 = deepcopy(measurement) self.xm_0 = measurement
self.xm_0[2] = 0.0
self.tms = np.array([self.tms_0]) self.mutex.acquire()
self.xms = measurement - self.xm_0 try:
self.tms = np.array([0.0])
self.xms = measurement
finally:
self.mutex.release()
else: else:
self.mutex.acquire() self.mutex.acquire()
try: try:
self.tms = np.vstack((self.tms, time.time() - self.tms_0)) self.tms = np.vstack((self.tms, time.time() - self.tms_0))
self.xms = np.vstack((self.xms, measurement - self.xm_0)) self.xms = np.vstack((self.xms, measurement))
if len(self.tms) == 50:
self.alpha_0 = np.mean(self.xms[:,2])
finally: finally:
self.mutex.release() self.mutex.release()
def show_display(self): def controller(self):
while self.alpha_0 is None: print("starting control")
pass
self.x0[2] = self.alpha_0
print("alpha_0 = {}".format(self.alpha_0))
print("show_display")
while True: while True:
#self.capture()
# show ros camera image on the pygame screen keyboard_control = False
if self.cv_image is not None:
image = cv2.resize(self.cv_image,(screenwidth,screenheight))
frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
frame = np.rot90(frame)
frame = pygame.surfarray.make_surface(frame)
screen.blit(frame, (0, 0))
# plot robot positions
for r in self.robots:
if r.euler is not None:
#print("r.pos = {}".format(r.pos))
#print("r.euler = {}".format(r.euler[2]))
#print("drawing at {}".format(r.pos))
#pygame.draw.circle(screen, (255, 0, 0), r.pos, 10)
pass
pygame.display.update()
keyboard_control = True
keyboard_control_speed_test = False keyboard_control_speed_test = False
pid = False pid = True
if keyboard_control: if keyboard_control: # keyboard controller
events = pygame.event.get() events = pygame.event.get()
speed = 1.0 speed = 0.5
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
self.u1 = speed self.u1 = -speed
self.u2 = -speed self.u2 = speed
#print("turn left: ({},{})".format(u1, u2)) #print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT: elif event.key == pygame.K_RIGHT:
self.u1 = -speed self.u1 = speed
self.u2 = speed self.u2 = -speed
#print("turn right: ({},{})".format(u1, u2)) #print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
self.u1 = -speed
self.u2 = -speed
print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = speed self.u1 = speed
self.u2 = speed self.u2 = speed
#print("forward: ({},{})".format(self.u1, self.u2))
elif event.key == pygame.K_DOWN:
self.u1 = -speed
self.u2 = -speed
#print("forward: ({},{})".format(u1, u2)) #print("forward: ({},{})".format(u1, u2))
self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
@ -363,6 +243,13 @@ class RemoteController:
r = scipy.integrate.ode(f_ode) r = scipy.integrate.ode(f_ode)
r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32]))
#print(self.x0)
if self.x0 is None:
if self.xm_0 is not None:
self.x0 = self.xm_0
self.xs = self.x0
else:
print("error: no measurement available to initialize simulation")
x = self.x0 x = self.x0
r.set_initial_value(x, self.t) r.set_initial_value(x, self.t)
xnew = r.integrate(r.t + dt) xnew = r.integrate(r.t + dt)
@ -372,30 +259,10 @@ class RemoteController:
self.mutex.acquire() self.mutex.acquire()
try: try:
self.ts = np.vstack((self.ts, tnew)) self.ts = np.append(self.ts, tnew)
self.xs = np.vstack((self.xs, xnew)) self.xs = np.vstack((self.xs, xnew))
#self.ys.append(xnew[1])
#self.omegas.append(xnew[2])
finally: finally:
self.mutex.release() self.mutex.release()
# for r in self.robots:
# if r.euler is not None:
# if self.alpha_0 is not None:
# self.alphas.append(r.euler[2]-self.alpha_0)
# else:
# self.alpha_0 = r.euler[2]
# self.alphas.append(0.0)
# if r.pos is not None:
# if self.pos_0 is not None:
# self.possx.append(r.pos[0] - self.pos_0[0])
# self.possy.append(r.pos[1] - self.pos_0[1])
# else:
# self.pos_0 = r.pos[0:2]
# self.possx.append(0.0)
# self.possy.append(0.0)
#print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2]))
#time.sleep(0.1)
elif keyboard_control_speed_test: elif keyboard_control_speed_test:
@ -416,6 +283,8 @@ class RemoteController:
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
elif pid: elif pid:
# pid controller
events = pygame.event.get() events = pygame.event.get()
for event in events: for event in events:
if event.type == pygame.KEYDOWN: if event.type == pygame.KEYDOWN:
@ -431,57 +300,35 @@ class RemoteController:
self.controlling = False self.controlling = False
self.rc_socket.send('({},{})\n'.format(0, 0)) self.rc_socket.send('({},{})\n'.format(0, 0))
dt = 0.1 dt = 0.05
#i = 0.0 # 0.001
if self.controlling: if self.controlling:
# test: turn robot such that angle is zero # test: turn robot such that angle is zero
for r in self.robots: for r in self.robots:
if r.euler is not None: if r.euler is not None:
self.k = self.k + 1 self.k = self.k + 1
alpha = r.euler[2] alpha = r.euler
self.alphas.append(alpha) self.alphas.append(alpha)
# compute error
e = alpha - 0 e = alpha - 0
p = self.pp * e
# compute integral of error (approximately)
self.inc += e * dt self.inc += e * dt
# PID
p = self.pp * e
i = self.ii * self.inc
d = 0.0 d = 0.0
u1 = p + self.ii * self.inc + d # compute controls for robot from PID
u2 = - p - self.ii * self.inc - d u1 = p + i + d
u2 = - p - i - d
print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) print("alpha = {}, u = ({}, {})".format(alpha, u1, u2))
self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(dt) time.sleep(dt)
# elif self.k == kmax:
# u1 = u2 = 0.0
# self.rc_socket.send('({},{})\n'.format(u1, u2))
# self.k = self.k + 1
#
# plt.plot(np.array(self.alphas))
# plt.show()
pass
def calibration_sequence(self):
speed = 1.0
u1 = speed
u2 = speed
self.rc_socket.send('({},{})\n'.format(u1, u2))
time.sleep(4.0)
u1 = u2 = 0
self.rc_socket.send('({},{})\n'.format(u1, u2))
self.rc_socket.send('\n')
# test:
# -> 1.6 m in 4 seconds
# angular velocity: angle/second
# 1.6 / (2 * pi * 0.03)
# l = 1.6
# r = 0.03
# t = 4.0
# -> number of rotations n = l / (2 * pi * r)
# -> angular velocity = 2 * pi * n / t = l / (r * t)
# result: maximum angular velocity: omega_max = 13.32 rad/sec
def main(args): def main(args):
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)
@ -489,25 +336,14 @@ def main(args):
rc = RemoteController() rc = RemoteController()
pygame.init() pygame.init()
pygame.display.set_mode((640, 480))
#p = multiprocessing.Process(target=rc.show_display) screenheight = 480
threading.Thread(target=rc.show_display).start() screenwidth = 640
#p.start() screen = pygame.display.set_mode([screenwidth, screenheight])
plt.ion()
plt.pause(0.01)
#p.join()
pass
#rc.show_display() threading.Thread(target=rc.controller).start()
#rc.calibration_sequence()
#game.loop()
# try: rc.ani()
# rospy.spin()
# except KeyboardInterrupt:
# print("Shutting down")
# cv2.destroyAllWindows()
if __name__ == '__main__': if __name__ == '__main__':