Compare commits
No commits in common. "d0c17c0b9108459a6baa7702e63a199df8ab65ff" and "17637f427b26844b97f1b78e8e5a89d63a7e1162" have entirely different histories.
d0c17c0b91
...
17637f427b
|
@ -4,163 +4,39 @@ import cv2
|
|||
import os
|
||||
import time
|
||||
import math
|
||||
from pyqtgraph.Qt import QtCore, QtGui
|
||||
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
|
||||
import pyqtgraph as pg
|
||||
|
||||
|
||||
from shapely.geometry import LineString
|
||||
from queue import Queue
|
||||
|
||||
import aruco
|
||||
|
||||
class FPSCounter:
|
||||
def __init__(self):
|
||||
self.fps_start_time = time.time()
|
||||
self.fps_counter = 0
|
||||
self.fps_display_rate = 1.0
|
||||
|
||||
def get_fps(self):
|
||||
self.fps_counter += 1
|
||||
delta_t = time.time() - self.fps_start_time
|
||||
if delta_t > self.fps_display_rate:
|
||||
self.fps_counter = 0
|
||||
self.fps_start_time = time.time()
|
||||
return self.fps_counter / delta_t
|
||||
|
||||
## test add/remove
|
||||
## this group includes a menu allowing the user to add new parameters into its child list
|
||||
class RobotMarkerGroup(parameterTypes.GroupParameter):
|
||||
def __init__(self, **opts):
|
||||
opts['type'] = 'group'
|
||||
opts['addText'] = "Add Marker"
|
||||
opts['addList'] = ['Robot']
|
||||
parameterTypes.GroupParameter.__init__(self, **opts)
|
||||
|
||||
def addNew(self, typ):
|
||||
current_robots = self.children()
|
||||
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
|
||||
new_index = len(current_indices) + 1
|
||||
val = 0
|
||||
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
|
||||
renamable=True))
|
||||
|
||||
class CornerMarkerGroup(parameterTypes.GroupParameter):
|
||||
def __init__(self, **opts):
|
||||
opts['type'] = 'group'
|
||||
opts['addText'] = "Add Marker"
|
||||
opts['addList'] = ['Corner']
|
||||
parameterTypes.GroupParameter.__init__(self, **opts)
|
||||
|
||||
def addNew(self, typ):
|
||||
current_corners = self.children()
|
||||
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
|
||||
new_char = chr(ord(current_chars[-1]) + 1)
|
||||
val = 0
|
||||
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
|
||||
|
||||
|
||||
class ArucoEstimator:
|
||||
corner_marker_ids = {
|
||||
'a': 0,
|
||||
'b': 1,
|
||||
'c': 2,
|
||||
'd': 3
|
||||
}
|
||||
|
||||
corner_estimates = {
|
||||
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
}
|
||||
|
||||
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
|
||||
self.app = QtGui.QApplication([])
|
||||
|
||||
## Create window with GraphicsView widget
|
||||
self.win = QtGui.QWidget()
|
||||
self.layout = QtGui.QGridLayout()
|
||||
self.win.setLayout(self.layout)
|
||||
|
||||
self.win.keyPressEvent = self.handleKeyPressEvent
|
||||
self.win.setWindowTitle('ArucoEstimator')
|
||||
|
||||
self.plotwidget = pg.PlotWidget()
|
||||
self.layout.addWidget(self.plotwidget)
|
||||
|
||||
## lock the aspect ratio so pixels are always square
|
||||
self.plotwidget.setAspectLocked(True)
|
||||
self.plotwidget.getPlotItem().getAxis('left').hide()
|
||||
self.plotwidget.getPlotItem().getAxis('bottom').hide()
|
||||
|
||||
## Create image item
|
||||
self.img = pg.ImageItem(border='w')
|
||||
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
|
||||
self.img.mouseClickEvent = self.handleMouseEvent
|
||||
self.plotwidget.addItem(self.img)
|
||||
|
||||
# fps display
|
||||
self.fps_counter = FPSCounter()
|
||||
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
|
||||
self.plotwidget.addItem(self.fps_overlay)
|
||||
|
||||
self.invert_grayscale = True
|
||||
self.draw_grid = False
|
||||
self.draw_markers = True
|
||||
self.draw_marker_coordinate_system = False
|
||||
self.corner_marker_size = 0.075
|
||||
self.corner_marker_ids = {
|
||||
'a': 0,
|
||||
'b': 1,
|
||||
'c': 2,
|
||||
'd': 3
|
||||
}
|
||||
self.corner_estimates = {
|
||||
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||
}
|
||||
self.grid_columns = grid_columns
|
||||
self.grid_rows = grid_rows
|
||||
|
||||
self.robot_marker_size = 0.07
|
||||
if robot_marker_ids is None:
|
||||
robot_marker_ids = []
|
||||
self.robot_marker_ids = robot_marker_ids
|
||||
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
|
||||
for marker_id in self.robot_marker_ids])
|
||||
|
||||
robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in
|
||||
enumerate(self.robot_marker_ids, 1)]
|
||||
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
|
||||
self.corner_marker_ids.items()]
|
||||
|
||||
self.threshold = 10.0
|
||||
|
||||
# parameter
|
||||
params_spec = [
|
||||
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
|
||||
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
|
||||
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
|
||||
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
|
||||
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
|
||||
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
|
||||
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
|
||||
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
|
||||
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
|
||||
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
|
||||
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
|
||||
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
|
||||
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
|
||||
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
|
||||
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
|
||||
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
|
||||
]
|
||||
self.params = Parameter.create(name='params', type='group', children=params_spec)
|
||||
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
|
||||
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
|
||||
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
|
||||
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
|
||||
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
|
||||
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
|
||||
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
|
||||
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
|
||||
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
|
||||
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
|
||||
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
|
||||
|
||||
self.paramtree = ParameterTree()
|
||||
self.paramtree.setParameters(self.params, showTop=False)
|
||||
self.layout.addWidget(self.paramtree)
|
||||
self.win.show() ## show widget alone in its own window
|
||||
self.draw_grid = False
|
||||
|
||||
self.event_queue = Queue()
|
||||
|
||||
|
@ -174,8 +50,9 @@ class ArucoEstimator:
|
|||
# Start streaming
|
||||
self.pipeline.start(config)
|
||||
|
||||
# enable auto exposure
|
||||
self.set_autoexposure(self.params['Autoexposure'])
|
||||
# disable auto exposure
|
||||
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
|
||||
color_sensor.set_option(rs.option.enable_auto_exposure, False)
|
||||
|
||||
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
|
||||
rs.stream.color).as_video_stream_profile().get_intrinsics()
|
||||
|
@ -192,13 +69,16 @@ class ArucoEstimator:
|
|||
self.pipeline = None
|
||||
|
||||
# create detector and get parameters
|
||||
self.detector = aruco.MarkerDetector()
|
||||
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
|
||||
|
||||
self.detector_params = self.detector.getParameters()
|
||||
|
||||
# print detector parameters
|
||||
# print("detector params:")
|
||||
# print(self.detector_params)
|
||||
# for val in dir(self.detector_params):
|
||||
# if not val.startswith("__"):
|
||||
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
|
||||
print("detector params:")
|
||||
for val in dir(self.detector_params):
|
||||
if not val.startswith("__"):
|
||||
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
|
||||
|
||||
self.camparam = aruco.CameraParameters()
|
||||
if use_realsense:
|
||||
|
@ -210,12 +90,6 @@ class ArucoEstimator:
|
|||
self.drag_line_start = None
|
||||
self.previous_click = None
|
||||
|
||||
def set_autoexposure(self, v):
|
||||
if self.pipeline is not None:
|
||||
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
|
||||
color_sensor.set_option(rs.option.enable_auto_exposure, v)
|
||||
print(color_sensor.get_option(rs.option.enable_auto_exposure))
|
||||
|
||||
def compute_clicked_position(self, px, py):
|
||||
if self.all_corners_detected():
|
||||
# inter/extrapolate from clicked point to marker position
|
||||
|
@ -242,16 +116,21 @@ class ArucoEstimator:
|
|||
target = np.array([px, -py])
|
||||
return target
|
||||
|
||||
def handleMouseEvent(self, event):
|
||||
# get click position as distance to top-left corner of the image
|
||||
px = event.pos().x()
|
||||
py = self.img.height() - event.pos().y()
|
||||
print(f"px = {px}, py = {py}")
|
||||
if event.button() == QtCore.Qt.MouseButton.LeftButton:
|
||||
# self.drag_line_start = (px, py)
|
||||
# elif event == cv2.EVENT_LBUTTONUP:
|
||||
self.drag_line_end = (px, py)
|
||||
def mouse_callback(self, event, px, py, flags, param):
|
||||
"""
|
||||
This function is called for each mouse event inside the opencv window.
|
||||
If there are reference markers available we compute the real world position corresponding to the clicked
|
||||
position and put it in an event queue.
|
||||
:param event: type of event (mouse movement, left click, right click, etc.)
|
||||
:param px: x-position of event
|
||||
:param py: y-position of event
|
||||
"""
|
||||
target = None
|
||||
|
||||
if event == cv2.EVENT_LBUTTONDOWN:
|
||||
self.drag_line_start = (px, py)
|
||||
elif event == cv2.EVENT_LBUTTONUP:
|
||||
self.drag_line_end = (px, py)
|
||||
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
|
||||
if self.drag_line_start != self.drag_line_end:
|
||||
# compute target angle for clicked position
|
||||
|
@ -276,91 +155,105 @@ class ArucoEstimator:
|
|||
if self.drag_line_start is not None:
|
||||
self.drag_line_end = (px, py)
|
||||
|
||||
def process_frame(self):
|
||||
if self.pipeline:
|
||||
frames = self.pipeline.wait_for_frames()
|
||||
color_frame = frames.get_color_frame()
|
||||
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
|
||||
"""
|
||||
Run the marker tracking in a loop
|
||||
"""
|
||||
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
|
||||
cv2.setMouseCallback('RoboRally', self.mouse_callback)
|
||||
|
||||
# Convert images to numpy arrays
|
||||
color_image = np.asanyarray(color_frame.get_data())
|
||||
else:
|
||||
# Capture frame-by-frame
|
||||
ret, color_image = self.cv_camera.read()
|
||||
t_image = time.time()
|
||||
|
||||
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
|
||||
if self.invert_grayscale:
|
||||
cv2.bitwise_not(gray, gray)
|
||||
|
||||
detector = aruco.MarkerDetector()
|
||||
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01)
|
||||
|
||||
#detector_params = detector.getParameters()
|
||||
|
||||
# run aruco marker detection
|
||||
detected_markers = detector.detect(gray)
|
||||
# detected_markers2 = detector.detect(gray)
|
||||
#gray = detector.getThresholdedImage()
|
||||
|
||||
display_mode = self.params.param('Display mode').value()
|
||||
|
||||
#print(f"detected_markers = {[marker.id for marker in detected_markers]}")
|
||||
#print("threshold = ", self.threshold)
|
||||
|
||||
# extract data for detected markers
|
||||
detected_marker_data = {}
|
||||
for marker in detected_markers:
|
||||
if marker.id >= 0:
|
||||
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
|
||||
|
||||
if marker.id in self.corner_marker_ids.values():
|
||||
marker.calculateExtrinsics(self.corner_marker_size, self.camparam)
|
||||
fps_display_rate = 1 # displays the frame rate every 1 second
|
||||
fps_counter = 0
|
||||
start_time = time.time()
|
||||
try:
|
||||
running = True
|
||||
while running:
|
||||
if self.pipeline:
|
||||
frames = self.pipeline.wait_for_frames()
|
||||
color_frame = frames.get_color_frame()
|
||||
if not color_frame:
|
||||
continue
|
||||
# Convert images to numpy arrays
|
||||
color_image = np.asanyarray(color_frame.get_data())
|
||||
else:
|
||||
marker.calculateExtrinsics(self.robot_marker_size, self.camparam)
|
||||
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
|
||||
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
|
||||
# Capture frame-by-frame
|
||||
ret, color_image = self.cv_camera.read()
|
||||
t_image = time.time()
|
||||
|
||||
if self.draw_markers:
|
||||
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
|
||||
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
|
||||
if invert_grayscale:
|
||||
cv2.bitwise_not(gray, gray)
|
||||
|
||||
if self.draw_marker_coordinate_system:
|
||||
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
|
||||
# run aruco marker detection
|
||||
detected_markers = self.detector.detect(gray)
|
||||
|
||||
# store data
|
||||
for marker_id, data in detected_marker_data.items():
|
||||
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
|
||||
# extract data for detected markers
|
||||
detected_marker_data = {}
|
||||
for marker in detected_markers:
|
||||
if marker.id >= 0:
|
||||
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
|
||||
|
||||
# draw grid
|
||||
if self.draw_grid:
|
||||
color_image = self.draw_grid_lines(color_image, detected_marker_data)
|
||||
color_image = self.draw_robot_pos(color_image, detected_marker_data)
|
||||
if marker.id in self.corner_marker_ids.values():
|
||||
marker.calculateExtrinsics(0.075, self.camparam)
|
||||
else:
|
||||
marker.calculateExtrinsics(0.07, self.camparam)
|
||||
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
|
||||
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
|
||||
|
||||
# draw drag
|
||||
if self.drag_line_start is not None and self.drag_line_end is not None:
|
||||
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
|
||||
if draw_markers:
|
||||
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
|
||||
|
||||
# compute frame rate
|
||||
self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
|
||||
if draw_marker_coordinate_system:
|
||||
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
|
||||
|
||||
# Show images
|
||||
color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
|
||||
if display_mode == 'color':
|
||||
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
|
||||
elif display_mode == 'grayscale':
|
||||
self.img.setImage(np.transpose(np.flipud(gray)))
|
||||
# store data
|
||||
for marker_id, data in detected_marker_data.items():
|
||||
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
|
||||
|
||||
QtCore.QTimer.singleShot(1, self.process_frame)
|
||||
# draw grid
|
||||
if self.draw_grid:
|
||||
color_image = self.draw_grid_lines(color_image, detected_marker_data)
|
||||
color_image = self.draw_robot_pos(color_image, detected_marker_data)
|
||||
|
||||
def handleKeyPressEvent(self, ev):
|
||||
key = ev.key()
|
||||
self.event_queue.put(('key', key))
|
||||
if key == QtCore.Qt.Key_Q:
|
||||
# draw drag
|
||||
if self.drag_line_start is not None and self.drag_line_end is not None:
|
||||
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
|
||||
|
||||
# compute frame rate
|
||||
fps_counter += 1
|
||||
delta_t = time.time() - start_time
|
||||
if delta_t > fps_display_rate:
|
||||
fps_counter = 0
|
||||
start_time = time.time()
|
||||
color_image = cv2.putText(color_image, f"fps = {(fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
|
||||
(0, 255, 255),
|
||||
thickness=2)
|
||||
|
||||
# Show images
|
||||
cv2.imshow('RoboRally', color_image)
|
||||
key = cv2.waitKey(1)
|
||||
|
||||
if key > 0:
|
||||
self.event_queue.put(('key', key))
|
||||
if key == ord('g'):
|
||||
self.draw_grid = not self.draw_grid
|
||||
if key == ord('q'):
|
||||
running = False
|
||||
if key == ord('x'):
|
||||
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
|
||||
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
|
||||
color_sensor.set_option(rs.option.enable_auto_exposure, False)
|
||||
print("auto exposure OFF")
|
||||
else:
|
||||
color_sensor.set_option(rs.option.enable_auto_exposure, True)
|
||||
print("auto exposure ON")
|
||||
if key == ord('i'):
|
||||
invert_grayscale = not invert_grayscale
|
||||
finally:
|
||||
cv2.destroyAllWindows()
|
||||
if self.pipeline is not None:
|
||||
# Stop streaming
|
||||
self.pipeline.stop()
|
||||
self.app.quit()
|
||||
elif key == QtCore.Qt.Key_I:
|
||||
self.invert_grayscale = not self.invert_grayscale
|
||||
|
||||
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
|
||||
# update the marker estimate with new data
|
||||
|
@ -484,7 +377,7 @@ class ArucoEstimator:
|
|||
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
|
||||
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
|
||||
if corner_1_center is not None and corner_2_center is not None:
|
||||
frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2)
|
||||
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
|
||||
return frame
|
||||
|
||||
def draw_grid_lines(self, frame, detected_marker_data):
|
||||
|
@ -507,7 +400,7 @@ class ArucoEstimator:
|
|||
for x in range(1, self.grid_columns):
|
||||
column_line_top = a + x / self.grid_columns * vab
|
||||
column_line_bottom = d + x / self.grid_columns * vdc
|
||||
frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0),
|
||||
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
|
||||
thickness=1)
|
||||
|
||||
# draw horizontal lines
|
||||
|
@ -516,7 +409,7 @@ class ArucoEstimator:
|
|||
for y in range(1, self.grid_rows):
|
||||
row_line_top = a + y / self.grid_rows * vad
|
||||
row_line_bottom = b + y / self.grid_rows * vbc
|
||||
frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0),
|
||||
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
|
||||
thickness=1)
|
||||
|
||||
return frame
|
||||
|
@ -553,9 +446,4 @@ class ArucoEstimator:
|
|||
|
||||
if __name__ == "__main__":
|
||||
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
|
||||
estimator.process_frame()
|
||||
|
||||
import sys
|
||||
|
||||
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
|
||||
QtGui.QApplication.instance().exec_()
|
||||
estimator.run_tracking(draw_markers=True, invert_grayscale=True)
|
||||
|
|
|
@ -3,40 +3,19 @@ import time
|
|||
|
||||
from robot import ControlledRobot
|
||||
from pid_controller import PIDController
|
||||
from mpc_controller import MPCController
|
||||
from event_listener import EventListener
|
||||
|
||||
|
||||
class CommanderBase:
|
||||
valid_controller_types = {'pid': PIDController,
|
||||
'mpc': MPCController}
|
||||
def __init__(self):
|
||||
self.robots = []
|
||||
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
|
||||
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
|
||||
|
||||
def __init__(self, id_ip_dict, controller_type='pid'):
|
||||
"""
|
||||
|
||||
:param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' }
|
||||
:param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'}
|
||||
"""
|
||||
self.robots = {}
|
||||
for id, ip in id_ip_dict.items():
|
||||
self.robots[id] = ControlledRobot(id, ip)
|
||||
|
||||
for id, r in self.robots.items():
|
||||
for r in self.robots:
|
||||
r.connect()
|
||||
|
||||
if type(controller_type) == dict:
|
||||
for id, ctype in controller_type.items():
|
||||
if ctype in CommanderBase.valid_controller_types:
|
||||
self.robots[id].attach_controller(CommanderBase.valid_controller_types[ctype]())
|
||||
else:
|
||||
raise Exception(f"invalid controller type {ctype} specified for robot {id}. "
|
||||
f"valid controller types are {list(CommanderBase.valid_controller_types.keys())}")
|
||||
elif controller_type in CommanderBase.valid_controller_types:
|
||||
for id, r in self.robots.items():
|
||||
r.attach_controller(CommanderBase.valid_controller_types[controller_type]())
|
||||
else:
|
||||
raise Exception(f"invalid controller type {controller_type} specified. valid controller types are "
|
||||
f"{list(CommanderBase.valid_controller_types.keys())}")
|
||||
r.attach_controller(PIDController())
|
||||
|
||||
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
|
||||
|
||||
|
@ -45,14 +24,14 @@ class CommanderBase:
|
|||
self.running = False
|
||||
|
||||
def run(self):
|
||||
unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values()))
|
||||
unconnected_robots = list(filter(lambda r: not r.connected, self.robots))
|
||||
if len(unconnected_robots) > 0:
|
||||
print(f"warning: could not connect to the following robots: {unconnected_robots}")
|
||||
return
|
||||
|
||||
all_detected = False
|
||||
while not all_detected:
|
||||
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values()))
|
||||
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots))
|
||||
all_detected = len(undetected_robots) == 0
|
||||
if not all_detected:
|
||||
print(f"warning: no measurements available for the following robots: {undetected_robots}")
|
||||
|
@ -71,25 +50,23 @@ class CommanderBase:
|
|||
if event[0] == 'click':
|
||||
target = event[1]
|
||||
target_pos = np.array([target['x'], target['y'], target['angle']])
|
||||
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
|
||||
self.robots[controlled_robot_id].move_to_pos(target_pos)
|
||||
self.robots[self.current_robot_index].move_to_pos(target_pos)
|
||||
elif event[0] == 'key':
|
||||
key = event[1]
|
||||
if key == 32: # arrow up
|
||||
self.controlling = not self.controlling
|
||||
if not self.controlling:
|
||||
print("disable control")
|
||||
for _, r in self.robots.items():
|
||||
for r in self.robots:
|
||||
r.stop_control()
|
||||
else:
|
||||
print("enable control")
|
||||
for _, r in self.robots.items():
|
||||
for r in self.robots:
|
||||
r.start_control()
|
||||
elif key == 9: # TAB
|
||||
# switch controlled robot
|
||||
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
|
||||
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
|
||||
robot = self.robots[controlled_robot_id]
|
||||
robot = self.robots[self.current_robot_index]
|
||||
print(f"controlled robot: {robot.id}")
|
||||
elif key == 113 or key == 27: # q or ESCAPE
|
||||
print("quit!")
|
||||
|
@ -99,15 +76,5 @@ class CommanderBase:
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
id_ip_dict = {
|
||||
#11: '10.10.11.88',
|
||||
#12: '10.10.11.91',
|
||||
#13: '10.10.11.90',
|
||||
14: '10.10.11.89',
|
||||
}
|
||||
|
||||
# controller_type = {12: 'mpc', 13: 'pid'}
|
||||
controller_type = 'mpc'
|
||||
|
||||
rc = CommanderBase(id_ip_dict, controller_type=controller_type)
|
||||
rc = CommanderBase()
|
||||
rc.run()
|
||||
|
|
|
@ -18,11 +18,10 @@ class ControllerBase:
|
|||
|
||||
control = self.compute_control(state)
|
||||
|
||||
if self.controlling:
|
||||
self.apply_control(control)
|
||||
self.apply_control(control)
|
||||
|
||||
if self.robot is not None:
|
||||
self.robot.send_cmd(u1=0, u2=0) # stop robot
|
||||
time.sleep(self.control_rate)
|
||||
self.apply_control((0.0, 0.0)) # stop robot
|
||||
|
||||
def set_target_position(self, target_pos):
|
||||
self.target_pos = target_pos
|
||||
|
@ -43,7 +42,6 @@ class ControllerBase:
|
|||
else:
|
||||
raise Exception("error: controller cannot apply control!\n"
|
||||
" there is no robot attached to the controller!")
|
||||
time.sleep(self.control_rate)
|
||||
|
||||
def attach_robot(self, robot):
|
||||
self.robot = robot
|
||||
|
|
|
@ -1,62 +0,0 @@
|
|||
import socket
|
||||
import pygame
|
||||
from argparse import ArgumentParser
|
||||
|
||||
class ManualController:
|
||||
def __init__(self, estimator):
|
||||
self.estimator = estimator
|
||||
self.estimator.external_keyboard_callback = self.handle_keypress
|
||||
|
||||
self.controlling = False
|
||||
|
||||
def handle_keypress(self, key):
|
||||
pass
|
||||
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
|
||||
args = parser.parse_args()
|
||||
|
||||
ip = args.ip
|
||||
|
||||
pygame.init()
|
||||
pygame.display.set_mode((640, 480))
|
||||
|
||||
rc_socket = socket.socket()
|
||||
try:
|
||||
rc_socket.connect((ip, 1234)) # connect to robot
|
||||
except socket.error:
|
||||
print("could not connect to socket")
|
||||
|
||||
running = True
|
||||
while running:
|
||||
u1 = 0
|
||||
u2 = 0
|
||||
vmax = 1.0
|
||||
events = pygame.event.get()
|
||||
for event in events:
|
||||
if event.type == pygame.KEYDOWN:
|
||||
if event.key == pygame.K_LEFT:
|
||||
u1 = -vmax
|
||||
u2 = vmax
|
||||
print(f"turn left: ({u1},{u2})")
|
||||
elif event.key == pygame.K_RIGHT:
|
||||
u1 = vmax
|
||||
u2 = -vmax
|
||||
print(f"turn right: ({u1},{u2})")
|
||||
elif event.key == pygame.K_UP:
|
||||
u1 = vmax
|
||||
u2 = vmax
|
||||
print(f"forward: ({u1},{u2})")
|
||||
elif event.key == pygame.K_DOWN:
|
||||
u1 = -vmax
|
||||
u2 = -vmax
|
||||
print(f"backward: ({u1},{u2})")
|
||||
elif event.key == pygame.K_ESCAPE:
|
||||
print("quit")
|
||||
running = False
|
||||
u1 = 0.0
|
||||
u2 = 0.0
|
||||
rc_socket.send(f'({u1},{u2})\n'.encode())
|
||||
elif event.type == pygame.KEYUP:
|
||||
print("key released, resetting: ({},{})".format(u1, u2))
|
||||
rc_socket.send(f'({u1},{u2})\n'.encode())
|
|
@ -22,13 +22,7 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
|
|||
self.request.sendall((json.dumps(event) + '\n').encode())
|
||||
self.server.estimator.last_event = None
|
||||
time.sleep(1.0 / self.server.max_measurements_per_second)
|
||||
elif 'corners' in data.decode():
|
||||
# send positions of the board markers
|
||||
corner_estimates = self.server.estimator.corner_estimates
|
||||
response = {}
|
||||
for corner, data in corner_estimates.items():
|
||||
response[corner] = {'x': data['x'], 'y': data['y']}
|
||||
self.request.sendall((json.dumps(response) + '\n').encode())
|
||||
|
||||
else:
|
||||
# send robot position
|
||||
try:
|
||||
|
@ -63,23 +57,12 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
|
||||
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
|
||||
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True})
|
||||
estimator_thread.start()
|
||||
|
||||
# first we start thread for the measurement server
|
||||
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30)
|
||||
server_thread = threading.Thread(target=measurement_server.serve_forever)
|
||||
server_thread.start()
|
||||
|
||||
# now we start the Aruco estimator GUI
|
||||
aruco_estimator.process_frame()
|
||||
import sys
|
||||
from pyqtgraph.Qt import QtCore, QtGui
|
||||
|
||||
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
|
||||
QtGui.QApplication.instance().exec_()
|
||||
|
||||
#with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
|
||||
# max_measurements_per_second=30) as measurement_server:
|
||||
# measurement_server.serve_forever()
|
||||
with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
|
||||
max_measurements_per_second=30) as measurement_server:
|
||||
measurement_server.serve_forever()
|
||||
|
||||
# receive with: nc 127.0.0.1 42424 -> 12 + Enter
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
from flask import Flask, render_template, request
|
||||
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import numpy as np
|
||||
|
||||
from robot import Robot
|
||||
|
||||
from mpc_controller import MPCController
|
||||
from aruco_estimator import ArucoEstimator
|
||||
|
||||
from shapely.geometry import LineString
|
||||
|
||||
width = 600
|
||||
height = 400
|
||||
|
||||
robot_ids = [11, 12, 13, 14]
|
||||
robot_ids_control_history = [11, 12, 13, 14]
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
def calc_target_pos(px, py):
|
||||
if rc.estimator.all_corners_detected():
|
||||
# compute column line
|
||||
a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
|
||||
b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
|
||||
c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
|
||||
d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])
|
||||
x_frac = (px + 0.5) / width
|
||||
y_frac = (py + 0.5) / height
|
||||
|
||||
vab = b - a
|
||||
vdc = c - d
|
||||
column_line_top = a + x_frac * vab
|
||||
column_line_bottom = d + x_frac * vdc
|
||||
|
||||
vad = d - a
|
||||
vbc = c - b
|
||||
row_line_top = a + y_frac * vad
|
||||
row_line_bottom = b + y_frac * vbc
|
||||
|
||||
column_line = LineString([column_line_top, column_line_bottom])
|
||||
row_line = LineString([row_line_top, row_line_bottom])
|
||||
|
||||
int_pt = column_line.intersection(row_line)
|
||||
point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])
|
||||
|
||||
return point_of_intersection
|
||||
|
||||
|
||||
@app.route('/', methods=('GET', 'POST'))
|
||||
def presence():
|
||||
global robot_ids_control_history
|
||||
if request.method == 'GET':
|
||||
return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
|
||||
elif request.method == 'POST':
|
||||
x = int(float(request.form.get('x')))
|
||||
y = int(float(request.form.get('y')))
|
||||
print(x, y)
|
||||
margin = 0.1
|
||||
x = min(max(width * margin, x), width * (1 - margin))
|
||||
y = min(max(height * margin, y), height * (1 - margin))
|
||||
|
||||
id = int(request.form.get('robot'))
|
||||
robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
|
||||
|
||||
target_pos = calc_target_pos(x, y)
|
||||
rc.estimator.event_queue.put(('robot', id, target_pos))
|
||||
|
||||
return 'OK'
|
||||
|
||||
|
||||
class RemoteController:
|
||||
def __init__(self):
|
||||
self.robots = []
|
||||
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
|
||||
Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
|
||||
|
||||
self.robot_ids = {}
|
||||
for r in self.robots:
|
||||
self.robot_ids[r.id] = r
|
||||
|
||||
# socket for movement commands
|
||||
self.comm_socket = socket.socket()
|
||||
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
for robot in self.robots:
|
||||
robot.connect()
|
||||
|
||||
self.comm_socket.bind(('', 1337))
|
||||
self.comm_socket.listen(5)
|
||||
|
||||
self.t = time.time()
|
||||
|
||||
# start thread for marker position detection
|
||||
self.estimator = ArucoEstimator(self.robot_ids.keys())
|
||||
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False})
|
||||
self.estimator_thread.start()
|
||||
|
||||
self.controller = MPCController(self.estimator)
|
||||
|
||||
def run(self):
|
||||
print("waiting until all markers are detected...")
|
||||
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||
pass
|
||||
print("starting control")
|
||||
self.controller.interactive_control(robots=self.robots)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
rc = RemoteController()
|
||||
rc_thread = threading.Thread(target=rc.run)
|
||||
rc_thread.start()
|
||||
|
||||
app.run('0.0.0.0')
|
|
@ -1,62 +1,175 @@
|
|||
import numpy as np
|
||||
import time
|
||||
|
||||
from controller import ControllerBase
|
||||
from casadi_opt import OpenLoopSolver
|
||||
|
||||
|
||||
class MPCController(ControllerBase):
|
||||
def __init__(self, N=20, T=1.0):
|
||||
super().__init__()
|
||||
self.t = None
|
||||
class MPCController:
|
||||
def __init__(self, estimator):
|
||||
self.t = time.time()
|
||||
|
||||
self.ols = OpenLoopSolver(N=N, T=T)
|
||||
self.ols.setup()
|
||||
self.control_rate = self.ols.T / self.ols.N
|
||||
self.estimator = estimator
|
||||
self.controlling = False
|
||||
|
||||
self.mstep = 2
|
||||
self.ols = OpenLoopSolver(N=20, T=1.0)
|
||||
self.ols.setup()
|
||||
self.dt = self.ols.T / self.ols.N
|
||||
|
||||
# integrator
|
||||
self.omega_max = 5.0
|
||||
self.control_scaling = 0.5
|
||||
self.control_scaling = 0.4
|
||||
|
||||
def set_target_position(self, target_pos):
|
||||
super(MPCController, self).set_target_position(target_pos)
|
||||
self.t = time.time()
|
||||
def move_to_pos(self, target_pos, robot, near_target_counter=5):
|
||||
near_target = 0
|
||||
while near_target < near_target_counter:
|
||||
while not self.estimator.event_queue.empty():
|
||||
event = self.estimator.event_queue.get()
|
||||
print("event: ", event)
|
||||
if event[0] == 'click':
|
||||
pass
|
||||
elif event[0] == 'key':
|
||||
key = event[1]
|
||||
|
||||
def compute_control(self, state):
|
||||
x_pred = np.array(state[1:])
|
||||
if key == 84: # arrow up
|
||||
self.controlling = True
|
||||
self.t = time.time()
|
||||
elif key == 82: # arrow down
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
elif key == 48: # 0
|
||||
target_pos = np.array([0.0, 0.0, 0.0])
|
||||
elif key == 43: # +
|
||||
self.control_scaling += 0.1
|
||||
self.control_scaling = min(self.control_scaling, 1.0)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 45: # -
|
||||
self.control_scaling -= 0.1
|
||||
self.control_scaling = max(self.control_scaling, 0.1)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 113:
|
||||
print("quit!")
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
return
|
||||
elif key == 27: # escape
|
||||
print("quit!")
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
return
|
||||
|
||||
error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2])
|
||||
angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data
|
||||
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
|
||||
x_pred = self.get_measurement(robot.id)
|
||||
|
||||
if error_pos > 0.05 or error_ang > 0.2:
|
||||
#if error_pos > 0.05:
|
||||
# solve mpc open loop problem
|
||||
res = self.ols.solve(x_pred, self.target_pos)
|
||||
if x_pred is not None:
|
||||
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
|
||||
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
||||
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
|
||||
# print("error pos = ", error_pos)
|
||||
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
|
||||
|
||||
us1 = res[0] * self.control_scaling
|
||||
us2 = res[1] * self.control_scaling
|
||||
# if error_pos > 0.075 or error_ang > 0.35:
|
||||
if error_pos > 0.05 or error_ang > 0.1:
|
||||
# solve mpc open loop problem
|
||||
res = self.ols.solve(x_pred, target_pos)
|
||||
|
||||
dt_mpc = time.time() - self.t
|
||||
if dt_mpc < self.control_rate: # wait until next control can be applied
|
||||
time.sleep(self.control_rate - dt_mpc)
|
||||
else:
|
||||
us1 = [0] * self.mstep
|
||||
us2 = [0] * self.mstep
|
||||
# us1 = res[0]
|
||||
# us2 = res[1]
|
||||
us1 = res[0] * self.control_scaling
|
||||
us2 = res[1] * self.control_scaling
|
||||
# print("u = {}", (us1, us2))
|
||||
|
||||
return us1, us2
|
||||
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
|
||||
dt_mpc = time.time() - self.t
|
||||
if dt_mpc < self.dt: # wait until next control can be applied
|
||||
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
|
||||
time.sleep(self.dt - dt_mpc)
|
||||
else:
|
||||
us1 = [0] * self.mstep
|
||||
us2 = [0] * self.mstep
|
||||
near_target += 1
|
||||
|
||||
def apply_control(self, control):
|
||||
if self.robot is not None:
|
||||
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||
u1 = control[0][i]
|
||||
u2 = control[1][i]
|
||||
self.robot.send_cmd(u1, u2)
|
||||
if i < self.mstep:
|
||||
time.sleep(self.control_rate)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
else:
|
||||
raise Exception("error: controller cannot apply control!\n"
|
||||
" there is no robot attached to the controller!")
|
||||
# send controls to the robot
|
||||
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||
u1 = us1[i]
|
||||
u2 = us2[i]
|
||||
robot.send_cmd(u1, u2)
|
||||
if i < self.mstep:
|
||||
time.sleep(self.dt)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
else:
|
||||
print("robot not detected yet!")
|
||||
|
||||
def interactive_control(self, robots):
|
||||
controlled_robot_number = 0
|
||||
robot = robots[controlled_robot_number]
|
||||
|
||||
target_pos = np.array([0.0, 0.0, 0.0])
|
||||
|
||||
running = True
|
||||
while running:
|
||||
# handle events from opencv window
|
||||
while not self.estimator.event_queue.empty():
|
||||
event = self.estimator.event_queue.get()
|
||||
print("event: ", event)
|
||||
if event[0] == 'click':
|
||||
target_pos = event[1]
|
||||
elif event[0] == 'key':
|
||||
key = event[1]
|
||||
|
||||
if key == 32: # arrow up
|
||||
self.controlling = not self.controlling
|
||||
if not self.controlling:
|
||||
print("disable control")
|
||||
robot.send_cmd() # stop robot
|
||||
else:
|
||||
print("enable control")
|
||||
self.t = time.time()
|
||||
elif key == 48: # 0
|
||||
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
|
||||
elif key == 43: # +
|
||||
self.control_scaling += 0.1
|
||||
self.control_scaling = min(self.control_scaling, 1.0)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 45: # -
|
||||
self.control_scaling -= 0.1
|
||||
self.control_scaling = max(self.control_scaling, 0.1)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 9: # TAB
|
||||
# switch controlled robot
|
||||
robot.send_cmd() # stop current robot
|
||||
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
|
||||
robot = robots[controlled_robot_number]
|
||||
print(f"controlled robot: {robot.id}")
|
||||
elif key == 113 or key == 27: # q or ESCAPE
|
||||
print("quit!")
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
return
|
||||
|
||||
if self.controlling:
|
||||
# measure state
|
||||
x_pred = self.get_measurement(robot.id)
|
||||
|
||||
# print(np.linalg.norm(x_pred-target_pos))
|
||||
|
||||
# solve mpc open loop problem
|
||||
res = self.ols.solve(x_pred, target_pos)
|
||||
|
||||
us1 = res[0] * self.control_scaling
|
||||
us2 = res[1] * self.control_scaling
|
||||
|
||||
dt_mpc = time.time() - self.t
|
||||
if dt_mpc < self.dt: # wait until next control can be applied
|
||||
time.sleep(self.dt - dt_mpc)
|
||||
|
||||
# send controls to the robot
|
||||
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||
u1 = us1[i]
|
||||
u2 = us2[i]
|
||||
robot.send_cmd(u1, u2)
|
||||
if i < self.mstep:
|
||||
time.sleep(self.dt)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
|
||||
def get_measurement(self, robot_id):
|
||||
return self.estimator.get_robot_state_estimate(robot_id)
|
||||
|
|
|
@ -3,32 +3,17 @@ import socket
|
|||
import threading
|
||||
import time
|
||||
|
||||
from robot import Robot, ControlledRobot
|
||||
from robot import Robot
|
||||
|
||||
from mpc_controller import MPCController
|
||||
|
||||
# HOST, PORT = "localhost", 42424
|
||||
#
|
||||
# robot_id = 12
|
||||
#
|
||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# sock.connect((HOST, PORT))
|
||||
# sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
|
||||
# #sock.sendall(f"events\n".encode()) # request events
|
||||
# receiving = True
|
||||
# while receiving:
|
||||
# received = str(sock.recv(1024), "utf-8")
|
||||
# print("Received: {}".format(received))
|
||||
# receiving = len(received) > 0
|
||||
from aruco_estimator import ArucoEstimator
|
||||
|
||||
|
||||
class RemoteController:
|
||||
def __init__(self):
|
||||
self.robots = []
|
||||
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
|
||||
#self.robots = [Robot(14, '10.10.11.89')]
|
||||
self.robots = [ControlledRobot(13, '192.168.1.13')]
|
||||
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
|
||||
self.robot_ids = {}
|
||||
for r in self.robots:
|
||||
|
@ -47,17 +32,18 @@ class RemoteController:
|
|||
self.t = time.time()
|
||||
|
||||
# start thread for marker position detection
|
||||
self.controller = MPCController()
|
||||
self.robots[0].attach_controller(self.controller)
|
||||
self.estimator = ArucoEstimator(self.robot_ids.keys())
|
||||
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
|
||||
self.estimator_thread.start()
|
||||
|
||||
self.controller = MPCController(self.estimator)
|
||||
|
||||
def run(self):
|
||||
#print("waiting until all markers are detected...")
|
||||
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||
# pass
|
||||
print("waiting until all markers are detected...")
|
||||
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||
pass
|
||||
print("starting control")
|
||||
#self.controller.interactive_control(robots=self.robots)
|
||||
self.controller.start()
|
||||
pass
|
||||
self.controller.interactive_control(robots=self.robots)
|
||||
|
||||
|
||||
def main(args):
|
||||
|
@ -67,13 +53,3 @@ def main(args):
|
|||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv)
|
||||
|
||||
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
|
||||
# marker positions and GUI events via socket
|
||||
# next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start()
|
||||
# you can now set the target position for the controlled robot via move_to_pos(), e.g.
|
||||
# self.robots[0].move_to_pos([0.0,0,0], True)
|
||||
|
||||
# TODO
|
||||
# figure out how to get clicked positions from measurement server and drive robot there
|
||||
# also document and improve program structure
|
|
@ -10,6 +10,8 @@ class PIDController(ControllerBase):
|
|||
super().__init__()
|
||||
self.t = None
|
||||
|
||||
self.controlling = False
|
||||
|
||||
self.P_angle = 0.4
|
||||
self.I_angle = 0.35
|
||||
self.D_angle = 0.1
|
||||
|
@ -44,7 +46,7 @@ class PIDController(ControllerBase):
|
|||
if self.t is None:
|
||||
dt = 0.1
|
||||
else:
|
||||
dt = time.time() - self.t # time since last control was applied
|
||||
dt = time.time() - self.t
|
||||
|
||||
if self.mode == 'angle':
|
||||
# compute angle such that robot faces to target point
|
||||
|
@ -106,11 +108,9 @@ class PIDController(ControllerBase):
|
|||
|
||||
self.e_pos_old = e_pos
|
||||
self.e_angle_old = e_angle
|
||||
|
||||
else:
|
||||
u1 = 0.0
|
||||
u2 = 0.0
|
||||
return u1, u2
|
||||
|
||||
def apply_control(self, control):
|
||||
self.t = time.time() # save time when the most recent control was applied
|
||||
super(PIDController, self).apply_control(control)
|
||||
return u1, u2
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import socket
|
||||
import threading
|
||||
import json
|
||||
import numpy as np
|
||||
import time
|
||||
import math
|
||||
|
||||
|
||||
class Robot:
|
||||
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
|
||||
|
@ -125,26 +123,8 @@ class ControlledRobot(Robot):
|
|||
self.controller = controller
|
||||
self.controller.attach_robot(self)
|
||||
|
||||
def move_to_pos(self, target_pos, blocking=False):
|
||||
def move_to_pos(self, target_pos):
|
||||
if self.controller is not None:
|
||||
if not blocking:
|
||||
self.controller.set_target_position(target_pos)
|
||||
else: # only return after the robot has reached the target
|
||||
self.stop_control()
|
||||
self.controller.set_target_position(target_pos)
|
||||
self.start_control()
|
||||
|
||||
close_to_target = False
|
||||
while not close_to_target:
|
||||
current_pos = np.array([self.x, self.y, self.angle])
|
||||
v = target_pos - current_pos
|
||||
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
||||
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
||||
e_pos = np.linalg.norm(v[0:2])
|
||||
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
|
||||
close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1
|
||||
time.sleep(0.1)
|
||||
print("target reached!")
|
||||
self.stop_control()
|
||||
self.controller.set_target_position(target_pos)
|
||||
else:
|
||||
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")
|
||||
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Mensa-Abend Telepräsenz</title>
|
||||
<style>
|
||||
div.meerschweinchen {
|
||||
max-width: 600px;
|
||||
margin: auto;
|
||||
text-align: center;
|
||||
}
|
||||
div.arena {
|
||||
height: 400px;
|
||||
width: 600px;
|
||||
background-image: url("https://p.0au.de/3d62ea7/foo.png");
|
||||
border: none;
|
||||
text-align: center;
|
||||
position: relative
|
||||
}
|
||||
div.arena p {
|
||||
margin: 0;
|
||||
position: absolute;
|
||||
top: 50%;
|
||||
left: 50%;
|
||||
transform: translate(-50%, -50%)
|
||||
}
|
||||
</style>
|
||||
|
||||
<script type=text/javascript src="{{url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
|
||||
</head>
|
||||
<body>
|
||||
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
|
||||
<div style="text-align: center">
|
||||
<p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
|
||||
<br>
|
||||
Please let us know about your study background.
|
||||
</p>
|
||||
</div>
|
||||
<div class="meerschweinchen">
|
||||
<div class=arena style="font-size: large;">
|
||||
|
||||
</div>
|
||||
|
||||
<form method="post">
|
||||
<p>Choose your robot:</p>
|
||||
<div>
|
||||
<input type="radio" id="r11" name="robot_id" value="11">
|
||||
<label for="r11">Robot 11</label>
|
||||
</div>
|
||||
<div>
|
||||
<input type="radio" id="r12" name="robot_id" value="12">
|
||||
<label for="r12">Robot 12</label>
|
||||
</div>
|
||||
<div>
|
||||
<input type="radio" id="r13" name="robot_id" value="13">
|
||||
<label for="r13">Robot 13</label>
|
||||
</div>
|
||||
<div>
|
||||
<input type="radio" id="r14" name="robot_id" value="14">
|
||||
<label for="r14">Robot 14</label>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</body>
|
||||
|
||||
<script>
|
||||
function fireThis(e) {
|
||||
var parentOffset = $(this).parent().offset();
|
||||
var x = e.pageX - parentOffset.left;
|
||||
var y = e.pageY - parentOffset.top;
|
||||
|
||||
var robot = document.querySelector('input[name = "robot_id"]:checked').value;
|
||||
|
||||
$.post( "/", { 'x': x, 'y': y , 'robot': robot} );
|
||||
}
|
||||
|
||||
$('.arena').on('mousedown', fireThis);
|
||||
|
||||
function callRandom() {
|
||||
var array = document.getElementsByName('robot_id');
|
||||
array[{{ oldest_control }}].checked = true;
|
||||
}
|
||||
callRandom();
|
||||
</script>
|
||||
</html>
|
Loading…
Reference in New Issue
Block a user