import pyrealsense2 as rs import numpy as np import cv2 import os import time from shapely.geometry import LineString from queue import Queue import aruco class ArucoEstimator: corner_marker_ids = { 'a': 0, 'b': 1, 'c': 2, 'd': 3 } corner_estimates = { 'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, } def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8, draw_marker_coordinate_system=False): self.grid_columns = grid_columns self.grid_rows = grid_rows if robot_marker_ids is None: robot_marker_ids = [] self.robot_marker_ids = robot_marker_ids self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None}) for marker_id in self.robot_marker_ids]) self.event_queue = Queue() self.draw_marker_coordinate_system = draw_marker_coordinate_system if use_realsense: # check if realsense camera is connected # Configure depth and color streams self.pipeline = rs.pipeline() config = rs.config() # config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) # config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30) config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30) # config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) # Start streaming self.pipeline.start(config) camera_intrinsics = self.pipeline.get_active_profile().get_stream( rs.stream.color).as_video_stream_profile().get_intrinsics() self.camera_matrix = np.zeros((3, 3)) self.camera_matrix[0][0] = camera_intrinsics.fx self.camera_matrix[1][1] = camera_intrinsics.fy self.camera_matrix[0][2] = camera_intrinsics.ppx self.camera_matrix[1][2] = camera_intrinsics.ppy self.dist_coeffs = np.array(camera_intrinsics.coeffs) # more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20 else: # use other camera self.cv_camera = cv2.VideoCapture(0) self.pipeline = None # create detector and get parameters self.detector = aruco.MarkerDetector() # self.detector.setDictionary('ARUCO_MIP_36h12') # self.detector.setDictionary('ARUCO_MIP_16h3') # self.detector.setDictionary('ARUCO') # self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05) # self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05) self.detector_params = self.detector.getParameters() # print detector parameters print("detector params:") for val in dir(self.detector_params): if not val.startswith("__"): print("\t{} : {}".format(val, self.detector_params.__getattribute__(val))) self.camparam = aruco.CameraParameters() self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml")) def mouse_callback(self, event, px, py, flags, param): """ This function is called for each mouse event inside the opencv window. If there are reference markers available we compute the real world position corresponding to the clicked position and put it in an event queue. :param event: type of event (mouse movement, left click, right click, etc.) :param px: x-position of event :param py: y-position of event """ if event == cv2.EVENT_LBUTTONDOWN: if self.all_corners_detected(): # inter/extrapolate from clicked point to marker position px1 = self.corner_estimates['a']['pixel_coordinate'][0] px3 = self.corner_estimates['c']['pixel_coordinate'][0] py1 = self.corner_estimates['a']['pixel_coordinate'][1] py3 = self.corner_estimates['c']['pixel_coordinate'][1] x1 = self.corner_estimates['a']['x'] x3 = self.corner_estimates['c']['x'] y1 = self.corner_estimates['a']['y'] y3 = self.corner_estimates['c']['y'] alpha = (px - px1) / (px3 - px1) beta = (py - py1) / (py3 - py1) print(f"alpha = {alpha}, beta = {beta}") target_x = x1 + alpha * (x3 - x1) target_y = y1 + beta * (y3 - y1) target = np.array([target_x, target_y, 0]) print(f"target = ({target_x},{target_y})") self.event_queue.put(('click', target)) else: print("not all markers have been detected!") def run_tracking(self): """ Run the marker tracking in a loop """ cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE) cv2.setMouseCallback('RoboRally', self.mouse_callback) try: running = True while running: if self.pipeline: frames = self.pipeline.wait_for_frames() color_frame = frames.get_color_frame() if not color_frame: continue # Convert images to numpy arrays color_image = np.asanyarray(color_frame.get_data()) else: # Capture frame-by-frame ret, color_image = self.cv_camera.read() t_image = time.time() gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY) # run aruco marker detection detected_markers = self.detector.detect(gray) # extract data for detected markers detected_marker_data = {} for marker in detected_markers: detected_marker_data[marker.id] = {'marker_center': marker.getCenter()} if marker.id > 0: if marker.id in self.corner_marker_ids.values(): marker.calculateExtrinsics(0.1, self.camparam) else: marker.calculateExtrinsics(0.07, self.camparam) detected_marker_data[marker.id]['Rvec'] = marker.Rvec detected_marker_data[marker.id]['Tvec'] = marker.Tvec if marker.id > 0: # draw markers onto the image marker.draw(color_image, np.array([255, 255, 255]), 2, True) if self.draw_marker_coordinate_system: aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1) # store data for marker_id, data in detected_marker_data.items(): self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image) # draw grid color_image = self.draw_grid_lines(color_image, detected_marker_data) color_image = self.draw_robot_pos(color_image, detected_marker_data) # Show images cv2.imshow('RoboRally', color_image) key = cv2.waitKey(1) if key > 0: self.event_queue.put(('key', key)) if key == ord('q'): running = False finally: cv2.destroyAllWindows() if self.pipeline is not None: # Stop streaming self.pipeline.stop() def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image): # update the marker estimate with new data if marker_id in self.corner_marker_ids.values(): # for corner markers we compute the mean of all measurements s.t. the position stabilizes over time # (we assume the markers do not move) # get corresponding corner to the detected marker corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys())) old_estimate_x = self.corner_estimates[corner]['x'] old_estimate_y = self.corner_estimates[corner]['y'] n_estimates = self.corner_estimates[corner]['n_estimates'] x = tvec[0][0] y = -tvec[1][0] # flip y coordinate if not any([old_estimate_x is None, old_estimate_y is None]): new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1) else: new_estimate_x = x # first estimate new_estimate_y = y self.corner_estimates[corner]['t'] = t_image self.corner_estimates[corner]['x'] = new_estimate_x self.corner_estimates[corner]['y'] = new_estimate_y self.corner_estimates[corner]['n_estimates'] = n_estimates + 1 self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center elif marker_id in self.robot_marker_ids: # for robot markers we extract x and y position as well as the angle # here we could also implement a filter x = tvec[0][0] y = -tvec[1][0] # flip y coordinate # compute angle rot_mat, _ = cv2.Rodrigues(rvec) pose_mat = cv2.hconcat((rot_mat, tvec)) _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat) angle = -euler_angles[2][0] * np.pi / 180.0 self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle} def all_corners_detected(self): # checks if all corner markers have been detected at least once return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()]) def all_robots_detected(self): # checks if all robot markers have been detected at least once return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()]) def get_pos_from_grid_point(self, x, y, orientation=None): """ returns the position for the given grid point based on the current corner estimates :param x: x position on the grid ( 0 &le x < number of grid columns) :param y: y position on the grid ( 0 &le x < number of grid rows) :param orientation: (optional) orientation in the given grid cell (one of ^, >, v, < ) :return: numpy array with corresponding real world x- and y-position if orientation was specified the array also contains the matching angle for the orientation """ assert 0 <= x < self.grid_columns assert 0 <= y < self.grid_rows assert self.all_corners_detected() # compute column line a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']]) b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']]) c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']]) d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']]) x_frac = (x + 0.5) / self.grid_columns y_frac = (y + 0.5) / self.grid_rows vab = b - a vdc = c - d column_line_top = a + x_frac * vab column_line_bottom = d + x_frac * vdc vad = d - a vbc = c - b row_line_top = a + y_frac * vad row_line_bottom = b + y_frac * vbc column_line = LineString([column_line_top, column_line_bottom]) row_line = LineString([row_line_top, row_line_bottom]) int_pt = column_line.intersection(row_line) point_of_intersection = np.array([int_pt.x, int_pt.y]) if orientation is not None: # compute angle corresponding to the orientation w.r.t. the grid # TODO: test this code angle_ab = np.arctan2(vab[1], vab[0]) angle_dc = np.arctan2(vdc[1], vdc[0]) angle_ad = np.arctan2(vad[1], vad[0]) angle_bc = np.arctan2(vbc[1], vbc[0]) angle = 0.0 if orientation == '>': angle = y_frac * angle_ab + (1 - y_frac) * angle_dc elif orientation == '<': angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi elif orientation == 'v': angle = x_frac * angle_ad + (1 - x_frac) * angle_bc elif orientation == '^': angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi return np.array((point_of_intersection[0], point_of_intersection[1], angle)) else: return point_of_intersection def get_grid_point_from_pos(self): # TODO return the nearest grid point for the given position estimate pass def print_corner_estimates(self): for key, value in self.corner_estimates.items(): if value['n_estimates'] != 0: print(f"corner marker {key} at pos ({value['x']},{value['y']})") def draw_corner_line(self, frame, corner_1, corner_2): # draws a line between the given markers onto the given frame corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate'] corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate'] if corner_1_center is not None and corner_2_center is not None: frame = cv2.line(frame, corner_1_center, corner_2_center, color=(0, 0, 255), thickness=2) return frame def draw_grid_lines(self, frame, detected_marker_data): # draws a grid onto the given frame frame = self.draw_corner_line(frame, 'a', 'b') frame = self.draw_corner_line(frame, 'b', 'c') frame = self.draw_corner_line(frame, 'c', 'd') frame = self.draw_corner_line(frame, 'd', 'a') if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]): # compute outlines of board a = self.corner_estimates['a']['pixel_coordinate'] b = self.corner_estimates['b']['pixel_coordinate'] c = self.corner_estimates['c']['pixel_coordinate'] d = self.corner_estimates['d']['pixel_coordinate'] # draw vertical lines vab = b - a vdc = c - d for x in range(1, self.grid_columns): column_line_top = a + x / self.grid_columns * vab column_line_bottom = d + x / self.grid_columns * vdc frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0), thickness=1) # draw horizontal lines vad = d - a vbc = c - b for y in range(1, self.grid_rows): row_line_top = a + y / self.grid_rows * vad row_line_bottom = b + y / self.grid_rows * vbc frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0), thickness=1) return frame def get_robot_state_estimate(self, marker_id): if marker_id in self.robot_marker_estimates: if self.robot_marker_estimates[marker_id]['t'] is not None: return np.array([self.robot_marker_estimates[marker_id]['x'], self.robot_marker_estimates[marker_id]['y'], self.robot_marker_estimates[marker_id]['angle']]) else: print(f"error: no estimate available for robot {marker_id}") return None else: print(f"error: invalid robot id {marker_id}") return None def draw_robot_pos(self, frame, detected_marker_data): # draws information about the robot positions onto the given frame robot_corners_pixel_coords = {} for marker_id, estimate in self.robot_marker_estimates.items(): if marker_id in detected_marker_data.keys(): robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['center']) for marker_id, coord in robot_corners_pixel_coords.items(): x = self.robot_marker_estimates[marker_id]['x'] y = self.robot_marker_estimates[marker_id]['y'] angle = self.robot_marker_estimates[marker_id]['angle'] frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord, cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0)) return frame if __name__ == "__main__": estimator = ArucoEstimator(use_realsense=False, draw_marker_coordinate_system=True) estimator.run_tracking()