RoboRally/remote_control/aruco_estimator.py

380 lines
17 KiB
Python

import pyrealsense2 as rs
import numpy as np
import cv2
import os
import time
from shapely.geometry import LineString
from queue import Queue
import aruco
class ArucoEstimator:
corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
angles = []
corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0 },
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0 },
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0 },
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0 },
}
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8, draw_marker_coordinate_system=False):
self.grid_columns = grid_columns
self.grid_rows = grid_rows
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(id, None) for id in self.robot_marker_ids])
self.event_queue = Queue()
self.draw_marker_coordinate_system = draw_marker_coordinate_system
if use_realsense: # check if realsense camera is connected
# Configure depth and color streams
self.pipeline = rs.pipeline()
config = rs.config()
# config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
#config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
# Start streaming
self.pipeline.start(config)
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
self.camera_matrix = np.zeros((3, 3))
self.camera_matrix[0][0] = camera_intrinsics.fx
self.camera_matrix[1][1] = camera_intrinsics.fy
self.camera_matrix[0][2] = camera_intrinsics.ppx
self.camera_matrix[1][2] = camera_intrinsics.ppy
self.dist_coeffs = np.array(camera_intrinsics.coeffs)
# more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
else:
# use other camera
self.cv_camera = cv2.VideoCapture(0)
self.pipeline = None
# create detector and get parameters
self.detector = aruco.MarkerDetector()
#self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
#self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
#self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters
print("detector params:")
for val in dir(self.detector_params):
if not val.startswith("__"):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
x1 = self.corner_estimates['a']['x']
x3 = self.corner_estimates['c']['x']
y1 = self.corner_estimates['a']['y']
y3 = self.corner_estimates['c']['y']
alpha = (px - px1) / (px3 - px1)
beta = (py - py1) / (py3 - py1)
print(f"alpha = {alpha}, beta = {beta}")
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
else:
print("not all markers have been detected!")
def run_tracking(self):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
try:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
# run aruco marker detection
detected_markers = self.detector.detect(gray)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id > 0:
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if marker.id > 0: # draw markers onto the image
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
for id, data in detected_marker_data.items():
self.update_estimate(id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# draw grid
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
if key > 0:
self.event_queue.put(('key', key))
if key == ord('q'):
running = False
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
def update_estimate(self, id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
if id in self.corner_marker_ids.values():
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
# (we assume the markers do not move)
# get corresponding corner to the detected marker
corner = next(filter(lambda key: self.corner_marker_ids[key] == id, self.corner_marker_ids.keys()))
old_estimate_x = self.corner_estimates[corner]['x']
old_estimate_y = self.corner_estimates[corner]['y']
n_estimates = self.corner_estimates[corner]['n_estimates']
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
if not any([old_estimate_x is None, old_estimate_y is None]):
new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update
new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
else:
new_estimate_x = x # first estimate
new_estimate_y = y
self.corner_estimates[corner]['t'] = t_image
self.corner_estimates[corner]['x'] = new_estimate_x
self.corner_estimates[corner]['y'] = new_estimate_y
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center
elif id in self.robot_marker_ids:
# for robot markers we extract x and y position as well as the angle
# here we could also implement a filter
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
# compute angle
rot_mat, _ = cv2.Rodrigues(rvec)
pose_mat = cv2.hconcat((rot_mat, tvec))
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.angles.append(angle)
self.robot_marker_estimates[id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
def all_robots_detected(self):
# checks if all robot markers have been detected at least once
return not any([estimate is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, orientation=None):
"""
returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid ( 0 &le x &lt number of grid columns)
:param y: y position on the grid ( 0 &le x &lt number of grid rows)
:param orientation: (optional) orientation in the given grid cell (one of ^, >, v, &lt )
:return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation
"""
assert x >= 0 and x < self.grid_columns
assert y >= 0 and y < self.grid_rows
assert self.all_corners_detected()
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])
if orientation is not None:
# compute angle corresponding to the orientation w.r.t. the grid
# TODO: test this code
angle_ab = np.arctan2(vab[1], vab[0])
angle_dc = np.arctan2(vdc[1], vdc[0])
angle_ad = np.arctan2(vad[1], vad[0])
angle_bc = np.arctan2(vbc[1], vbc[0])
angle = 0.0
if orientation == '>':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
elif orientation == '<':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
elif orientation == 'v':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
elif orientation == '^':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi
return np.array((point_of_intersection[0], point_of_intersection[1], angle))
else:
return point_of_intersection
def get_grid_point_from_pos(self):
# TODO return the nearest grid point for the given position estimate
pass
def print_corner_estimates(self):
for key, value in self.corner_estimates.items():
if value['n_estimates'] != 0:
print(f"corner marker {key} at pos ({value['x']},{value['y']})")
def draw_corner_line(self, frame, corner_1, corner_2):
# draws a line between the given markers onto the given frame
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, corner_1_center, corner_2_center, color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
# draws a grid onto the given frame
frame = self.draw_corner_line(frame, 'a', 'b')
frame = self.draw_corner_line(frame, 'b', 'c')
frame = self.draw_corner_line(frame, 'c', 'd')
frame = self.draw_corner_line(frame, 'd', 'a')
if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]):
# compute outlines of board
a = self.corner_estimates['a']['pixel_coordinate']
b = self.corner_estimates['b']['pixel_coordinate']
c = self.corner_estimates['c']['pixel_coordinate']
d = self.corner_estimates['d']['pixel_coordinate']
# draw vertical lines
vab = b - a
vdc = c - d
for x in range(1,self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
vad = d - a
vbc = c - b
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
thickness=1)
return frame
def get_robot_state_estimate(self, id):
if id in self.robot_marker_estimates:
if self.robot_marker_estimates[id] is not None:
return np.array([self.robot_marker_estimates[id]['x'], self.robot_marker_estimates[id]['y'],
self.robot_marker_estimates[id]['angle']])
else:
print(f"error: no estimate available for robot {id}")
return None
else:
print(f"error: invalid robot id {id}")
return None
def draw_robot_pos(self, frame, detected_marker_data):
# draws information about the robot positions onto the given frame
robot_corners_pixel_coords = {}
for id, estimate in self.robot_marker_estimates.items():
if id in detected_marker_data.keys():
robot_corners_pixel_coords[id] = tuple(detected_marker_data[id]['center'])
for id, coord in robot_corners_pixel_coords.items():
x = self.robot_marker_estimates[id]['x']
y = self.robot_marker_estimates[id]['y']
angle = self.robot_marker_estimates[id]['angle']
frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord,
cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0,255,0))
return frame
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=False, draw_marker_coordinate_system=True)
estimator.run_tracking()