2020-10-14 19:15:43 +00:00
|
|
|
import pyrealsense2 as rs
|
|
|
|
import numpy as np
|
|
|
|
import cv2
|
|
|
|
from cv2 import aruco
|
|
|
|
from shapely.geometry import LineString
|
2020-10-15 20:41:30 +00:00
|
|
|
from queue import Queue
|
2020-10-14 19:15:43 +00:00
|
|
|
|
|
|
|
class ArucoEstimator:
    """Estimate board-corner and robot poses from ArUco markers in a camera stream.

    Four corner markers (``a``, ``b``, ``c``, ``d``, listed in order around
    the board) span a grid of ``grid_columns`` x ``grid_rows`` cells.  Corner
    positions are averaged over all measurements; robot markers keep only
    their latest ``(x, y, angle)`` estimate.  Mouse clicks and key presses in
    the preview window are published on ``event_queue``.
    """

    # marker id used for each board corner
    corner_marker_ids = {
        'a': 0,
        'b': 1,
        'c': 2,
        'd': 3
    }

    # Class-level defaults, kept so existing code that reads them from the
    # class keeps working.  Instances never mutate these: __init__ installs
    # private copies (see _reset_estimates).
    angles = []

    corner_estimates = {
        'a': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0},
        'b': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0},
        'c': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0},
        'd': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0},
    }

    def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
        """
        sets up the estimator state and opens the camera
        :param robot_marker_ids: (optional) list of marker ids attached to robots
        :param use_realsense: if True a RealSense color stream is used, otherwise cv2.VideoCapture(0)
        :param grid_columns: number of grid columns on the board
        :param grid_rows: number of grid rows on the board
        """
        self.grid_columns = grid_columns
        self.grid_rows = grid_rows

        if robot_marker_ids is None:
            robot_marker_ids = []
        self.robot_marker_ids = robot_marker_ids
        self.robot_marker_estimates = {marker_id: None for marker_id in self.robot_marker_ids}

        # per-instance estimate storage (must not be shared between estimators)
        self._reset_estimates()

        self.event_queue = Queue()

        if use_realsense:
            self.cv_camera = None

            # Configure the color stream (1280x720 BGR at 30 fps) and start streaming
            self.pipeline = rs.pipeline()
            config = rs.config()
            config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
            self.pipeline.start(config)

            # build the pinhole camera matrix from the intrinsics reported by the device
            # more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
            camera_intrinsics = self.pipeline.get_active_profile().get_stream(
                rs.stream.color).as_video_stream_profile().get_intrinsics()
            self.camera_matrix = np.zeros((3, 3))
            self.camera_matrix[0][0] = camera_intrinsics.fx
            self.camera_matrix[1][1] = camera_intrinsics.fy
            self.camera_matrix[0][2] = camera_intrinsics.ppx
            self.camera_matrix[1][2] = camera_intrinsics.ppy
            self.camera_matrix[2][2] = 1.0  # homogeneous entry of an intrinsic matrix
            self.dist_coeffs = np.array(camera_intrinsics.coeffs)
        else:
            # use other camera
            self.cv_camera = cv2.VideoCapture(0)
            self.pipeline = None
            # There is no calibration for a generic camera, so marker poses
            # cannot be estimated from its images (see run_tracking).
            self.camera_matrix = None
            self.dist_coeffs = None

    def _reset_estimates(self):
        """gives this instance its own empty angle history and corner estimates"""
        self.angles = []
        self.corner_estimates = {
            corner: {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0}
            for corner in self.corner_marker_ids
        }

    @staticmethod
    def _to_pixel(point):
        """converts a (possibly float) 2D point to the integer tuple the cv2 drawing functions expect"""
        return tuple(int(round(float(value))) for value in point)

    @staticmethod
    def _marker_centers(corners, ids):
        """maps each detected marker id to the mean of its four corner pixels (first detection wins)"""
        centers = {}
        if ids is None:
            return centers
        for marker_id, marker_corners in zip(np.asarray(ids).flatten(), corners):
            centers.setdefault(int(marker_id), np.mean(marker_corners[0], axis=0))
        return centers

    def mouse_callback(self, event, px, py, flags, param):
        """
        OpenCV mouse callback: maps a left click to board coordinates
        and puts ('click', np.array([x, y, 0])) into the event queue
        :param event: mouse event code (only 1, i.e. cv2.EVENT_LBUTTONDOWN, is handled)
        :param px: x pixel coordinate of the event
        :param py: y pixel coordinate of the event
        :param flags: unused
        :param param: unused
        """
        if event != 1:  # 1 == cv2.EVENT_LBUTTONDOWN
            return

        if not self.all_corners_detected():
            print("not all markers have been detected!")
            return

        corner_a = self.corner_estimates['a']
        corner_c = self.corner_estimates['c']
        if corner_a['pixel_coordinate'] is None or corner_c['pixel_coordinate'] is None:
            # happens for simulated corner markers, which have no image position
            print("corner markers have no pixel coordinates, cannot map the click!")
            return

        # inter/extrapolate from clicked point to marker position
        px1, py1 = corner_a['pixel_coordinate'][0], corner_a['pixel_coordinate'][1]
        px3, py3 = corner_c['pixel_coordinate'][0], corner_c['pixel_coordinate'][1]
        x1, y1 = corner_a['real_world_estimate'][0], corner_a['real_world_estimate'][1]
        x3, y3 = corner_c['real_world_estimate'][0], corner_c['real_world_estimate'][1]

        if px3 == px1 or py3 == py1:
            # the diagonal a-c spans no area in the image: avoid dividing by zero
            print("corner markers a and c are degenerate, cannot map the click!")
            return

        alpha = (px - px1) / (px3 - px1)
        beta = (py - py1) / (py3 - py1)

        print(f"alpha = {alpha}, beta = {beta}")

        target_x = x1 + alpha * (x3 - x1)
        target_y = y1 + beta * (y3 - y1)
        target = np.array([target_x, target_y, 0])

        print(f"target = ({target_x},{target_y})")
        self.event_queue.put(('click', target))

    def _grab_frame(self):
        """
        reads the next color image from the active camera
        :return: BGR image, or None if the RealSense frame set contained no color frame
        """
        if self.pipeline is not None:
            frames = self.pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                return None
            # Convert images to numpy arrays
            return np.asanyarray(color_frame.get_data())

        # Capture frame-by-frame
        ret, color_image = self.cv_camera.read()
        if not ret:
            raise RuntimeError("could not read a frame from the camera")
        return color_image

    def _simulate_markers(self):
        """pretends that the corner markers and robot markers 12 and 13 were seen at fixed positions"""
        # simulated markers have no position in the image, hence pixel_coord is None
        self.update_estimate(0, None, rvec=np.array([0, 0, 0]), tvec=np.array([-1, 1, 0]))
        self.update_estimate(1, None, rvec=np.array([0, 0, 0]), tvec=np.array([1, 1.5, 0]))
        self.update_estimate(2, None, rvec=np.array([0, 0, 0]), tvec=np.array([1, -1.5, 0]))
        self.update_estimate(3, None, rvec=np.array([0, 0, 0]), tvec=np.array([-1, -1, 0]))
        self.update_estimate(12, None, rvec=np.array([0.0, 0.0, 0.0]),
                             tvec=np.array([1.2, 0.42, 0]))
        self.update_estimate(13, None, rvec=np.array([0.0, 0.0, 0.0]),
                             tvec=np.array([-1.2, -0.42, 0]))

    def run_tracking(self):
        """
        runs the detection loop until 'q' is pressed in the preview window

        Every frame is searched for markers, the estimates are updated and the
        annotated image is shown.  Key presses are put into the event queue as
        ('key', code).  The window and the camera are released on exit, also
        if an exception occurs.
        """
        cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
        cv2.setMouseCallback('RoboRally', self.mouse_callback)

        try:
            # the detector configuration is the same for every frame
            aruco_dict = aruco.Dictionary_get(aruco.DICT_ARUCO_ORIGINAL)
            parameters = aruco.DetectorParameters_create()

            if self.camera_matrix is None:
                print("warning: no camera calibration available, marker poses are not estimated")

            running = True
            while running:
                color_image = self._grab_frame()
                if color_image is None:
                    continue

                gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
                corners, ids, _ = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
                frame = aruco.drawDetectedMarkers(color_image.copy(), corners, ids)

                if ids is not None:
                    if self.camera_matrix is not None:
                        for marker_id, corner in zip(ids, corners):
                            corner_pixel_coord = np.mean(corner[0], axis=0)
                            # 0.10 is the marker side length; translations come back in
                            # the same unit (presumably metres - confirm against the
                            # printed markers)
                            res = aruco.estimatePoseSingleMarkers(corner, 0.10, self.camera_matrix,
                                                                  self.dist_coeffs)
                            rvecs = res[0][0][0]
                            tvecs = res[1][0][0]
                            self.update_estimate(int(marker_id[0]), corner_pixel_coord, rvecs, tvecs)

                    frame = self.draw_grid_lines(frame, corners, ids)
                    frame = self.draw_robot_pos(frame, corners, ids)
                elif self.pipeline is None:
                    # Pretend we detected some markers.  Only done in the
                    # camera-less fallback mode: with a real camera a frame
                    # without markers must not pull the averaged corner
                    # positions towards made-up values.
                    self._simulate_markers()

                # Show images
                cv2.imshow('RoboRally', frame)
                key = cv2.waitKey(1)

                if key > 0:
                    self.event_queue.put(('key', key))
                    print('key = ', key)
                if key == ord('q'):
                    running = False
        finally:
            cv2.destroyAllWindows()
            if self.pipeline is not None:
                # Stop streaming
                self.pipeline.stop()
            if getattr(self, 'cv_camera', None) is not None:
                self.cv_camera.release()

    def update_estimate(self, id, pixel_coord, rvec, tvec):
        """
        updates the marker estimate with new data
        :param id: id of the detected marker (ids that are neither corner nor robot markers are ignored)
        :param pixel_coord: pixel position of the marker center (None if unknown); only stored for corner markers
        :param rvec: rotation vector of the marker; only used for robot markers
        :param tvec: translation vector of the marker (x, y, z)
        """
        if id in self.corner_marker_ids.values():
            # for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
            # (we assume the markers do not move)
            corner = next(name for name, marker_id in self.corner_marker_ids.items() if marker_id == id)
            entry = self.corner_estimates[corner]
            old_estimate = entry['real_world_estimate']
            n_estimates = entry['n_estimates']

            # projection to get rid of z coordinate, y coordinate is flipped
            tvec_proj = np.array((tvec[0], -tvec[1]))
            if old_estimate is not None:
                new_estimate = (n_estimates * old_estimate + tvec_proj) / (n_estimates + 1)  # weighted update
            else:
                new_estimate = tvec_proj  # first estimate

            entry['real_world_estimate'] = new_estimate
            entry['n_estimates'] = n_estimates + 1
            entry['pixel_coordinate'] = pixel_coord

        elif id in self.robot_marker_ids:
            # for robot markers we extract x and y position as well as the angle
            # here we could also implement a filter
            x = tvec[0]
            y = -tvec[1]  # flip y coordinate

            # compute angle: build the 3x4 pose matrix [R|t] and read the rotation about z from it
            rot_mat, _ = cv2.Rodrigues(np.asarray(rvec, dtype=np.float64))
            translation = np.asarray(tvec, dtype=np.float64).reshape(3, 1)
            pose_mat = np.hstack((rot_mat, translation))
            _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
            angle = -euler_angles[2][0] * np.pi / 180.0  # degrees -> radians, sign flipped like y

            # NOTE: this history grows by one entry per robot measurement and is never trimmed
            self.angles.append(angle)
            self.robot_marker_estimates[id] = (x, y, angle)

    def all_corners_detected(self):
        """checks if all corner markers have been detected at least once"""
        return all(estimate['n_estimates'] != 0 for estimate in self.corner_estimates.values())

    def all_robots_detected(self):
        """checks if all robot markers have been detected at least once"""
        return all(estimate is not None for estimate in self.robot_marker_estimates.values())

    def get_pos_from_grid_point(self, x, y, orientation=None):
        """
        returns the position for the given grid point based on the current corner estimates
        :param x: x position on the grid ( 0 <= x < number of grid columns)
        :param y: y position on the grid ( 0 <= y < number of grid rows)
        :param orientation: (optional) orientation in the given grid cell (one of ^, >, v, < )
        :return: numpy array with corresponding real world x- and y-position
                 if orientation was specified the array also contains the matching angle for the orientation
                 (0.0 for an unknown orientation symbol)
        """
        assert 0 <= x < self.grid_columns
        assert 0 <= y < self.grid_rows
        assert self.all_corners_detected()

        a = self.corner_estimates['a']['real_world_estimate']
        b = self.corner_estimates['b']['real_world_estimate']
        c = self.corner_estimates['c']['real_world_estimate']
        d = self.corner_estimates['d']['real_world_estimate']

        # fractions of the cell center along the a->b and a->d directions
        x_frac = (x + 0.5) / self.grid_columns
        y_frac = (y + 0.5) / self.grid_rows

        vab = b - a
        vdc = c - d
        vad = d - a
        vbc = c - b

        # The column line runs from a + x_frac * vab to d + x_frac * vdc and the
        # row line from a + y_frac * vad to b + y_frac * vbc.  Both contain the
        # bilinear point of the quad at (x_frac, y_frac), so their intersection
        # is obtained by walking y_frac along the column line.
        column_line_top = a + x_frac * vab
        column_line_bottom = d + x_frac * vdc
        point_of_intersection = column_line_top + y_frac * (column_line_bottom - column_line_top)

        if orientation is None:
            return point_of_intersection

        # compute angle corresponding to the orientation w.r.t. the grid
        angle_ab = np.arctan2(vab[1], vab[0])
        angle_dc = np.arctan2(vdc[1], vdc[0])
        angle_ad = np.arctan2(vad[1], vad[0])
        angle_bc = np.arctan2(vbc[1], vbc[0])

        angle = 0.0
        if orientation in ('>', '<'):
            # y_frac == 0 lies on edge a-b, y_frac == 1 on edge d-c
            angle = (1 - y_frac) * angle_ab + y_frac * angle_dc
        elif orientation in ('v', '^'):
            # x_frac == 0 lies on edge a-d, x_frac == 1 on edge b-c
            angle = (1 - x_frac) * angle_ad + x_frac * angle_bc
        if orientation in ('<', '^'):
            angle = angle + np.pi  # opposite direction

        return np.array((point_of_intersection[0], point_of_intersection[1], angle))

    def get_grid_point_from_pos(self):
        """not implemented yet: should return the nearest grid point for a given position estimate"""
        pass

    def print_corner_estimates(self):
        """prints the position of every corner marker that has been seen so far, followed by an empty line"""
        for key, value in self.corner_estimates.items():
            if value['n_estimates'] != 0:
                print("corner marker {} at pos {}".format(key, value['real_world_estimate']))
        print()

    def draw_corner_line(self, frame, corner_1, corner_2, corner_coords_dict):
        """
        draws a line between the given markers onto the given frame
        :param frame: image to draw on
        :param corner_1: name of the first corner
        :param corner_2: name of the second corner
        :param corner_coords_dict: pixel coordinates of the corners that are visible
        :return: the frame (unchanged if one of the corners is not visible)
        """
        if corner_1 in corner_coords_dict and corner_2 in corner_coords_dict:
            frame = cv2.line(frame, self._to_pixel(corner_coords_dict[corner_1]),
                             self._to_pixel(corner_coords_dict[corner_2]), color=(0, 0, 255),
                             thickness=2)
        return frame

    def draw_grid_lines(self, frame, corners, ids):
        """
        draws the board outline and, if all four corners are visible, the grid onto the given frame
        :param frame: image to draw on
        :param corners: marker corners as returned by aruco.detectMarkers
        :param ids: marker ids as returned by aruco.detectMarkers (may be None)
        :return: the annotated frame
        """
        centers = self._marker_centers(corners, ids)
        board_corners_pixel_coords = {
            corner: centers[marker_id]
            for corner, marker_id in self.corner_marker_ids.items()
            if marker_id in centers
        }

        for start, end in (('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'a')):
            frame = self.draw_corner_line(frame, start, end, board_corners_pixel_coords)

        if len(board_corners_pixel_coords) == len(self.corner_marker_ids):  # all markers have been detected
            a = np.array(board_corners_pixel_coords['a'])
            b = np.array(board_corners_pixel_coords['b'])
            c = np.array(board_corners_pixel_coords['c'])
            d = np.array(board_corners_pixel_coords['d'])

            # column lines connect edge a-b with edge d-c
            vab = b - a
            vdc = c - d
            for column in range(1, self.grid_columns):
                fraction = column / self.grid_columns
                frame = cv2.line(frame, self._to_pixel(a + fraction * vab),
                                 self._to_pixel(d + fraction * vdc), color=(0, 255, 0),
                                 thickness=1)

            # row lines connect edge a-d with edge b-c
            vad = d - a
            vbc = c - b
            for row in range(1, self.grid_rows):
                fraction = row / self.grid_rows
                frame = cv2.line(frame, self._to_pixel(a + fraction * vad),
                                 self._to_pixel(b + fraction * vbc), color=(0, 255, 0),
                                 thickness=1)

        return frame

    def get_robot_state_estimate(self, id):
        """
        returns the latest estimate for the given robot
        :param id: marker id of the robot
        :return: numpy array (x, y, angle), or None if the id is unknown or the robot has not been seen yet
        """
        if id not in self.robot_marker_estimates:
            print(f"error: invalid robot id {id}")
            return None

        estimate = self.robot_marker_estimates[id]
        if estimate is None:
            print(f"error: no estimate available for robot {id}")
            return None
        return np.array(estimate)

    def draw_robot_pos(self, frame, corners, ids):
        """
        draws information about the robot positions onto the given frame
        :param frame: image to draw on
        :param corners: marker corners as returned by aruco.detectMarkers
        :param ids: marker ids as returned by aruco.detectMarkers (may be None)
        :return: the annotated frame
        """
        centers = self._marker_centers(corners, ids)
        for marker_id, estimate in self.robot_marker_estimates.items():
            # only robots that are visible in this frame and have an estimate are labelled
            if estimate is None or marker_id not in centers:
                continue
            x, y, angle = estimate
            frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle),
                                self._to_pixel(centers[marker_id]), cv2.FONT_HERSHEY_SIMPLEX, 0.50,
                                (0, 255, 0))
        return frame
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build an estimator with the default settings
    # (RealSense camera, no robot markers, 8x8 grid) and track until the
    # tracking loop ends.
    ArucoEstimator().run_tracking()
|