RoboRally/remote_control/aruco_estimator.py

564 lines
26 KiB
Python

import pyrealsense2 as rs
import numpy as np
import cv2
import os
import time
import math
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
import pyqtgraph as pg
from shapely.geometry import LineString
from queue import Queue
import aruco
class FPSCounter:
def __init__(self):
self.fps_start_time = time.time()
self.fps_counter = 0
self.fps_display_rate = 1.0
def get_fps(self):
self.fps_counter += 1
delta_t = time.time() - self.fps_start_time
if delta_t > self.fps_display_rate:
self.fps_counter = 0
self.fps_start_time = time.time()
return self.fps_counter / delta_t
## test add/remove
## this group includes a menu allowing the user to add new parameters into its child list
class RobotMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Robot']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_robots = self.children()
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
new_index = len(current_indices) + 1
val = 0
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
renamable=True))
class CornerMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Corner']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_corners = self.children()
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
new_char = chr(ord(current_chars[-1]) + 1)
val = 0
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
class ArucoEstimator:
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=7, grid_rows=4):
self.app = QtGui.QApplication([])
## Create window with GraphicsView widget
self.win = QtGui.QWidget()
self.layout = QtGui.QGridLayout()
self.win.setLayout(self.layout)
self.win.keyPressEvent = self.handleKeyPressEvent
self.win.setWindowTitle('ArucoEstimator')
self.plotwidget = pg.PlotWidget()
self.layout.addWidget(self.plotwidget)
## lock the aspect ratio so pixels are always square
self.plotwidget.setAspectLocked(True)
self.plotwidget.getPlotItem().getAxis('left').hide()
self.plotwidget.getPlotItem().getAxis('bottom').hide()
## Create image item
self.img = pg.ImageItem(border='w')
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
self.img.mouseClickEvent = self.handleMouseEvent
self.plotwidget.addItem(self.img)
# fps display
self.fps_counter = FPSCounter()
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
self.plotwidget.addItem(self.fps_overlay)
self.invert_grayscale = False
self.draw_grid = True
self.draw_markers = True
self.draw_marker_coordinate_system = False
self.corner_marker_size = 0.075
self.corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
self.corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
self.grid_columns = grid_columns
self.grid_rows = grid_rows
self.robot_marker_size = 0.07
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids])
robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in
enumerate(self.robot_marker_ids, 1)]
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
self.corner_marker_ids.items()]
self.threshold = 10.0
# parameter
params_spec = [
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
{'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
self.params = Parameter.create(name='params', type='group', children=params_spec)
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot',
{'robot_id': v})))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)
self.layout.addWidget(self.paramtree)
self.win.show() ## show widget alone in its own window
self.event_queue = Queue()
if use_realsense: # check if realsense camera is connected
# Configure depth and color streams
self.pipeline = rs.pipeline()
config = rs.config()
# config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# Start streaming
self.pipeline.start(config)
# enable auto exposure
self.set_autoexposure(self.params['Autoexposure'])
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
self.camera_matrix = np.zeros((3, 3))
self.camera_matrix[0][0] = camera_intrinsics.fx
self.camera_matrix[1][1] = camera_intrinsics.fy
self.camera_matrix[0][2] = camera_intrinsics.ppx
self.camera_matrix[1][2] = camera_intrinsics.ppy
self.dist_coeffs = np.array(camera_intrinsics.coeffs)
# more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
else:
# use other camera
self.cv_camera = cv2.VideoCapture(0)
self.pipeline = None
# create detector and get parameters
# print detector parameters
# print("detector params:")
# print(self.detector_params)
# for val in dir(self.detector_params):
# if not val.startswith("__"):
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
if use_realsense:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
self.drag_line_end = None
self.drag_line_start = None
self.previous_click = None
def set_autoexposure(self, v):
if self.pipeline is not None:
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, v)
print(color_sensor.get_option(rs.option.enable_auto_exposure))
def compute_clicked_position(self, px, py):
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
x1 = self.corner_estimates['a']['x']
x3 = self.corner_estimates['c']['x']
y1 = self.corner_estimates['a']['y']
y3 = self.corner_estimates['c']['y']
alpha = (px - px1) / (px3 - px1)
beta = (py - py1) / (py3 - py1)
print(f"alpha = {alpha}, beta = {beta}")
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y])
else:
print("not all markers have been detected!")
target = np.array([px, -py])
return target
def handleMouseEvent(self, event):
# get click position as distance to top-left corner of the image
px = event.pos().x()
py = self.img.height() - event.pos().y()
print(f"px = {px}, py = {py}")
if event.button() == QtCore.Qt.MouseButton.LeftButton:
# self.drag_line_start = (px, py)
# elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
self.drag_line_start = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position
facing_pos = self.compute_clicked_position(px, py)
v = facing_pos - target_pos
target_angle = math.atan2(v[1], v[0])
else:
# determine angle from previously clicked pos (= self.drag_line_end)
if self.previous_click is not None:
previous_pos = self.compute_clicked_position(self.previous_click[0], self.previous_click[1])
v = target_pos - previous_pos
target_angle = math.atan2(v[1], v[0])
else:
target_angle = 0.0
target = np.array([target_pos[0], target_pos[1], target_angle])
print(target)
self.previous_click = (px, py)
self.event_queue.put(('click', {'x': target[0], 'y': target[1], 'angle': target[2]}))
self.drag_line_start = None
elif event == cv2.EVENT_MOUSEMOVE:
if self.drag_line_start is not None:
self.drag_line_end = (px, py)
def process_frame(self):
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if self.invert_grayscale:
cv2.bitwise_not(gray, gray)
detector = aruco.MarkerDetector()
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01)
#detector_params = detector.getParameters()
# run aruco marker detection
detected_markers = detector.detect(gray)
# detected_markers2 = detector.detect(gray)
#gray = detector.getThresholdedImage()
display_mode = self.params.param('Display mode').value()
#print(f"detected_markers = {[marker.id for marker in detected_markers]}")
#print("threshold = ", self.threshold)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
if marker.id >= 0:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(self.corner_marker_size, self.camparam)
else:
marker.calculateExtrinsics(self.robot_marker_size, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if self.draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
# compute frame rate
self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
# Show images
color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
if display_mode == 'color':
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
elif display_mode == 'grayscale':
self.img.setImage(np.transpose(np.flipud(gray)))
QtCore.QTimer.singleShot(1, self.process_frame)
def handleKeyPressEvent(self, ev):
key = ev.key()
self.event_queue.put(('key', key))
if key == QtCore.Qt.Key_Q:
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
self.app.quit()
elif key == QtCore.Qt.Key_I:
self.invert_grayscale = not self.invert_grayscale
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
if marker_id in self.corner_marker_ids.values():
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
# (we assume the markers do not move)
# get corresponding corner to the detected marker
corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys()))
old_estimate_x = self.corner_estimates[corner]['x']
old_estimate_y = self.corner_estimates[corner]['y']
n_estimates = self.corner_estimates[corner]['n_estimates']
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
if not any([old_estimate_x is None, old_estimate_y is None]):
new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update
new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
else:
new_estimate_x = x # first estimate
new_estimate_y = y
self.corner_estimates[corner]['t'] = t_image
self.corner_estimates[corner]['x'] = new_estimate_x
self.corner_estimates[corner]['y'] = new_estimate_y
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center
elif marker_id in self.robot_marker_ids:
# for robot markers we extract x and y position as well as the angle
# here we could also implement a filter
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
# compute angle
rot_mat, _ = cv2.Rodrigues(rvec)
pose_mat = cv2.hconcat((rot_mat, tvec))
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': float(t_image), 'x': float(x), 'y': float(y), 'angle': float(angle)}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
def all_robots_detected(self):
# checks if all robot markers have been detected at least once
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, orientation=None):
"""
returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid ( 0 &le x &lt number of grid columns)
:param y: y position on the grid ( 0 &le x &lt number of grid rows)
:param orientation: (optional) orientation in the given grid cell (one of ^, >, v, &lt )
:return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation
"""
assert 0 <= x < self.grid_columns
assert 0 <= y < self.grid_rows
assert self.all_corners_detected()
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])
if orientation is not None:
# compute angle corresponding to the orientation w.r.t. the grid
# TODO: test this code
angle_ab = np.arctan2(vab[1], vab[0])
angle_dc = np.arctan2(vdc[1], vdc[0])
angle_ad = np.arctan2(vad[1], vad[0])
angle_bc = np.arctan2(vbc[1], vbc[0])
angle = 0.0
if orientation == '>':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
elif orientation == '<':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
elif orientation == 'v':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
elif orientation == '^':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi
return np.array((point_of_intersection[0], point_of_intersection[1], angle))
else:
return point_of_intersection
def get_grid_point_from_pos(self):
# TODO return the nearest grid point for the given position estimate
pass
def print_corner_estimates(self):
for key, value in self.corner_estimates.items():
if value['n_estimates'] != 0:
print(f"corner marker {key} at pos ({value['x']},{value['y']})")
def draw_corner_line(self, frame, corner_1, corner_2):
# draws a line between the given markers onto the given frame
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
# draws a grid onto the given frame
frame = self.draw_corner_line(frame, 'a', 'b')
frame = self.draw_corner_line(frame, 'b', 'c')
frame = self.draw_corner_line(frame, 'c', 'd')
frame = self.draw_corner_line(frame, 'd', 'a')
if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]):
# compute outlines of board
a = self.corner_estimates['a']['pixel_coordinate']
b = self.corner_estimates['b']['pixel_coordinate']
c = self.corner_estimates['c']['pixel_coordinate']
d = self.corner_estimates['d']['pixel_coordinate']
# draw vertical lines
vab = b - a
vdc = c - d
for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
vad = d - a
vbc = c - b
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1)
return frame
def get_robot_state_estimate(self, marker_id):
if marker_id in self.robot_marker_estimates:
if self.robot_marker_estimates[marker_id]['t'] is not None:
return np.array([self.robot_marker_estimates[marker_id]['x'],
self.robot_marker_estimates[marker_id]['y'],
self.robot_marker_estimates[marker_id]['angle']])
else:
print(f"error: no estimate available for robot {marker_id}")
return None
else:
print(f"error: invalid robot id {marker_id}")
return None
def draw_robot_pos(self, frame, detected_marker_data):
# draws information about the robot positions onto the given frame
robot_corners_pixel_coords = {}
for marker_id, estimate in self.robot_marker_estimates.items():
if marker_id in detected_marker_data.keys():
robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['marker_center'])
for marker_id, coord in robot_corners_pixel_coords.items():
x = self.robot_marker_estimates[marker_id]['x']
y = self.robot_marker_estimates[marker_id]['y']
angle = self.robot_marker_estimates[marker_id]['angle']
#frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord,
# cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
frame = cv2.putText(frame, f"{marker_id}", coord, cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), thickness=4)
return frame
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.process_frame()
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()