# startup: # roscore # rosparam set cv_camera/device_id 0 # rosrun cv_camera cv_camera_node import sys import rospy import pygame import numpy as np import cv2 import cv2.aruco as aruco import socket import scipy.integrate import threading from copy import deepcopy import matplotlib.pyplot as plt import matplotlib.animation as anim import time from sensor_msgs.msg import Image from sensor_msgs.msg import CompressedImage from cv_bridge import CvBridge, CvBridgeError import math pygame.init() pygame.font.init() #pygame.joystick.init() myfont = pygame.font.SysFont('Comic Sans MS', 30) pygame.display.set_caption("ROS camera stream on Pygame") screenheight = 1024 screenwidth = 1280 #4*screenheight//3 screen = pygame.display.set_mode([screenwidth, screenheight]) red = (255, 0, 0) teal = (0, 255, 255) # ros setup camera_stream = "/cv_camera/image_raw" #camera_stream = "/image_raw" compression = False # taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/ # Checks if a matrix is a valid rotation matrix. def isRotationMatrix(R): Rt = np.transpose(R) shouldBeIdentity = np.dot(Rt, R) I = np.identity(3, dtype=R.dtype) n = np.linalg.norm(I - shouldBeIdentity) return n < 1e-6 # Calculates rotation matrix to euler angles # The result is the same as MATLAB except the order # of the euler angles ( x and z are swapped ). def rotationMatrixToEulerAngles(R): assert (isRotationMatrix(R)) sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0]) singular = sy < 1e-6 if not singular: x = math.atan2(R[2, 1], R[2, 2]) y = math.atan2(-R[2, 0], sy) z = math.atan2(R[1, 0], R[0, 0]) else: x = math.atan2(-R[1, 2], R[1, 1]) y = math.atan2(-R[2, 0], sy) z = 0 return np.array([x, y, z]) class Robot: def __init__(self, id, ip=None): self.pos = None self.orient = None self.id = id self.pos = None self.euler = None self.ip = ip def f_ode(t, x, u): # dynamical model of the two-wheeled robot # TODO: find exact values for these parameters r = 0.03 R = 0.05 d = 0.02 theta = x[2] omega_r = u[0] omega_l = u[1] dx = np.zeros(3) dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \ + (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \ + (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l dx[2] = -r/(2.0*R) * (omega_r - omega_l) return dx class RemoteController: def __init__(self): #self.cam = cv2.VideoCapture(1) #self.image_pub = rospy.Publisher("pygame_image", Image) self.bridge = CvBridge() #self.cv_image = np.zeros((1, 1, 3), np.uint8) self.cv_image = None self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)] self.robot_ids = [r.id for r in self.robots] screen.fill([0, 0, 0]) cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ) self.camera_matrix = cv_file.getNode("camera_matrix").mat() self.dist_matrix = cv_file.getNode("dist_coeff").mat() self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL) self.parameters = aruco.DetectorParameters_create() self.rc_socket = socket.socket() try: pass self.rc_socket.connect(('', 1234)) # connect to robot except socket.error: print("could not connect to socket") self.mutex = threading.Lock() self.k = 0 self.ii = 0.1 self.pp = 0.4 self.inc = 0.0 self.alphas = [] self.speed = 1.0 self.controlling = False self.u1 = 0.0 self.u2 = 0.0 self.t = time.time() self.x0 = np.zeros(3) self.ts = np.array([0.0]) self.xs = np.array([[0.0, 0.0, 0.0]]) self.ys = [] self.omegas = [] self.tms_0 = None self.tms = None #np.array([0.0]) self.xm_0 = None self.xms = None #np.array([[0.0, 0.0, 0.0]]) self.alpha_0 = None self.alphas = [] self.pos_0 = None self.possx = [] self.possy = [] if compression: self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback) else: self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback) self.fig = plt.figure() self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) self.ax = self.fig.add_subplot(1,1,1) self.xdata, self.ydata = [], [] self.line, = self.ax.plot([],[]) self.line_sim, = self.ax.plot([], []) self.dir, = self.ax.plot([], []) plt.xlabel('x-position') plt.ylabel('y-position') def ani_init(self): self.ax.set_xlim(-2, 2) self.ax.set_ylim(-2, 2) self.ax.set_aspect('equal', adjustable='box') return self.line, self.line_sim, self.dir, def ani_update(self, frame): self.mutex.acquire() try: # copy data for plot from global arrays if self.tms is not None: tm_local = deepcopy(self.tms) xm_local = deepcopy(self.xms) #print(len(tm_local)) if len(tm_local) > 0: self.line.set_data(xm_local[:,0], xm_local[:,1]) a = xm_local[-1, 0] b = xm_local[-1, 0] a2 = a + np.cos(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0 self.dir.set_data(np.array([a, a2]), np.array([b, b2])) ts_local = deepcopy(self.ts) xs_local = deepcopy(self.xs) if len(ts_local) > 0: self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) finally: self.mutex.release() return self.line, self.line_sim, self.dir, def callback(self, data): #self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary? # marker detection #gray = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2GRAY) #print("robot {} pos = {}".format(r.id, r.pos)) #ret_val, self.cv_image = self.cam.read() try: if compression: self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data) else: self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8") except CvBridgeError as e: print(e) corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters) marker_found = len(corners) > 0 if marker_found: markers = zip(corners, ids) #print("found!") # filter markers with unknown ids #print("detected markers = {}".format(markers)) markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers)) #print("filtered markers = {}".format(markers_filtered)) if len(markers_filtered) > 0: filtered_corners, filtered_ids = zip(*markers_filtered) #print("filtered corners = {}".format(filtered_corners[0])) rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix, self.dist_matrix) aruco.drawDetectedMarkers(self.cv_image, filtered_corners) for i in range(len(filtered_corners)): aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i], 0.1) for r in self.robots: if r.id == filtered_ids[i]: r.pos = tvec[i][0] # only x and y component are important for us r.orient = rvec[i][0] r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient) r.euler = rotationMatrixToEulerAngles(r.rot_mat) # save measured position and angle for plotting measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0]) if self.xm_0 is None: self.tms_0 = time.time() self.xm_0 = deepcopy(measurement) self.xm_0[2] = 0.0 self.tms = np.array([self.tms_0]) self.xms = measurement - self.xm_0 else: self.mutex.acquire() try: self.tms = np.vstack((self.tms, time.time() - self.tms_0)) self.xms = np.vstack((self.xms, measurement - self.xm_0)) if len(self.tms) == 50: self.alpha_0 = np.mean(self.xms[:,2]) finally: self.mutex.release() def show_display(self): while self.alpha_0 is None: pass self.x0[2] = self.alpha_0 print("alpha_0 = {}".format(self.alpha_0)) print("show_display") while True: #self.capture() # show ros camera image on the pygame screen if self.cv_image is not None: image = cv2.resize(self.cv_image,(screenwidth,screenheight)) frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB) frame = np.rot90(frame) frame = pygame.surfarray.make_surface(frame) screen.blit(frame, (0, 0)) # plot robot positions for r in self.robots: if r.euler is not None: #print("r.pos = {}".format(r.pos)) #print("r.euler = {}".format(r.euler[2])) #print("drawing at {}".format(r.pos)) #pygame.draw.circle(screen, (255, 0, 0), r.pos, 10) pass pygame.display.update() keyboard_control = True keyboard_control_speed_test = False pid = False if keyboard_control: events = pygame.event.get() speed = 1.0 for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: self.u1 = speed self.u2 = -speed #print("turn left: ({},{})".format(u1, u2)) elif event.key == pygame.K_RIGHT: self.u1 = -speed self.u2 = speed #print("turn right: ({},{})".format(u1, u2)) elif event.key == pygame.K_UP: self.u1 = -speed self.u2 = -speed print("forward: ({},{})".format(self.u1, self.u2)) elif event.key == pygame.K_DOWN: self.u1 = speed self.u2 = speed #print("forward: ({},{})".format(u1, u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) elif event.type == pygame.KEYUP: self.u1 = 0 self.u2 = 0 #print("key released, resetting: ({},{})".format(u1, u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) tnew = time.time() dt = tnew - self.t r = scipy.integrate.ode(f_ode) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) x = self.x0 r.set_initial_value(x, self.t) xnew = r.integrate(r.t + dt) self.t = tnew self.x0 = xnew self.mutex.acquire() try: self.ts = np.vstack((self.ts, tnew)) self.xs = np.vstack((self.xs, xnew)) #self.ys.append(xnew[1]) #self.omegas.append(xnew[2]) finally: self.mutex.release() # for r in self.robots: # if r.euler is not None: # if self.alpha_0 is not None: # self.alphas.append(r.euler[2]-self.alpha_0) # else: # self.alpha_0 = r.euler[2] # self.alphas.append(0.0) # if r.pos is not None: # if self.pos_0 is not None: # self.possx.append(r.pos[0] - self.pos_0[0]) # self.possy.append(r.pos[1] - self.pos_0[1]) # else: # self.pos_0 = r.pos[0:2] # self.possx.append(0.0) # self.possy.append(0.0) #print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2])) #time.sleep(0.1) elif keyboard_control_speed_test: events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: self.speed = self.speed / np.sqrt(np.sqrt(np.sqrt(10.0))) elif event.key == pygame.K_RIGHT: self.speed = self.speed * np.sqrt(np.sqrt(np.sqrt(10.0))) elif event.key == pygame.K_UP: u1 = self.speed u2 = -self.speed elif event.key == pygame.K_DOWN: u1 = 0.0 u2 = 0.0 print("speed = {}".format(self.speed)) self.rc_socket.send('({},{})\n'.format(u1, u2)) elif pid: events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: self.ii = self.ii / np.sqrt(np.sqrt(np.sqrt(10.0))) print("ii = {}".format(self.pp)) elif event.key == pygame.K_RIGHT: self.ii = self.ii * np.sqrt(np.sqrt(np.sqrt(10.0))) print("ii = {}".format(self.pp)) elif event.key == pygame.K_UP: self.controlling = True elif event.key == pygame.K_DOWN: self.controlling = False self.rc_socket.send('({},{})\n'.format(0, 0)) dt = 0.1 #i = 0.0 # 0.001 if self.controlling: # test: turn robot such that angle is zero for r in self.robots: if r.euler is not None: self.k = self.k + 1 alpha = r.euler[2] self.alphas.append(alpha) e = alpha - 0 p = self.pp * e self.inc += e * dt d = 0.0 u1 = p + self.ii * self.inc + d u2 = - p - self.ii * self.inc - d print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2)) time.sleep(dt) # elif self.k == kmax: # u1 = u2 = 0.0 # self.rc_socket.send('({},{})\n'.format(u1, u2)) # self.k = self.k + 1 # # plt.plot(np.array(self.alphas)) # plt.show() pass def calibration_sequence(self): speed = 1.0 u1 = speed u2 = speed self.rc_socket.send('({},{})\n'.format(u1, u2)) time.sleep(4.0) u1 = u2 = 0 self.rc_socket.send('({},{})\n'.format(u1, u2)) self.rc_socket.send('\n') # test: # -> 1.6 m in 4 seconds # angular velocity: angle/second # 1.6 / (2 * pi * 0.03) # l = 1.6 # r = 0.03 # t = 4.0 # -> number of rotations n = l / (2 * pi * r) # -> angular velocity = 2 * pi * n / t = l / (r * t) # result: maximum angular velocity: omega_max = 13.32 rad/sec def main(args): rospy.init_node('controller_node', anonymous=True) rc = RemoteController() pygame.init() pygame.display.set_mode((640, 480)) #p = multiprocessing.Process(target=rc.show_display) threading.Thread(target=rc.show_display).start() #p.start() plt.ion() plt.pause(0.01) #p.join() pass #rc.show_display() #rc.calibration_sequence() #game.loop() # try: # rospy.spin() # except KeyboardInterrupt: # print("Shutting down") # cv2.destroyAllWindows() if __name__ == '__main__': main(sys.argv)