import pyrealsense2 as rs import numpy as np import cv2 import os import time import math from pyqtgraph.Qt import QtCore, QtGui from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes import pyqtgraph as pg from shapely.geometry import LineString from shapely.affinity import scale from queue import Queue import aruco class FPSCounter: def __init__(self): self.fps_start_time = time.time() self.fps_counter = 0 self.fps_display_rate = 1.0 def get_fps(self): self.fps_counter += 1 delta_t = time.time() - self.fps_start_time if delta_t > self.fps_display_rate: self.fps_counter = 0 self.fps_start_time = time.time() return self.fps_counter / delta_t ## test add/remove ## this group includes a menu allowing the user to add new parameters into its child list class RobotMarkerGroup(parameterTypes.GroupParameter): def __init__(self, **opts): opts['type'] = 'group' opts['addText'] = "Add Marker" opts['addList'] = ['Robot'] parameterTypes.GroupParameter.__init__(self, **opts) def addNew(self, typ): current_robots = self.children() current_indices = [int(r.name().split(' ')[1]) for r in current_robots] new_index = len(current_indices) + 1 val = 0 self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False, renamable=True)) class CornerMarkerGroup(parameterTypes.GroupParameter): def __init__(self, **opts): opts['type'] = 'group' opts['addText'] = "Add Marker" opts['addList'] = ['Corner'] parameterTypes.GroupParameter.__init__(self, **opts) def addNew(self, typ): current_corners = self.children() current_chars = [str(r.name().split(' ')[1]) for r in current_corners] new_char = chr(ord(current_chars[-1]) + 1) val = 0 self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False)) class ArucoEstimator: def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=7, grid_rows=4): self.app = QtGui.QApplication([]) ## Create window with GraphicsView widget self.win = QtGui.QWidget() self.layout = QtGui.QGridLayout() self.win.setLayout(self.layout) self.win.keyPressEvent = self.handleKeyPressEvent self.win.setWindowTitle('ArucoEstimator') self.plotwidget = pg.PlotWidget() self.layout.addWidget(self.plotwidget) ## lock the aspect ratio so pixels are always square self.plotwidget.setAspectLocked(True) self.plotwidget.getPlotItem().getAxis('left').hide() self.plotwidget.getPlotItem().getAxis('bottom').hide() ## Create image item self.img = pg.ImageItem(border='w') self.img.setLevels([[0, 255], [0, 255], [0, 255]]) self.img.mouseClickEvent = self.handleMouseEvent self.plotwidget.addItem(self.img) # fps display self.fps_counter = FPSCounter() self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1)) self.plotwidget.addItem(self.fps_overlay) self.invert_grayscale = False self.draw_grid = True self.draw_markers = True self.draw_marker_coordinate_system = False self.corner_marker_size = 0.075 self.corner_marker_ids = { 'a': 0, 'b': 1, 'c': 2, 'd': 3 } self.corner_estimates = { 'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, } self.grid_columns = grid_columns self.grid_rows = grid_rows self.robot_marker_size = 0.07 if robot_marker_ids is None: robot_marker_ids = [] self.robot_marker_ids = robot_marker_ids self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None}) for marker_id in self.robot_marker_ids]) robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in enumerate(self.robot_marker_ids, 1)] corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in self.corner_marker_ids.items()] self.threshold = 10.0 # parameter params_spec = [ {'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True, 'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001}, {'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True, 'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001}, {'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers}, {'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system}, {'name': 'Threshold', 'type': 'float', 'value': self.threshold}, {'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"}, {'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"}, {'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"}, {'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"}, {'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"}, {'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'grayscale', 'tip': "Display mode for the video"}, {'name': 'Autoexposure', 'type': 'bool', 'value': True}, {'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'}, RobotMarkerGroup(name="Robot markers", children=robot_marker_group), CornerMarkerGroup(name="Corner markers", children=corner_marker_group), ] self.params = Parameter.create(name='params', type='group', children=params_spec) self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v)) self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v)) self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v)) self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v)) self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v)) self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v)) self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v)) self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v)) self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v)) self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide()) self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v)) self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot', {'robot_id': v}))) self.paramtree = ParameterTree() self.paramtree.setParameters(self.params, showTop=False) self.layout.addWidget(self.paramtree) self.win.show() ## show widget alone in its own window self.event_queue = Queue() if use_realsense: # check if realsense camera is connected # Configure depth and color streams self.pipeline = rs.pipeline() config = rs.config() # config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30) config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30) # Start streaming self.pipeline.start(config) # enable auto exposure self.set_autoexposure(self.params['Autoexposure']) camera_intrinsics = self.pipeline.get_active_profile().get_stream( rs.stream.color).as_video_stream_profile().get_intrinsics() self.camera_matrix = np.zeros((3, 3)) self.camera_matrix[0][0] = camera_intrinsics.fx self.camera_matrix[1][1] = camera_intrinsics.fy self.camera_matrix[0][2] = camera_intrinsics.ppx self.camera_matrix[1][2] = camera_intrinsics.ppy self.dist_coeffs = np.array(camera_intrinsics.coeffs) # more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20 else: # use other camera self.cv_camera = cv2.VideoCapture(0) self.pipeline = None # create detector and get parameters # print detector parameters # print("detector params:") # print(self.detector_params) # for val in dir(self.detector_params): # if not val.startswith("__"): # print("\t{} : {}".format(val, self.detector_params.__getattribute__(val))) self.camparam = aruco.CameraParameters() if use_realsense: self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml")) else: self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml")) self.drag_line_end = None self.drag_line_start = None self.previous_click = None def set_autoexposure(self, v): if self.pipeline is not None: color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1] color_sensor.set_option(rs.option.enable_auto_exposure, v) print(color_sensor.get_option(rs.option.enable_auto_exposure)) def compute_clicked_position(self, px, py): if self.all_corners_detected(): # inter/extrapolate from clicked point to marker position px1 = self.corner_estimates['a']['pixel_coordinate'][0] px3 = self.corner_estimates['c']['pixel_coordinate'][0] py1 = self.corner_estimates['a']['pixel_coordinate'][1] py3 = self.corner_estimates['c']['pixel_coordinate'][1] x1 = self.corner_estimates['a']['x'] x3 = self.corner_estimates['c']['x'] y1 = self.corner_estimates['a']['y'] y3 = self.corner_estimates['c']['y'] alpha = (px - px1) / (px3 - px1) beta = (py - py1) / (py3 - py1) print(f"alpha = {alpha}, beta = {beta}") target_x = x1 + alpha * (x3 - x1) target_y = y1 + beta * (y3 - y1) target = np.array([target_x, target_y]) else: print("not all markers have been detected!") target = np.array([px, -py]) return target def handleMouseEvent(self, event): # get click position as distance to top-left corner of the image px = event.pos().x() py = self.img.height() - event.pos().y() print(f"px = {px}, py = {py}") if event.button() == QtCore.Qt.MouseButton.LeftButton: # self.drag_line_start = (px, py) # elif event == cv2.EVENT_LBUTTONUP: self.drag_line_end = (px, py) self.drag_line_start = (px, py) target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1]) if self.drag_line_start != self.drag_line_end: # compute target angle for clicked position facing_pos = self.compute_clicked_position(px, py) v = facing_pos - target_pos target_angle = math.atan2(v[1], v[0]) else: # determine angle from previously clicked pos (= self.drag_line_end) if self.previous_click is not None: previous_pos = self.compute_clicked_position(self.previous_click[0], self.previous_click[1]) v = target_pos - previous_pos target_angle = math.atan2(v[1], v[0]) else: target_angle = 0.0 target = np.array([target_pos[0], target_pos[1], target_angle]) print(target) self.previous_click = (px, py) self.event_queue.put(('click', {'x': target[0], 'y': target[1], 'angle': target[2]})) self.drag_line_start = None elif event == cv2.EVENT_MOUSEMOVE: if self.drag_line_start is not None: self.drag_line_end = (px, py) def process_frame(self): if self.pipeline: frames = self.pipeline.wait_for_frames() color_frame = frames.get_color_frame() # Convert images to numpy arrays color_image = np.asanyarray(color_frame.get_data()) else: # Capture frame-by-frame ret, color_image = self.cv_camera.read() t_image = time.time() gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY) if self.invert_grayscale: cv2.bitwise_not(gray, gray) detector = aruco.MarkerDetector() detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01) #detector_params = detector.getParameters() # run aruco marker detection detected_markers = detector.detect(gray) # detected_markers2 = detector.detect(gray) #gray = detector.getThresholdedImage() display_mode = self.params.param('Display mode').value() #print(f"detected_markers = {[marker.id for marker in detected_markers]}") #print("threshold = ", self.threshold) # extract data for detected markers detected_marker_data = {} for marker in detected_markers: if marker.id >= 0: detected_marker_data[marker.id] = {'marker_center': marker.getCenter()} if marker.id in self.corner_marker_ids.values(): marker.calculateExtrinsics(self.corner_marker_size, self.camparam) else: marker.calculateExtrinsics(self.robot_marker_size, self.camparam) detected_marker_data[marker.id]['Rvec'] = marker.Rvec detected_marker_data[marker.id]['Tvec'] = marker.Tvec if self.draw_markers: marker.draw(color_image, np.array([255, 255, 255]), 2, True) if self.draw_marker_coordinate_system: aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1) # store data for marker_id, data in detected_marker_data.items(): self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image) # draw grid if self.draw_grid: color_image = self.draw_grid_lines(color_image, detected_marker_data) color_image = self.draw_robot_pos(color_image, detected_marker_data) # draw drag if self.drag_line_start is not None and self.drag_line_end is not None: color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2) # compute frame rate self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}") # Show images color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB if display_mode == 'color': self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2))) elif display_mode == 'grayscale': self.img.setImage(np.transpose(np.flipud(gray))) QtCore.QTimer.singleShot(1, self.process_frame) def handleKeyPressEvent(self, ev): key = ev.key() self.event_queue.put(('key', key)) if key == QtCore.Qt.Key_Q: if self.pipeline is not None: # Stop streaming self.pipeline.stop() self.app.quit() elif key == QtCore.Qt.Key_I: self.invert_grayscale = not self.invert_grayscale def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image): # update the marker estimate with new data if marker_id in self.corner_marker_ids.values(): # for corner markers we compute the mean of all measurements s.t. the position stabilizes over time # (we assume the markers do not move) # get corresponding corner to the detected marker corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys())) old_estimate_x = self.corner_estimates[corner]['x'] old_estimate_y = self.corner_estimates[corner]['y'] n_estimates = self.corner_estimates[corner]['n_estimates'] x = tvec[0][0] y = -tvec[1][0] # flip y coordinate if not any([old_estimate_x is None, old_estimate_y is None]): new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1) else: new_estimate_x = x # first estimate new_estimate_y = y self.corner_estimates[corner]['t'] = t_image self.corner_estimates[corner]['x'] = new_estimate_x self.corner_estimates[corner]['y'] = new_estimate_y self.corner_estimates[corner]['n_estimates'] = n_estimates + 1 self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center elif marker_id in self.robot_marker_ids: # for robot markers we extract x and y position as well as the angle # here we could also implement a filter x = tvec[0][0] y = -tvec[1][0] # flip y coordinate # compute angle rot_mat, _ = cv2.Rodrigues(rvec) pose_mat = cv2.hconcat((rot_mat, tvec)) _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat) angle = -euler_angles[2][0] * np.pi / 180.0 self.robot_marker_estimates[marker_id] = {'t': float(t_image), 'x': float(x), 'y': float(y), 'angle': float(angle)} def all_corners_detected(self): # checks if all corner markers have been detected at least once return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()]) def all_robots_detected(self): # checks if all robot markers have been detected at least once return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()]) def get_pos_from_grid_point(self, x, y, dimx, dimy, orientation=None): """ returns the position for the given grid point based on the current corner estimates :param x: x position on the grid ( 0 &le x < number of grid columns) :param y: y position on the grid ( 0 &le x < number of grid rows) :param orientation: (optional) orientation in the given grid cell (one of ^, >, v, < ) :return: numpy array with corresponding real world x- and y-position if orientation was specified the array also contains the matching angle for the orientation """ if not self.all_corners_detected(): #raise RuntimeError("not all corner markers have been detected yet") a = np.array([0,1]) b = np.array([1,1]) c = np.array([1,0]) d = np.array([0,0]) else: # compute column line a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']]) b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']]) c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']]) d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']]) x_frac = (x + 0.5) / dimx y_frac = (y + 0.5) / dimy vab = b - a vdc = c - d column_line_top = a + x_frac * vab column_line_bottom = d + x_frac * vdc vad = d - a vbc = c - b row_line_top = a + y_frac * vad row_line_bottom = b + y_frac * vbc column_line_orig = LineString([column_line_top, column_line_bottom]) row_line_orig = LineString([row_line_top, row_line_bottom]) column_line = scale(column_line_orig, 10, 10) row_line = scale(row_line_orig, 10, 10) int_pt = column_line.intersection(row_line) point_of_intersection = np.array([int_pt.x, int_pt.y]) if orientation is not None: # compute angle corresponding to the orientation w.r.t. the grid # TODO: test this code angle_ab = np.arctan2(vab[1], vab[0]) angle_dc = np.arctan2(vdc[1], vdc[0]) angle_ad = np.arctan2(vad[1], vad[0]) angle_bc = np.arctan2(vbc[1], vbc[0]) angle = 0.0 if orientation == '>': angle = y_frac * angle_ab + (1 - y_frac) * angle_dc elif orientation == '<': angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi elif orientation == 'v': angle = x_frac * angle_ad + (1 - x_frac) * angle_bc elif orientation == '^': angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi return np.array((point_of_intersection[0], point_of_intersection[1], angle)) else: return point_of_intersection def get_grid_point_from_pos(self): # TODO return the nearest grid point for the given position estimate pass def print_corner_estimates(self): for key, value in self.corner_estimates.items(): if value['n_estimates'] != 0: print(f"corner marker {key} at pos ({value['x']},{value['y']})") def draw_corner_line(self, frame, corner_1, corner_2): # draws a line between the given markers onto the given frame corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate'] corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate'] if corner_1_center is not None and corner_2_center is not None: frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2) return frame def draw_grid_lines(self, frame, detected_marker_data): # draws a grid onto the given frame frame = self.draw_corner_line(frame, 'a', 'b') frame = self.draw_corner_line(frame, 'b', 'c') frame = self.draw_corner_line(frame, 'c', 'd') frame = self.draw_corner_line(frame, 'd', 'a') if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]): # compute outlines of board a = self.corner_estimates['a']['pixel_coordinate'] b = self.corner_estimates['b']['pixel_coordinate'] c = self.corner_estimates['c']['pixel_coordinate'] d = self.corner_estimates['d']['pixel_coordinate'] # draw vertical lines vab = b - a vdc = c - d for x in range(1, self.grid_columns): column_line_top = a + x / self.grid_columns * vab column_line_bottom = d + x / self.grid_columns * vdc frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0), thickness=1) # draw horizontal lines vad = d - a vbc = c - b for y in range(1, self.grid_rows): row_line_top = a + y / self.grid_rows * vad row_line_bottom = b + y / self.grid_rows * vbc frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0), thickness=1) return frame def get_robot_state_estimate(self, marker_id): if marker_id in self.robot_marker_estimates: if self.robot_marker_estimates[marker_id]['t'] is not None: return np.array([self.robot_marker_estimates[marker_id]['x'], self.robot_marker_estimates[marker_id]['y'], self.robot_marker_estimates[marker_id]['angle']]) else: print(f"error: no estimate available for robot {marker_id}") return None else: print(f"error: invalid robot id {marker_id}") return None def draw_robot_pos(self, frame, detected_marker_data): # draws information about the robot positions onto the given frame robot_corners_pixel_coords = {} for marker_id, estimate in self.robot_marker_estimates.items(): if marker_id in detected_marker_data.keys(): robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['marker_center']) for marker_id, coord in robot_corners_pixel_coords.items(): x = self.robot_marker_estimates[marker_id]['x'] y = self.robot_marker_estimates[marker_id]['y'] angle = self.robot_marker_estimates[marker_id]['angle'] #frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord, # cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0)) frame = cv2.putText(frame, f"{marker_id}", coord, cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), thickness=4) return frame if __name__ == "__main__": estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) estimator.process_frame() import sys if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'): QtGui.QApplication.instance().exec_()