From 480e1f523c72307cf1db96dcc5f39588392c1b50 Mon Sep 17 00:00:00 2001 From: spirkelmann Date: Tue, 11 Jun 2019 16:26:53 +0200 Subject: [PATCH] rewrote position controller script. now animated plot works for keyboard control and pid control --- remote_control/position_controller.py | 406 ++++++++------------------ 1 file changed, 121 insertions(+), 285 deletions(-) diff --git a/remote_control/position_controller.py b/remote_control/position_controller.py index 85d709b..a426e89 100644 --- a/remote_control/position_controller.py +++ b/remote_control/position_controller.py @@ -1,14 +1,14 @@ # startup: -# roscore -# rosparam set cv_camera/device_id 0 -# rosrun cv_camera cv_camera_node +# roscore -> start ros +# rosparam set cv_camera/device_id 0 -> set appropriate camera device +# rosrun cv_camera cv_camera_node -> start the camera +# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection +# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane import sys import rospy import pygame import numpy as np -import cv2 -import cv2.aruco as aruco import socket import scipy.integrate @@ -20,59 +20,7 @@ import matplotlib.animation as anim import time -from sensor_msgs.msg import Image -from sensor_msgs.msg import CompressedImage -from cv_bridge import CvBridge, CvBridgeError -import math - -pygame.init() -pygame.font.init() -#pygame.joystick.init() - -myfont = pygame.font.SysFont('Comic Sans MS', 30) -pygame.display.set_caption("ROS camera stream on Pygame") -screenheight = 1024 -screenwidth = 1280 #4*screenheight//3 -screen = pygame.display.set_mode([screenwidth, screenheight]) -red = (255, 0, 0) -teal = (0, 255, 255) - -# ros setup -camera_stream = "/cv_camera/image_raw" -#camera_stream = "/image_raw" -compression = False - - -# taken from https://www.learnopencv.com/rotation-matrix-to-euler-angles/ -# Checks if a matrix is a valid rotation matrix. -def isRotationMatrix(R): - Rt = np.transpose(R) - shouldBeIdentity = np.dot(Rt, R) - I = np.identity(3, dtype=R.dtype) - n = np.linalg.norm(I - shouldBeIdentity) - return n < 1e-6 - - -# Calculates rotation matrix to euler angles -# The result is the same as MATLAB except the order -# of the euler angles ( x and z are swapped ). -def rotationMatrixToEulerAngles(R): - assert (isRotationMatrix(R)) - - sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0]) - - singular = sy < 1e-6 - - if not singular: - x = math.atan2(R[2, 1], R[2, 2]) - y = math.atan2(-R[2, 0], sy) - z = math.atan2(R[1, 0], R[0, 0]) - else: - x = math.atan2(-R[1, 2], R[1, 1]) - y = math.atan2(-R[2, 0], sy) - z = 0 - - return np.array([x, y, z]) +from marker_pos_angle.msg import id_pos_angle class Robot: def __init__(self, id, ip=None): @@ -110,37 +58,40 @@ def f_ode(t, x, u): class RemoteController: def __init__(self): - #self.cam = cv2.VideoCapture(1) - #self.image_pub = rospy.Publisher("pygame_image", Image) + self.robots = [Robot(3)] - self.bridge = CvBridge() - - #self.cv_image = np.zeros((1, 1, 3), np.uint8) - self.cv_image = None - - self.robots = [Robot(2), Robot(6), Robot(7), Robot(8), Robot(9)] - - self.robot_ids = [r.id for r in self.robots] - - screen.fill([0, 0, 0]) - - cv_file = cv2.FileStorage("test.yaml", cv2.FILE_STORAGE_READ) - self.camera_matrix = cv_file.getNode("camera_matrix").mat() - self.dist_matrix = cv_file.getNode("dist_coeff").mat() - - self.aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL) - self.parameters = aruco.DetectorParameters_create() + self.robot_ids = {} + for r in self.robots: + self.robot_ids[r.id] = r + # connect to robot self.rc_socket = socket.socket() try: pass - self.rc_socket.connect(('192.168.4.1', 1234)) # connect to robot + self.rc_socket.connect(('192.168.1.101', 1234)) # connect to robot except socket.error: print("could not connect to socket") + self.t = time.time() + + # variables for simulated state + self.x0 = None + self.ts = np.array([]) + self.xs = [] + + # variables for measurements + self.tms_0 = None + self.xm_0 = None + self.tms = None + self.xms = None + self.mutex = threading.Lock() + marker_sub = rospy.Subscriber("/marker_id_pos_angle", id_pos_angle, self.measurement_callback) + + + # pid parameters self.k = 0 self.ii = 0.1 self.pp = 0.4 @@ -155,51 +106,31 @@ class RemoteController: self.u1 = 0.0 self.u2 = 0.0 - self.t = time.time() - self.x0 = np.zeros(3) - - self.ts = np.array([0.0]) - self.xs = np.array([[0.0, 0.0, 0.0]]) - self.ys = [] - self.omegas = [] - - self.tms_0 = None - self.tms = None #np.array([0.0]) - self.xm_0 = None - self.xms = None #np.array([[0.0, 0.0, 0.0]]) - self.alpha_0 = None - self.alphas = [] - - self.pos_0 = None - self.possx = [] - self.possy = [] - - if compression: - self.image_sub = rospy.Subscriber(camera_stream + "/compressed", CompressedImage, self.callback) - else: - self.image_sub = rospy.Subscriber(camera_stream, Image, self.callback) - + # animation self.fig = plt.figure() - self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) - self.ax = self.fig.add_subplot(1,1,1) self.xdata, self.ydata = [], [] self.line, = self.ax.plot([],[]) self.line_sim, = self.ax.plot([], []) - self.dir, = self.ax.plot([], []) + self.dirm, = self.ax.plot([], []) + self.dirs, = self.ax.plot([], []) plt.xlabel('x-position') plt.ylabel('y-position') - + def ani(self): + self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) + plt.ion() + plt.show(block=True) def ani_init(self): self.ax.set_xlim(-2, 2) self.ax.set_ylim(-2, 2) self.ax.set_aspect('equal', adjustable='box') - return self.line, self.line_sim, self.dir, + return self.line, self.line_sim, self.dirm, self.dirs, def ani_update(self, frame): + #print("plotting") self.mutex.acquire() try: # copy data for plot from global arrays @@ -207,149 +138,98 @@ class RemoteController: tm_local = deepcopy(self.tms) xm_local = deepcopy(self.xms) - #print(len(tm_local)) - if len(tm_local) > 0: + # plot path of the robot self.line.set_data(xm_local[:,0], xm_local[:,1]) + # compute and plot direction the robot is facing a = xm_local[-1, 0] - b = xm_local[-1, 0] + b = xm_local[-1, 1] a2 = a + np.cos(xm_local[-1, 2]) * 1.0 b2 = b + np.sin(xm_local[-1, 2]) * 1.0 - self.dir.set_data(np.array([a, a2]), np.array([b, b2])) + self.dirm.set_data(np.array([a, a2]), np.array([b, b2])) ts_local = deepcopy(self.ts) xs_local = deepcopy(self.xs) if len(ts_local) > 0: + # plot simulated path of the robot self.line_sim.set_data(xs_local[:,0], xs_local[:,1]) + + # compute and plot direction the robot is facing + a = xs_local[-1, 0] + b = xs_local[-1, 1] + + a2 = a + np.cos(xs_local[-1, 2]) * 1.0 + b2 = b + np.sin(xs_local[-1, 2]) * 1.0 + + self.dirs.set_data(np.array([a, a2]), np.array([b, b2])) finally: self.mutex.release() - return self.line, self.line_sim, self.dir, + return self.line, self.line_sim, self.dirm, self.dirs, - def callback(self, data): + def measurement_callback(self, data): + #print("data = {}".format(data)) + if data.id in self.robot_ids: + r = self.robot_ids[data.id] - #self.cv_image_small = np.fliplr(self.cv_image_small) # why is this necessary? + r.pos = (data.x, data.y) # only x and y component are important for us + r.euler = data.angle - # marker detection - #gray = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2GRAY) + #print("r.pos = {}".format(r.pos)) + #print("r.angle = {}".format(r.euler)) + # save measured position and angle for plotting + measurement = np.array([r.pos[0], r.pos[1], r.euler]) + if self.tms_0 is None: + self.tms_0 = time.time() + self.xm_0 = measurement - #print("robot {} pos = {}".format(r.id, r.pos)) - - #ret_val, self.cv_image = self.cam.read() - try: - if compression: - self.cv_image = self.bridge.compressed_imgmsg_to_cv2(data) + self.mutex.acquire() + try: + self.tms = np.array([0.0]) + self.xms = measurement + finally: + self.mutex.release() else: - self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8") - except CvBridgeError as e: - print(e) + self.mutex.acquire() + try: + self.tms = np.vstack((self.tms, time.time() - self.tms_0)) + self.xms = np.vstack((self.xms, measurement)) + finally: + self.mutex.release() - corners, ids, rejectedImgPoints = aruco.detectMarkers(self.cv_image, self.aruco_dict, parameters=self.parameters) - marker_found = len(corners) > 0 - - if marker_found: - markers = zip(corners, ids) - #print("found!") - # filter markers with unknown ids - #print("detected markers = {}".format(markers)) - markers_filtered = list(filter(lambda x: x[1] in self.robot_ids, markers)) - #print("filtered markers = {}".format(markers_filtered)) - if len(markers_filtered) > 0: - filtered_corners, filtered_ids = zip(*markers_filtered) - #print("filtered corners = {}".format(filtered_corners[0])) - - rvec, tvec, _ = aruco.estimatePoseSingleMarkers(filtered_corners, 0.1, self.camera_matrix, - self.dist_matrix) - - aruco.drawDetectedMarkers(self.cv_image, filtered_corners) - for i in range(len(filtered_corners)): - aruco.drawAxis(self.cv_image, self.camera_matrix, self.dist_matrix, rvec[i], tvec[i], - 0.1) - - for r in self.robots: - if r.id == filtered_ids[i]: - r.pos = tvec[i][0] # only x and y component are important for us - r.orient = rvec[i][0] - r.rot_mat, r.jacobian = cv2.Rodrigues(r.orient) - r.euler = rotationMatrixToEulerAngles(r.rot_mat) - - # save measured position and angle for plotting - measurement = np.array([-r.pos[0], -r.pos[1], r.euler[2] + np.pi/4.0]) - if self.xm_0 is None: - self.tms_0 = time.time() - self.xm_0 = deepcopy(measurement) - self.xm_0[2] = 0.0 - - self.tms = np.array([self.tms_0]) - self.xms = measurement - self.xm_0 - else: - self.mutex.acquire() - try: - self.tms = np.vstack((self.tms, time.time() - self.tms_0)) - self.xms = np.vstack((self.xms, measurement - self.xm_0)) - - if len(self.tms) == 50: - self.alpha_0 = np.mean(self.xms[:,2]) - finally: - self.mutex.release() - - def show_display(self): - while self.alpha_0 is None: - pass - self.x0[2] = self.alpha_0 - print("alpha_0 = {}".format(self.alpha_0)) - print("show_display") + def controller(self): + print("starting control") while True: - #self.capture() - # show ros camera image on the pygame screen - if self.cv_image is not None: - image = cv2.resize(self.cv_image,(screenwidth,screenheight)) - frame = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB) - frame = np.rot90(frame) - frame = pygame.surfarray.make_surface(frame) - screen.blit(frame, (0, 0)) - - # plot robot positions - for r in self.robots: - if r.euler is not None: - #print("r.pos = {}".format(r.pos)) - #print("r.euler = {}".format(r.euler[2])) - #print("drawing at {}".format(r.pos)) - #pygame.draw.circle(screen, (255, 0, 0), r.pos, 10) - pass - - pygame.display.update() - - keyboard_control = True + keyboard_control = False keyboard_control_speed_test = False - pid = False + pid = True - if keyboard_control: + if keyboard_control: # keyboard controller events = pygame.event.get() - speed = 1.0 + speed = 0.5 for event in events: if event.type == pygame.KEYDOWN: if event.key == pygame.K_LEFT: - self.u1 = speed - self.u2 = -speed + self.u1 = -speed + self.u2 = speed #print("turn left: ({},{})".format(u1, u2)) elif event.key == pygame.K_RIGHT: - self.u1 = -speed - self.u2 = speed + self.u1 = speed + self.u2 = -speed #print("turn right: ({},{})".format(u1, u2)) elif event.key == pygame.K_UP: - self.u1 = -speed - self.u2 = -speed - print("forward: ({},{})".format(self.u1, self.u2)) - elif event.key == pygame.K_DOWN: self.u1 = speed self.u2 = speed + #print("forward: ({},{})".format(self.u1, self.u2)) + elif event.key == pygame.K_DOWN: + self.u1 = -speed + self.u2 = -speed #print("forward: ({},{})".format(u1, u2)) self.rc_socket.send('({},{})\n'.format(self.u1, self.u2)) elif event.type == pygame.KEYUP: @@ -363,6 +243,13 @@ class RemoteController: r = scipy.integrate.ode(f_ode) r.set_f_params(np.array([self.u1 * 13.32, self.u2 * 13.32])) + #print(self.x0) + if self.x0 is None: + if self.xm_0 is not None: + self.x0 = self.xm_0 + self.xs = self.x0 + else: + print("error: no measurement available to initialize simulation") x = self.x0 r.set_initial_value(x, self.t) xnew = r.integrate(r.t + dt) @@ -372,30 +259,10 @@ class RemoteController: self.mutex.acquire() try: - self.ts = np.vstack((self.ts, tnew)) + self.ts = np.append(self.ts, tnew) self.xs = np.vstack((self.xs, xnew)) - #self.ys.append(xnew[1]) - #self.omegas.append(xnew[2]) finally: self.mutex.release() - # for r in self.robots: - # if r.euler is not None: - # if self.alpha_0 is not None: - # self.alphas.append(r.euler[2]-self.alpha_0) - # else: - # self.alpha_0 = r.euler[2] - # self.alphas.append(0.0) - # if r.pos is not None: - # if self.pos_0 is not None: - # self.possx.append(r.pos[0] - self.pos_0[0]) - # self.possy.append(r.pos[1] - self.pos_0[1]) - # else: - # self.pos_0 = r.pos[0:2] - # self.possx.append(0.0) - # self.possy.append(0.0) - - #print("(t,x,u) = ({},{},{})".format(tnew,xnew,[self.u1, self.u2])) - #time.sleep(0.1) elif keyboard_control_speed_test: @@ -416,6 +283,8 @@ class RemoteController: self.rc_socket.send('({},{})\n'.format(u1, u2)) elif pid: + # pid controller + events = pygame.event.get() for event in events: if event.type == pygame.KEYDOWN: @@ -431,57 +300,35 @@ class RemoteController: self.controlling = False self.rc_socket.send('({},{})\n'.format(0, 0)) - dt = 0.1 + dt = 0.05 - #i = 0.0 # 0.001 if self.controlling: # test: turn robot such that angle is zero for r in self.robots: if r.euler is not None: self.k = self.k + 1 - alpha = r.euler[2] + alpha = r.euler self.alphas.append(alpha) + + # compute error e = alpha - 0 - p = self.pp * e + + # compute integral of error (approximately) self.inc += e * dt + + # PID + p = self.pp * e + i = self.ii * self.inc d = 0.0 - u1 = p + self.ii * self.inc + d - u2 = - p - self.ii * self.inc - d + # compute controls for robot from PID + u1 = p + i + d + u2 = - p - i - d print("alpha = {}, u = ({}, {})".format(alpha, u1, u2)) self.rc_socket.send('({},{})\n'.format(u1, u2)) + time.sleep(dt) - # elif self.k == kmax: - # u1 = u2 = 0.0 - # self.rc_socket.send('({},{})\n'.format(u1, u2)) - # self.k = self.k + 1 - # - # plt.plot(np.array(self.alphas)) - # plt.show() - pass - - def calibration_sequence(self): - speed = 1.0 - u1 = speed - u2 = speed - self.rc_socket.send('({},{})\n'.format(u1, u2)) - time.sleep(4.0) - u1 = u2 = 0 - self.rc_socket.send('({},{})\n'.format(u1, u2)) - self.rc_socket.send('\n') - - # test: - # -> 1.6 m in 4 seconds - # angular velocity: angle/second - # 1.6 / (2 * pi * 0.03) - # l = 1.6 - # r = 0.03 - # t = 4.0 - # -> number of rotations n = l / (2 * pi * r) - # -> angular velocity = 2 * pi * n / t = l / (r * t) - # result: maximum angular velocity: omega_max = 13.32 rad/sec - def main(args): rospy.init_node('controller_node', anonymous=True) @@ -489,25 +336,14 @@ def main(args): rc = RemoteController() pygame.init() - pygame.display.set_mode((640, 480)) - #p = multiprocessing.Process(target=rc.show_display) - threading.Thread(target=rc.show_display).start() - #p.start() - plt.ion() - plt.pause(0.01) - #p.join() - pass + screenheight = 480 + screenwidth = 640 + screen = pygame.display.set_mode([screenwidth, screenheight]) - #rc.show_display() - #rc.calibration_sequence() - #game.loop() + threading.Thread(target=rc.controller).start() - # try: - # rospy.spin() - # except KeyboardInterrupt: - # print("Shutting down") - # cv2.destroyAllWindows() + rc.ani() if __name__ == '__main__':