import os
import time
from queue import Queue

import cv2
import numpy as np
import pyrealsense2 as rs
from shapely.geometry import LineString

import aruco


class ArucoEstimator:
    corner_marker_ids = {
        'a': 0,
        'b': 1,
        'c': 2,
        'd': 3
    }

    def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8,
                 draw_marker_coordinate_system=False):
        self.grid_columns = grid_columns
        self.grid_rows = grid_rows

        if robot_marker_ids is None:
            robot_marker_ids = []
        self.robot_marker_ids = robot_marker_ids
        self.robot_marker_estimates = {marker_id: None for marker_id in self.robot_marker_ids}

        # mutable state lives on the instance (class-level attributes would be shared between instances)
        self.angles = []
        self.corner_estimates = {
            corner: {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}
            for corner in self.corner_marker_ids
        }

        self.event_queue = Queue()
        self.draw_marker_coordinate_system = draw_marker_coordinate_system

        if use_realsense:
            # Configure the color stream of the RealSense camera
            self.pipeline = rs.pipeline()
            config = rs.config()
            # config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
            # config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
            config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
            # config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

            # Start streaming
            self.pipeline.start(config)

            # read the camera intrinsics; more info:
            # https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
            camera_intrinsics = self.pipeline.get_active_profile().get_stream(
                rs.stream.color).as_video_stream_profile().get_intrinsics()
            self.camera_matrix = np.zeros((3, 3))
            self.camera_matrix[0][0] = camera_intrinsics.fx
            self.camera_matrix[1][1] = camera_intrinsics.fy
            self.camera_matrix[0][2] = camera_intrinsics.ppx
            self.camera_matrix[1][2] = camera_intrinsics.ppy
            self.dist_coeffs = np.array(camera_intrinsics.coeffs)
        else:
            # use another camera
            self.cv_camera = cv2.VideoCapture(0)
            self.pipeline = None

        # create detector and get parameters
        self.detector = aruco.MarkerDetector()
        # self.detector.setDictionary('ARUCO_MIP_36h12')
        # self.detector.setDictionary('ARUCO_MIP_16h3')
        # self.detector.setDictionary('ARUCO')
        # self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
        # self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
        self.detector_params = self.detector.getParameters()

        # print detector parameters
        print("detector params:")
        for val in dir(self.detector_params):
            if not val.startswith("__"):
                print("\t{} : {}".format(val, getattr(self.detector_params, val)))

        self.camparam = aruco.CameraParameters()
        self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))

    def mouse_callback(self, event, px, py, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            if self.all_corners_detected():
                # inter-/extrapolate from the clicked pixel to the marker position
                px1 = self.corner_estimates['a']['pixel_coordinate'][0]
                px3 = self.corner_estimates['c']['pixel_coordinate'][0]
                py1 = self.corner_estimates['a']['pixel_coordinate'][1]
                py3 = self.corner_estimates['c']['pixel_coordinate'][1]

                x1 = self.corner_estimates['a']['x']
                x3 = self.corner_estimates['c']['x']
                y1 = self.corner_estimates['a']['y']
                y3 = self.corner_estimates['c']['y']

                alpha = (px - px1) / (px3 - px1)
                beta = (py - py1) / (py3 - py1)
                print(f"alpha = {alpha}, beta = {beta}")

                target_x = x1 + alpha * (x3 - x1)
                target_y = y1 + beta * (y3 - y1)
                target = np.array([target_x, target_y, 0])

                print(f"target = ({target_x},{target_y})")
                self.event_queue.put(('click', target))
            else:
                print("not all markers have been detected!")
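    # The click handler above maps pixels to world coordinates by independent linear
    # interpolation along each axis between the diagonal corners a and c:
    #   alpha = (px - px_a) / (px_c - px_a),  target_x = x_a + alpha * (x_c - x_a)
    # (and analogously beta / target_y). This ignores perspective distortion, so it
    # is only accurate when the camera views the board roughly head-on.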
    def run_tracking(self):
        cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
        cv2.setMouseCallback('RoboRally', self.mouse_callback)

        try:
            running = True
            while running:
                if self.pipeline:
                    frames = self.pipeline.wait_for_frames()
                    color_frame = frames.get_color_frame()
                    if not color_frame:
                        continue
                    # Convert image to numpy array
                    color_image = np.asanyarray(color_frame.get_data())
                else:
                    # Capture frame-by-frame
                    ret, color_image = self.cv_camera.read()
                    if not ret:
                        continue

                t_image = time.time()

                gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)

                # run aruco marker detection
                detected_markers = self.detector.detect(gray)

                # extract data for detected markers
                detected_marker_data = {}
                for marker in detected_markers:
                    detected_marker_data[marker.id] = {'center': marker.getCenter()}
                    if marker.id > 0:
                        if marker.id in self.corner_marker_ids.values():
                            marker.calculateExtrinsics(0.1, self.camparam)   # corner marker side length in m
                        else:
                            marker.calculateExtrinsics(0.07, self.camparam)  # robot marker side length in m
                        detected_marker_data[marker.id]['Rvec'] = marker.Rvec
                        detected_marker_data[marker.id]['Tvec'] = marker.Tvec

                        # draw markers onto the image
                        marker.draw(color_image, np.array([255, 255, 255]), 2, True)
                        if self.draw_marker_coordinate_system:
                            aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)

                # store data (skip markers for which no extrinsics were computed)
                for marker_id, data in detected_marker_data.items():
                    if 'Rvec' in data and 'Tvec' in data:
                        self.update_estimate(marker_id, data['center'], data['Rvec'], data['Tvec'], t_image)

                # draw grid
                color_image = self.draw_grid_lines(color_image, detected_marker_data)
                color_image = self.draw_robot_pos(color_image, detected_marker_data)

                # Show images
                cv2.imshow('RoboRally', color_image)
                key = cv2.waitKey(1)

                if key > 0:
                    self.event_queue.put(('key', key))
                if key == ord('q'):
                    running = False
        finally:
            cv2.destroyAllWindows()
            if self.pipeline is not None:
                # Stop streaming
                self.pipeline.stop()
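    # Corner estimates are smoothed with an incremental (running) mean,
    #   mean_{n+1} = (n * mean_n + x) / (n + 1)
    # so each new measurement has a diminishing influence and the corner positions
    # stabilize over time.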
    def update_estimate(self, marker_id, pixel_coord, rvec, tvec, t_image):
        # update the marker estimate with new data
        if marker_id in self.corner_marker_ids.values():
            # for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
            # (we assume the markers do not move)
            corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id,
                                 self.corner_marker_ids.keys()))  # get the corner corresponding to the detected marker

            old_estimate_x = self.corner_estimates[corner]['x']
            old_estimate_y = self.corner_estimates[corner]['y']
            n_estimates = self.corner_estimates[corner]['n_estimates']

            x = tvec[0][0]
            y = -tvec[1][0]  # flip y coordinate
            if not any([old_estimate_x is None, old_estimate_y is None]):
                new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1)  # weighted update
                new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
            else:
                new_estimate_x = x  # first estimate
                new_estimate_y = y
            self.corner_estimates[corner]['t'] = t_image
            self.corner_estimates[corner]['x'] = new_estimate_x
            self.corner_estimates[corner]['y'] = new_estimate_y
            self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
            self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord
        elif marker_id in self.robot_marker_ids:
            # for robot markers we extract x and y position as well as the angle
            # here we could also implement a filter
            x = tvec[0][0]
            y = -tvec[1][0]  # flip y coordinate

            # compute the angle from the rotation vector
            rot_mat, _ = cv2.Rodrigues(rvec)
            pose_mat = cv2.hconcat((rot_mat, tvec))
            _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
            angle = -euler_angles[2][0] * np.pi / 180.0

            self.angles.append(angle)
            self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}

    def all_corners_detected(self):
        # checks if all corner markers have been detected at least once
        return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])

    def all_robots_detected(self):
        # checks if all robot markers have been detected at least once
        return not any([estimate is None for estimate in self.robot_marker_estimates.values()])

    def get_pos_from_grid_point(self, x, y, orientation=None):
        """
        Returns the position for the given grid point based on the current corner estimates.

        :param x: x position on the grid (0 <= x < number of grid columns)
        :param y: y position on the grid (0 <= y < number of grid rows)
        :param orientation: (optional) orientation in the given grid cell (one of '^', '>', 'v', '<')
        :return: numpy array with the corresponding real-world x- and y-position;
                 if an orientation was specified, the array also contains the matching angle
        """
        assert 0 <= x < self.grid_columns
        assert 0 <= y < self.grid_rows
        assert self.all_corners_detected()

        # retrieve the current corner estimates
        a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
        b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
        c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
        d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])

        x_frac = (x + 0.5) / self.grid_columns
        y_frac = (y + 0.5) / self.grid_rows

        # compute the column line through the requested cell
        vab = b - a
        vdc = c - d
        column_line_top = a + x_frac * vab
        column_line_bottom = d + x_frac * vdc

        # compute the row line through the requested cell
        vad = d - a
        vbc = c - b
        row_line_top = a + y_frac * vad
        row_line_bottom = b + y_frac * vbc

        # the grid point is the intersection of the two lines
        column_line = LineString([column_line_top, column_line_bottom])
        row_line = LineString([row_line_top, row_line_bottom])
        int_pt = column_line.intersection(row_line)
        point_of_intersection = np.array([int_pt.x, int_pt.y])

        if orientation is not None:
            # compute the angle corresponding to the orientation w.r.t. the grid
            # TODO: test this code
            angle_ab = np.arctan2(vab[1], vab[0])
            angle_dc = np.arctan2(vdc[1], vdc[0])
            angle_ad = np.arctan2(vad[1], vad[0])
            angle_bc = np.arctan2(vbc[1], vbc[0])

            angle = 0.0
            if orientation == '>':
                angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
            elif orientation == '<':
                angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
            elif orientation == 'v':
                angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
            elif orientation == '^':
                angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi

            return np.array((point_of_intersection[0], point_of_intersection[1], angle))
        else:
            return point_of_intersection
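    # A possible inverse of get_pos_from_grid_point (sketch, untested): express the
    # query position in the corner frame spanned by vab and vad, then round the
    # fractional cell coordinates to the nearest integer, e.g.
    #   rel = pos - a
    #   col = int(round(np.dot(rel, vab) / np.dot(vab, vab) * grid_columns - 0.5))
    #   row = int(round(np.dot(rel, vad) / np.dot(vad, vad) * grid_rows - 0.5))
    # This assumes an affine (non-distorted) grid and is only an approximation.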
    def get_grid_point_from_pos(self):
        # TODO return the nearest grid point for the given position estimate
        pass

    def print_corner_estimates(self):
        for key, value in self.corner_estimates.items():
            if value['n_estimates'] != 0:
                print(f"corner marker {key} at pos ({value['x']},{value['y']})")

    def draw_corner_line(self, frame, corner_1, corner_2, corner_coords_dict):
        # draws a line between the given corner markers onto the given frame
        if corner_1 in corner_coords_dict and corner_2 in corner_coords_dict:
            frame = cv2.line(frame, corner_coords_dict[corner_1], corner_coords_dict[corner_2],
                             color=(0, 0, 255), thickness=2)
        return frame

    def draw_grid_lines(self, frame, detected_marker_data):
        # draws a grid onto the given frame
        board_corners_pixel_coords = {}
        for corner, marker_id in self.corner_marker_ids.items():
            if marker_id in detected_marker_data.keys():
                # cv2 drawing functions expect integer pixel coordinates
                board_corners_pixel_coords[corner] = tuple(
                    int(v) for v in detected_marker_data[marker_id]['center'])

        frame = self.draw_corner_line(frame, 'a', 'b', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'b', 'c', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'c', 'd', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'd', 'a', board_corners_pixel_coords)

        if set(board_corners_pixel_coords.keys()) == set(self.corner_marker_ids.keys()):
            # all corner markers have been detected -> draw the grid lines
            a = np.array(board_corners_pixel_coords['a'])
            b = np.array(board_corners_pixel_coords['b'])
            c = np.array(board_corners_pixel_coords['c'])
            d = np.array(board_corners_pixel_coords['d'])

            # draw column lines
            vab = b - a
            vdc = c - d
            for x in range(1, self.grid_columns):
                column_line_top = (a + x / self.grid_columns * vab).astype(int)
                column_line_bottom = (d + x / self.grid_columns * vdc).astype(int)
                frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom),
                                 color=(0, 255, 0), thickness=1)

            # draw row lines
            vad = d - a
            vbc = c - b
            for y in range(1, self.grid_rows):
                row_line_top = (a + y / self.grid_rows * vad).astype(int)
                row_line_bottom = (b + y / self.grid_rows * vbc).astype(int)
                frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom),
                                 color=(0, 255, 0), thickness=1)

        return frame
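    # Note: draw_grid_lines works purely in pixel space (marker centers as seen in
    # the image), while get_pos_from_grid_point uses the metric corner estimates;
    # the two grids agree only up to perspective distortion.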
    def get_robot_state_estimate(self, marker_id):
        if marker_id in self.robot_marker_estimates:
            if self.robot_marker_estimates[marker_id] is not None:
                return np.array([self.robot_marker_estimates[marker_id]['x'],
                                 self.robot_marker_estimates[marker_id]['y'],
                                 self.robot_marker_estimates[marker_id]['angle']])
            else:
                print(f"error: no estimate available for robot {marker_id}")
                return None
        else:
            print(f"error: invalid robot id {marker_id}")
            return None

    def draw_robot_pos(self, frame, detected_marker_data):
        # draws information about the robot positions onto the given frame
        robot_corners_pixel_coords = {}
        for marker_id, estimate in self.robot_marker_estimates.items():
            if marker_id in detected_marker_data.keys():
                robot_corners_pixel_coords[marker_id] = tuple(
                    int(v) for v in detected_marker_data[marker_id]['center'])

        for marker_id, coord in robot_corners_pixel_coords.items():
            x = self.robot_marker_estimates[marker_id]['x']
            y = self.robot_marker_estimates[marker_id]['y']
            angle = self.robot_marker_estimates[marker_id]['angle']
            frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle),
                                coord, cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
        return frame


if __name__ == "__main__":
    estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15], draw_marker_coordinate_system=True)
    estimator.run_tracking()
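# Usage sketch (hypothetical, untested): run the tracker in a background thread and
# consume click events from the queue. The use of `threading` is an assumption, not
# part of this module, and some OpenCV GUI backends require the imshow loop to run
# on the main thread, so the roles may need to be swapped.
#
#   import threading
#   est = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
#   threading.Thread(target=est.run_tracking, daemon=True).start()
#   event_type, data = est.event_queue.get()   # blocks until a click or key event
#   if event_type == 'click':
#       print("clicked world position:", data[:2])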