## License: Apache 2.0. See LICENSE file in root directory.
## Copyright(c) 2015-2017 Intel Corporation. All Rights Reserved.

###############################################
##      OpenCV and NumPy integration         ##
###############################################

import pyrealsense2 as rs
import numpy as np
import cv2
from cv2 import aruco
from shapely.geometry import LineString

# product ids of the D400-series devices that support advanced mode
DS5_product_ids = ["0AD1", "0AD2", "0AD3", "0AD4", "0AD5", "0AF6", "0AFE", "0AFF",
                   "0B00", "0B01", "0B03", "0B07", "0B3A"]


def find_device_that_supports_advanced_mode():
    ctx = rs.context()
    devices = ctx.query_devices()
    for dev in devices:
        if dev.supports(rs.camera_info.product_id) \
                and str(dev.get_info(rs.camera_info.product_id)) in DS5_product_ids:
            if dev.supports(rs.camera_info.name):
                print("Found device that supports advanced mode:", dev.get_info(rs.camera_info.name))
            return dev
    raise Exception("No device that supports advanced mode was found")
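
# A possible use of the helper above (it is not called anywhere in this script):
# toggle advanced mode before streaming, following Intel's
# python-rs400-advanced-mode-example. Note that toggling advanced mode resets
# the device, so in practice it has to be re-queried afterwards.
def enable_advanced_mode():
    dev = find_device_that_supports_advanced_mode()
    advnc_mode = rs.rs400_advanced_mode(dev)
    if not advnc_mode.is_enabled():
        advnc_mode.toggle_advanced_mode(True)  # the device disconnects and reconnects
    return advnc_mode
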

class ArucoEstimator:
    # board geometry and marker ids (read-only configuration)
    grid_columns = 10
    grid_rows = 8
    corner_marker_ids = {
        'a': 0,
        'b': 1,
        'c': 2,
        'd': 3
    }
    robot_marker_ids = [12]

    def __init__(self, use_realsense=True):
        # mutable estimate state lives on the instance so that separate
        # instances do not accidentally share the same dicts and lists
        self.robot_marker_estimates = {id: None for id in self.robot_marker_ids}
        self.angles = []
        self.corner_estimates = {
            'a': (None, 0),  # (estimate, n_estimates)
            'b': (None, 0),
            'c': (None, 0),
            'd': (None, 0)
        }

        if use_realsense:  # a RealSense camera is connected
            # Configure depth and color streams
            self.pipeline = rs.pipeline()
            config = rs.config()
            # config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
            # config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
            config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
            # config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

            # Start streaming
            self.pipeline.start(config)

            # build the camera matrix from the intrinsics reported by the device, more info:
            # https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
            camera_intrinsics = self.pipeline.get_active_profile().get_stream(
                rs.stream.color).as_video_stream_profile().get_intrinsics()
            self.camera_matrix = np.zeros((3, 3))
            self.camera_matrix[0][0] = camera_intrinsics.fx
            self.camera_matrix[1][1] = camera_intrinsics.fy
            self.camera_matrix[0][2] = camera_intrinsics.ppx
            self.camera_matrix[1][2] = camera_intrinsics.ppy
            self.camera_matrix[2][2] = 1.0
            self.dist_coeffs = np.array(camera_intrinsics.coeffs)
        else:  # fall back to the first OpenCV camera
            # NOTE: camera_matrix and dist_coeffs then have to be supplied from
            # a calibration of this camera before pose estimation can work
            self.cv_camera = cv2.VideoCapture(0)
            self.pipeline = None

    def run_tracking(self):
        try:
            while True:
                if self.pipeline:
                    frames = self.pipeline.wait_for_frames()
                    color_frame = frames.get_color_frame()
                    if not color_frame:
                        continue
                    # Convert image to numpy array
                    color_image = np.asanyarray(color_frame.get_data())
                else:
                    # Capture frame-by-frame
                    ret, color_image = self.cv_camera.read()
                    if not ret:
                        continue

                gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)

                # detect markers and draw them onto a copy of the camera image
                # aruco_dict = aruco.Dictionary_get(aruco.DICT_5X5_250)
                aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
                parameters = aruco.DetectorParameters_create()
                corners, ids, rejected_img_points = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
                frame = aruco.drawDetectedMarkers(color_image.copy(), corners, ids)

                if ids is not None:
                    for id, c in zip(ids, corners):
                        res = aruco.estimatePoseSingleMarkers(c, 0.10, self.camera_matrix, self.dist_coeffs)
                        rvecs, tvecs = res[0], res[1]
                        self.update_estimate(id[0], rvecs, tvecs)

                    frame = self.draw_grid_lines(frame, corners, ids)
                    frame = self.draw_robot_pos(frame, corners, ids)

                # Show images
                cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)
                cv2.imshow('RealSense', frame)
                cv2.waitKey(1)
        finally:
            # Stop streaming
            if self.pipeline:
                self.pipeline.stop()
            # import matplotlib.pyplot as plt
            # plt.plot(playboard.angles)
            # plt.show()

    def update_estimate(self, id, rvec, tvec):
        # update the marker estimate with new data
        if id in self.corner_marker_ids.values():
            # for corner markers we compute the mean of all measurements s.t. the
            # position stabilizes over time (we assume the markers do not move)
            corner = next(filter(lambda key: self.corner_marker_ids[key] == id,
                                 self.corner_marker_ids.keys()))  # corner corresponding to the detected marker
            old_estimate = self.corner_estimates[corner][0]
            n_estimates = self.corner_estimates[corner][1]

            tvec_proj = tvec[0][0][0:2]  # projection to get rid of z coordinate
            tvec_proj = np.array((tvec_proj[0], -tvec_proj[1]))  # flip y coordinate
            if old_estimate is not None:
                new_estimate = (n_estimates * old_estimate + tvec_proj) / (n_estimates + 1)  # running mean
            else:
                new_estimate = tvec_proj  # first estimate
            self.corner_estimates[corner] = (new_estimate, n_estimates + 1)
        elif id in self.robot_marker_ids:
            # for robot markers we extract x and y position as well as the angle
            # here we could also implement a filter
            x = tvec[0][0][0]
            y = -tvec[0][0][1]  # flip y coordinate

            # compute the orientation angle from the rotation vector
            rot_mat, _ = cv2.Rodrigues(rvec[0][0])
            pose_mat = cv2.hconcat((rot_mat, tvec[0][0].reshape(3, 1)))
            _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
            angle = -euler_angles[2][0] * np.pi / 180.0

            self.angles.append(angle)
            self.robot_marker_estimates[id] = (x, y, angle)
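
    # Worked example of the running-mean corner update above (hypothetical
    # numbers): with two prior measurements averaging (1.0, 2.0), a third
    # measurement of (1.3, 2.3) yields (2 * (1.0, 2.0) + (1.3, 2.3)) / 3
    # = (1.1, 2.1), i.e. the estimate moves only a third of the way towards
    # the new measurement and converges as more frames arrive.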

    def all_corners_detected(self):
        # checks if all corner markers have been detected at least once
        return not any(estimate[0] is None for estimate in self.corner_estimates.values())

    def get_pos_from_grid_point(self, x, y):
        # returns the position for the given grid point based on the current corner estimates
        assert 0 <= x < self.grid_columns
        assert 0 <= y < self.grid_rows
        assert self.all_corners_detected()

        a = self.corner_estimates['a'][0]
        b = self.corner_estimates['b'][0]
        c = self.corner_estimates['c'][0]
        d = self.corner_estimates['d'][0]

        # compute the column line through the center of grid column x
        vab = b - a
        vdc = c - d
        column_line_top = a + (x + 0.5) / self.grid_columns * vab
        column_line_bottom = d + (x + 0.5) / self.grid_columns * vdc

        # compute the row line through the center of grid row y
        vad = d - a
        vbc = c - b
        row_line_top = a + (y + 0.5) / self.grid_rows * vad
        row_line_bottom = b + (y + 0.5) / self.grid_rows * vbc

        # the position of the grid point is the intersection of the two lines
        column_line = LineString([column_line_top, column_line_bottom])
        row_line = LineString([row_line_top, row_line_bottom])
        int_pt = column_line.intersection(row_line)
        point_of_intersection = np.array([int_pt.x, int_pt.y])

        return point_of_intersection

    def get_grid_point_from_pos(self, pos):
        # returns the nearest grid point for the given position estimate
        # (simple sketch: brute-force minimization of the Euclidean distance
        # over all grid points)
        assert self.all_corners_detected()
        grid_points = [(x, y) for x in range(self.grid_columns) for y in range(self.grid_rows)]
        return min(grid_points, key=lambda p: np.linalg.norm(self.get_pos_from_grid_point(*p) - pos))

    def print_corner_estimates(self):
        for key, value in self.corner_estimates.items():
            if value[0] is not None:
                print("corner marker {} at pos {}".format(key, value[0]))
        print()

    def draw_corner_line(self, frame, corner_1, corner_2, corner_coords_dict):
        # draws a line between the given corner markers onto the given frame
        if corner_1 in corner_coords_dict and corner_2 in corner_coords_dict:
            frame = cv2.line(frame, corner_coords_dict[corner_1], corner_coords_dict[corner_2],
                             color=(0, 0, 255), thickness=2)
        return frame

    def draw_grid_lines(self, frame, corners, ids):
        # draws a grid onto the given frame
        board_corners_pixel_coords = {}
        for corner, id in self.corner_marker_ids.items():
            try:
                ind, _ = np.where(ids == id)  # find index of the marker with this id
                ind = ind[0]
                # cv2 drawing functions expect integer pixel coordinates
                board_corners_pixel_coords[corner] = tuple(np.mean(corners[ind][0], axis=0).astype(int))
            except IndexError:
                pass

        frame = self.draw_corner_line(frame, 'a', 'b', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'b', 'c', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'c', 'd', board_corners_pixel_coords)
        frame = self.draw_corner_line(frame, 'd', 'a', board_corners_pixel_coords)

        if set(board_corners_pixel_coords.keys()) == set(self.corner_marker_ids.keys()):
            # all four corner markers have been detected -> draw the grid lines
            a = np.array(board_corners_pixel_coords['a'])
            b = np.array(board_corners_pixel_coords['b'])
            c = np.array(board_corners_pixel_coords['c'])
            d = np.array(board_corners_pixel_coords['d'])

            # draw the column lines
            vab = b - a
            vdc = c - d
            for x in range(1, self.grid_columns):
                column_line_top = a + x / self.grid_columns * vab
                column_line_bottom = d + x / self.grid_columns * vdc
                frame = cv2.line(frame, tuple(column_line_top.astype(int)),
                                 tuple(column_line_bottom.astype(int)), color=(255, 0, 0), thickness=1)

            # draw the row lines
            vad = d - a
            vbc = c - b
            for y in range(1, self.grid_rows):
                row_line_top = a + y / self.grid_rows * vad
                row_line_bottom = b + y / self.grid_rows * vbc
                frame = cv2.line(frame, tuple(row_line_top.astype(int)),
                                 tuple(row_line_bottom.astype(int)), color=(255, 0, 0), thickness=1)

        return frame

    def get_robot_state_estimate(self, id):
        if id in self.robot_marker_estimates:
            if self.robot_marker_estimates[id] is not None:
                return self.robot_marker_estimates[id]
            else:
                print(f"error: no estimate available for robot {id}")
        else:
            print(f"error: invalid robot id {id}")
        return None

    def draw_robot_pos(self, frame, corners, ids):
        # draws information about the robot positions onto the given frame
        robot_corners_pixel_coords = {}
        for id in self.robot_marker_estimates:
            try:
                ind, _ = np.where(ids == id)  # find index of the marker with this id
                ind = ind[0]
                robot_corners_pixel_coords[id] = tuple(np.mean(corners[ind][0], axis=0).astype(int))
            except IndexError:
                pass

        for id, coord in robot_corners_pixel_coords.items():
            x, y, angle = self.robot_marker_estimates[id]
            frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle),
                                coord, cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
        return frame


if __name__ == "__main__":
    playboard = ArucoEstimator()
    playboard.run_tracking()
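
# Hypothetical usage from another module (the module name "aruco_estimator" and
# the background thread are assumptions, not part of this script):
#
#     import threading
#     from aruco_estimator import ArucoEstimator
#
#     estimator = ArucoEstimator(use_realsense=True)
#     threading.Thread(target=estimator.run_tracking, daemon=True).start()
#     # ... once some frames have been processed:
#     state = estimator.get_robot_state_estimate(12)  # (x, y, angle) or None
#     if estimator.all_corners_detected():
#         pos = estimator.get_pos_from_grid_point(3, 4)  # world position of grid cell (3, 4)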