Compare commits

...

9 Commits

11 changed files with 688 additions and 329 deletions

View File

@ -4,39 +4,163 @@ import cv2
import os import os
import time import time
import math import math
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
import pyqtgraph as pg
from shapely.geometry import LineString from shapely.geometry import LineString
from queue import Queue from queue import Queue
import aruco import aruco
class FPSCounter:
def __init__(self):
self.fps_start_time = time.time()
self.fps_counter = 0
self.fps_display_rate = 1.0
def get_fps(self):
self.fps_counter += 1
delta_t = time.time() - self.fps_start_time
if delta_t > self.fps_display_rate:
self.fps_counter = 0
self.fps_start_time = time.time()
return self.fps_counter / delta_t
## test add/remove
## this group includes a menu allowing the user to add new parameters into its child list
class RobotMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Robot']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_robots = self.children()
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
new_index = len(current_indices) + 1
val = 0
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
renamable=True))
class CornerMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Corner']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_corners = self.children()
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
new_char = chr(ord(current_chars[-1]) + 1)
val = 0
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
class ArucoEstimator: class ArucoEstimator:
corner_marker_ids = { def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.app = QtGui.QApplication([])
## Create window with GraphicsView widget
self.win = QtGui.QWidget()
self.layout = QtGui.QGridLayout()
self.win.setLayout(self.layout)
self.win.keyPressEvent = self.handleKeyPressEvent
self.win.setWindowTitle('ArucoEstimator')
self.plotwidget = pg.PlotWidget()
self.layout.addWidget(self.plotwidget)
## lock the aspect ratio so pixels are always square
self.plotwidget.setAspectLocked(True)
self.plotwidget.getPlotItem().getAxis('left').hide()
self.plotwidget.getPlotItem().getAxis('bottom').hide()
## Create image item
self.img = pg.ImageItem(border='w')
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
self.img.mouseClickEvent = self.handleMouseEvent
self.plotwidget.addItem(self.img)
# fps display
self.fps_counter = FPSCounter()
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
self.plotwidget.addItem(self.fps_overlay)
self.invert_grayscale = True
self.draw_grid = False
self.draw_markers = True
self.draw_marker_coordinate_system = False
self.corner_marker_size = 0.075
self.corner_marker_ids = {
'a': 0, 'a': 0,
'b': 1, 'b': 1,
'c': 2, 'c': 2,
'd': 3 'd': 3
} }
self.corner_estimates = {
corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0}, 'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
} }
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.grid_columns = grid_columns self.grid_columns = grid_columns
self.grid_rows = grid_rows self.grid_rows = grid_rows
self.robot_marker_size = 0.07
if robot_marker_ids is None: if robot_marker_ids is None:
robot_marker_ids = [] robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None}) self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids]) for marker_id in self.robot_marker_ids])
self.draw_grid = False robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in
enumerate(self.robot_marker_ids, 1)]
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
self.corner_marker_ids.items()]
self.threshold = 10.0
# parameter
params_spec = [
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
self.params = Parameter.create(name='params', type='group', children=params_spec)
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)
self.layout.addWidget(self.paramtree)
self.win.show() ## show widget alone in its own window
self.event_queue = Queue() self.event_queue = Queue()
@ -50,9 +174,8 @@ class ArucoEstimator:
# Start streaming # Start streaming
self.pipeline.start(config) self.pipeline.start(config)
# disable auto exposure # enable auto exposure
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1] self.set_autoexposure(self.params['Autoexposure'])
color_sensor.set_option(rs.option.enable_auto_exposure, False)
camera_intrinsics = self.pipeline.get_active_profile().get_stream( camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics() rs.stream.color).as_video_stream_profile().get_intrinsics()
@ -69,16 +192,13 @@ class ArucoEstimator:
self.pipeline = None self.pipeline = None
# create detector and get parameters # create detector and get parameters
self.detector = aruco.MarkerDetector()
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters # print detector parameters
print("detector params:") # print("detector params:")
for val in dir(self.detector_params): # print(self.detector_params)
if not val.startswith("__"): # for val in dir(self.detector_params):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val))) # if not val.startswith("__"):
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters() self.camparam = aruco.CameraParameters()
if use_realsense: if use_realsense:
@ -90,6 +210,12 @@ class ArucoEstimator:
self.drag_line_start = None self.drag_line_start = None
self.previous_click = None self.previous_click = None
def set_autoexposure(self, v):
if self.pipeline is not None:
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, v)
print(color_sensor.get_option(rs.option.enable_auto_exposure))
def compute_clicked_position(self, px, py): def compute_clicked_position(self, px, py):
if self.all_corners_detected(): if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position # inter/extrapolate from clicked point to marker position
@ -116,21 +242,16 @@ class ArucoEstimator:
target = np.array([px, -py]) target = np.array([px, -py])
return target return target
def mouse_callback(self, event, px, py, flags, param): def handleMouseEvent(self, event):
""" # get click position as distance to top-left corner of the image
This function is called for each mouse event inside the opencv window. px = event.pos().x()
If there are reference markers available we compute the real world position corresponding to the clicked py = self.img.height() - event.pos().y()
position and put it in an event queue. print(f"px = {px}, py = {py}")
:param event: type of event (mouse movement, left click, right click, etc.) if event.button() == QtCore.Qt.MouseButton.LeftButton:
:param px: x-position of event # self.drag_line_start = (px, py)
:param py: y-position of event # elif event == cv2.EVENT_LBUTTONUP:
"""
target = None
if event == cv2.EVENT_LBUTTONDOWN:
self.drag_line_start = (px, py)
elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py) self.drag_line_end = (px, py)
self.drag_line_start = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1]) target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end: if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position # compute target angle for clicked position
@ -155,24 +276,11 @@ class ArucoEstimator:
if self.drag_line_start is not None: if self.drag_line_start is not None:
self.drag_line_end = (px, py) self.drag_line_end = (px, py)
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False): def process_frame(self):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
fps_display_rate = 1 # displays the frame rate every 1 second
fps_counter = 0
start_time = time.time()
try:
running = True
while running:
if self.pipeline: if self.pipeline:
frames = self.pipeline.wait_for_frames() frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame() color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays # Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data()) color_image = np.asanyarray(color_frame.get_data())
else: else:
@ -181,11 +289,23 @@ class ArucoEstimator:
t_image = time.time() t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY) gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if invert_grayscale: if self.invert_grayscale:
cv2.bitwise_not(gray, gray) cv2.bitwise_not(gray, gray)
detector = aruco.MarkerDetector()
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01)
#detector_params = detector.getParameters()
# run aruco marker detection # run aruco marker detection
detected_markers = self.detector.detect(gray) detected_markers = detector.detect(gray)
# detected_markers2 = detector.detect(gray)
#gray = detector.getThresholdedImage()
display_mode = self.params.param('Display mode').value()
#print(f"detected_markers = {[marker.id for marker in detected_markers]}")
#print("threshold = ", self.threshold)
# extract data for detected markers # extract data for detected markers
detected_marker_data = {} detected_marker_data = {}
@ -194,16 +314,16 @@ class ArucoEstimator:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()} detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id in self.corner_marker_ids.values(): if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.075, self.camparam) marker.calculateExtrinsics(self.corner_marker_size, self.camparam)
else: else:
marker.calculateExtrinsics(0.07, self.camparam) marker.calculateExtrinsics(self.robot_marker_size, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if draw_markers: if self.draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True) marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if draw_marker_coordinate_system: if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1) aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data # store data
@ -220,40 +340,27 @@ class ArucoEstimator:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2) color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
# compute frame rate # compute frame rate
fps_counter += 1 self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
delta_t = time.time() - start_time
if delta_t > fps_display_rate:
fps_counter = 0
start_time = time.time()
color_image = cv2.putText(color_image, f"fps = {(fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
(0, 255, 255),
thickness=2)
# Show images # Show images
cv2.imshow('RoboRally', color_image) color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
key = cv2.waitKey(1) if display_mode == 'color':
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
elif display_mode == 'grayscale':
self.img.setImage(np.transpose(np.flipud(gray)))
if key > 0: QtCore.QTimer.singleShot(1, self.process_frame)
def handleKeyPressEvent(self, ev):
key = ev.key()
self.event_queue.put(('key', key)) self.event_queue.put(('key', key))
if key == ord('g'): if key == QtCore.Qt.Key_Q:
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('x'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
print("auto exposure OFF")
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
if key == ord('i'):
invert_grayscale = not invert_grayscale
finally:
cv2.destroyAllWindows()
if self.pipeline is not None: if self.pipeline is not None:
# Stop streaming # Stop streaming
self.pipeline.stop() self.pipeline.stop()
self.app.quit()
elif key == QtCore.Qt.Key_I:
self.invert_grayscale = not self.invert_grayscale
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image): def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data # update the marker estimate with new data
@ -377,7 +484,7 @@ class ArucoEstimator:
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate'] corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate'] corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None: if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2) frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2)
return frame return frame
def draw_grid_lines(self, frame, detected_marker_data): def draw_grid_lines(self, frame, detected_marker_data):
@ -400,7 +507,7 @@ class ArucoEstimator:
for x in range(1, self.grid_columns): for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0), frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1) thickness=1)
# draw horizontal lines # draw horizontal lines
@ -409,7 +516,7 @@ class ArucoEstimator:
for y in range(1, self.grid_rows): for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0), frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1) thickness=1)
return frame return frame
@ -446,4 +553,9 @@ class ArucoEstimator:
if __name__ == "__main__": if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.run_tracking(draw_markers=True, invert_grayscale=True) estimator.process_frame()
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()

View File

@ -3,19 +3,40 @@ import time
from robot import ControlledRobot from robot import ControlledRobot
from pid_controller import PIDController from pid_controller import PIDController
from mpc_controller import MPCController
from event_listener import EventListener from event_listener import EventListener
class CommanderBase: class CommanderBase:
def __init__(self): valid_controller_types = {'pid': PIDController,
self.robots = [] 'mpc': MPCController}
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
for r in self.robots: def __init__(self, id_ip_dict, controller_type='pid'):
"""
:param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' }
:param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'}
"""
self.robots = {}
for id, ip in id_ip_dict.items():
self.robots[id] = ControlledRobot(id, ip)
for id, r in self.robots.items():
r.connect() r.connect()
r.attach_controller(PIDController())
if type(controller_type) == dict:
for id, ctype in controller_type.items():
if ctype in CommanderBase.valid_controller_types:
self.robots[id].attach_controller(CommanderBase.valid_controller_types[ctype]())
else:
raise Exception(f"invalid controller type {ctype} specified for robot {id}. "
f"valid controller types are {list(CommanderBase.valid_controller_types.keys())}")
elif controller_type in CommanderBase.valid_controller_types:
for id, r in self.robots.items():
r.attach_controller(CommanderBase.valid_controller_types[controller_type]())
else:
raise Exception(f"invalid controller type {controller_type} specified. valid controller types are "
f"{list(CommanderBase.valid_controller_types.keys())}")
self.event_listener = EventListener(event_server=('127.0.0.1', 42424)) self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
@ -24,14 +45,14 @@ class CommanderBase:
self.running = False self.running = False
def run(self): def run(self):
unconnected_robots = list(filter(lambda r: not r.connected, self.robots)) unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values()))
if len(unconnected_robots) > 0: if len(unconnected_robots) > 0:
print(f"warning: could not connect to the following robots: {unconnected_robots}") print(f"warning: could not connect to the following robots: {unconnected_robots}")
return return
all_detected = False all_detected = False
while not all_detected: while not all_detected:
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots)) undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values()))
all_detected = len(undetected_robots) == 0 all_detected = len(undetected_robots) == 0
if not all_detected: if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}") print(f"warning: no measurements available for the following robots: {undetected_robots}")
@ -50,23 +71,25 @@ class CommanderBase:
if event[0] == 'click': if event[0] == 'click':
target = event[1] target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']]) target_pos = np.array([target['x'], target['y'], target['angle']])
self.robots[self.current_robot_index].move_to_pos(target_pos) controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
self.robots[controlled_robot_id].move_to_pos(target_pos)
elif event[0] == 'key': elif event[0] == 'key':
key = event[1] key = event[1]
if key == 32: # arrow up if key == 32: # arrow up
self.controlling = not self.controlling self.controlling = not self.controlling
if not self.controlling: if not self.controlling:
print("disable control") print("disable control")
for r in self.robots: for _, r in self.robots.items():
r.stop_control() r.stop_control()
else: else:
print("enable control") print("enable control")
for r in self.robots: for _, r in self.robots.items():
r.start_control() r.start_control()
elif key == 9: # TAB elif key == 9: # TAB
# switch controlled robot # switch controlled robot
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots) self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
robot = self.robots[self.current_robot_index] controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
robot = self.robots[controlled_robot_id]
print(f"controlled robot: {robot.id}") print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE elif key == 113 or key == 27: # q or ESCAPE
print("quit!") print("quit!")
@ -76,5 +99,15 @@ class CommanderBase:
if __name__ == '__main__': if __name__ == '__main__':
rc = CommanderBase() id_ip_dict = {
#11: '10.10.11.88',
#12: '10.10.11.91',
#13: '10.10.11.90',
14: '10.10.11.89',
}
# controller_type = {12: 'mpc', 13: 'pid'}
controller_type = 'mpc'
rc = CommanderBase(id_ip_dict, controller_type=controller_type)
rc.run() rc.run()

View File

@ -18,10 +18,11 @@ class ControllerBase:
control = self.compute_control(state) control = self.compute_control(state)
if self.controlling:
self.apply_control(control) self.apply_control(control)
time.sleep(self.control_rate) if self.robot is not None:
self.apply_control((0.0, 0.0)) # stop robot self.robot.send_cmd(u1=0, u2=0) # stop robot
def set_target_position(self, target_pos): def set_target_position(self, target_pos):
self.target_pos = target_pos self.target_pos = target_pos
@ -42,6 +43,7 @@ class ControllerBase:
else: else:
raise Exception("error: controller cannot apply control!\n" raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!") " there is no robot attached to the controller!")
time.sleep(self.control_rate)
def attach_robot(self, robot): def attach_robot(self, robot):
self.robot = robot self.robot = robot

View File

@ -0,0 +1,62 @@
import socket
import pygame
from argparse import ArgumentParser
class ManualController:
def __init__(self, estimator):
self.estimator = estimator
self.estimator.external_keyboard_callback = self.handle_keypress
self.controlling = False
def handle_keypress(self, key):
pass
parser = ArgumentParser()
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
ip = args.ip
pygame.init()
pygame.display.set_mode((640, 480))
rc_socket = socket.socket()
try:
rc_socket.connect((ip, 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
running = True
while running:
u1 = 0
u2 = 0
vmax = 1.0
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print(f"turn left: ({u1},{u2})")
elif event.key == pygame.K_RIGHT:
u1 = vmax
u2 = -vmax
print(f"turn right: ({u1},{u2})")
elif event.key == pygame.K_UP:
u1 = vmax
u2 = vmax
print(f"forward: ({u1},{u2})")
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print(f"backward: ({u1},{u2})")
elif event.key == pygame.K_ESCAPE:
print("quit")
running = False
u1 = 0.0
u2 = 0.0
rc_socket.send(f'({u1},{u2})\n'.encode())
elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())

View File

@ -22,7 +22,13 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
self.request.sendall((json.dumps(event) + '\n').encode()) self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second) time.sleep(1.0 / self.server.max_measurements_per_second)
elif 'corners' in data.decode():
# send positions of the board markers
corner_estimates = self.server.estimator.corner_estimates
response = {}
for corner, data in corner_estimates.items():
response[corner] = {'x': data['x'], 'y': data['y']}
self.request.sendall((json.dumps(response) + '\n').encode())
else: else:
# send robot position # send robot position
try: try:
@ -57,12 +63,23 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
if __name__ == "__main__": if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True})
estimator_thread.start()
with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, # first we start thread for the measurement server
max_measurements_per_second=30) as measurement_server: measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30)
measurement_server.serve_forever() server_thread = threading.Thread(target=measurement_server.serve_forever)
server_thread.start()
# now we start the Aruco estimator GUI
aruco_estimator.process_frame()
import sys
from pyqtgraph.Qt import QtCore, QtGui
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()
#with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
# max_measurements_per_second=30) as measurement_server:
# measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -> 12 + Enter # receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -0,0 +1,117 @@
from flask import Flask, render_template, request
import socket
import threading
import time
import numpy as np
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
from shapely.geometry import LineString
width = 600
height = 400
robot_ids = [11, 12, 13, 14]
robot_ids_control_history = [11, 12, 13, 14]
app = Flask(__name__)
def calc_target_pos(px, py):
if rc.estimator.all_corners_detected():
# compute column line
a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])
x_frac = (px + 0.5) / width
y_frac = (py + 0.5) / height
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])
return point_of_intersection
@app.route('/', methods=('GET', 'POST'))
def presence():
global robot_ids_control_history
if request.method == 'GET':
return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
elif request.method == 'POST':
x = int(float(request.form.get('x')))
y = int(float(request.form.get('y')))
print(x, y)
margin = 0.1
x = min(max(width * margin, x), width * (1 - margin))
y = min(max(height * margin, y), height * (1 - margin))
id = int(request.form.get('robot'))
robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
target_pos = calc_target_pos(x, y)
rc.estimator.event_queue.put(('robot', id, target_pos))
return 'OK'
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False})
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
if __name__ == "__main__":
rc = RemoteController()
rc_thread = threading.Thread(target=rc.run)
rc_thread.start()
app.run('0.0.0.0')

View File

@ -1,175 +1,62 @@
import numpy as np import numpy as np
import time import time
from controller import ControllerBase
from casadi_opt import OpenLoopSolver from casadi_opt import OpenLoopSolver
class MPCController: class MPCController(ControllerBase):
def __init__(self, estimator): def __init__(self, N=20, T=1.0):
self.t = time.time() super().__init__()
self.t = None
self.estimator = estimator self.ols = OpenLoopSolver(N=N, T=T)
self.controlling = False self.ols.setup()
self.control_rate = self.ols.T / self.ols.N
self.mstep = 2 self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator # integrator
self.omega_max = 5.0 self.omega_max = 5.0
self.control_scaling = 0.4 self.control_scaling = 0.5
def move_to_pos(self, target_pos, robot, near_target_counter=5): def set_target_position(self, target_pos):
near_target = 0 super(MPCController, self).set_target_position(target_pos)
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time() self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id) def compute_control(self, state):
x_pred = np.array(state[1:])
if x_pred is not None: error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2])
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2]) angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1]) error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35: if error_pos > 0.05 or error_ang > 0.2:
if error_pos > 0.05 or error_ang > 0.1: #if error_pos > 0.05:
# solve mpc open loop problem # solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos) res = self.ols.solve(x_pred, self.target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied if dt_mpc < self.control_rate: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc)) time.sleep(self.control_rate - dt_mpc)
time.sleep(self.dt - dt_mpc)
else: else:
us1 = [0] * self.mstep us1 = [0] * self.mstep
us2 = [0] * self.mstep us2 = [0] * self.mstep
near_target += 1
# send controls to the robot return us1, us2
def apply_control(self, control):
if self.robot is not None:
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i] u1 = control[0][i]
u2 = us2[i] u2 = control[1][i]
robot.send_cmd(u1, u2) self.robot.send_cmd(u1, u2)
if i < self.mstep: if i < self.mstep:
time.sleep(self.dt) time.sleep(self.control_rate)
self.t = time.time() # save time the most recent control was applied self.t = time.time() # save time the most recent control was applied
else: else:
print("robot not detected yet!") raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -3,17 +3,32 @@ import socket
import threading import threading
import time import time
from robot import Robot from robot import Robot, ControlledRobot
from mpc_controller import MPCController from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator # HOST, PORT = "localhost", 42424
#
# robot_id = 12
#
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.connect((HOST, PORT))
# sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# #sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
class RemoteController: class RemoteController:
def __init__(self): def __init__(self):
self.robots = [] self.robots = []
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')] #self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
#self.robots = [Robot(14, '10.10.11.89')]
self.robots = [ControlledRobot(13, '192.168.1.13')]
self.robot_ids = {} self.robot_ids = {}
for r in self.robots: for r in self.robots:
@ -32,18 +47,17 @@ class RemoteController:
self.t = time.time() self.t = time.time()
# start thread for marker position detection # start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys()) self.controller = MPCController()
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking) self.robots[0].attach_controller(self.controller)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self): def run(self):
print("waiting until all markers are detected...") #print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()): #while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass # pass
print("starting control") print("starting control")
self.controller.interactive_control(robots=self.robots) #self.controller.interactive_control(robots=self.robots)
self.controller.start()
pass
def main(args): def main(args):
@ -53,3 +67,13 @@ def main(args):
if __name__ == '__main__': if __name__ == '__main__':
main(sys.argv) main(sys.argv)
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
# marker positions and GUI events via socket
# next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start()
# you can now set the target position for the controlled robot via move_to_pos(), e.g.
# self.robots[0].move_to_pos([0.0,0,0], True)
# TODO
# figure out how to get clicked positions from measurement server and drive robot there
# also document and improve program structure

View File

@ -10,8 +10,6 @@ class PIDController(ControllerBase):
super().__init__() super().__init__()
self.t = None self.t = None
self.controlling = False
self.P_angle = 0.4 self.P_angle = 0.4
self.I_angle = 0.35 self.I_angle = 0.35
self.D_angle = 0.1 self.D_angle = 0.1
@ -46,7 +44,7 @@ class PIDController(ControllerBase):
if self.t is None: if self.t is None:
dt = 0.1 dt = 0.1
else: else:
dt = time.time() - self.t dt = time.time() - self.t # time since last control was applied
if self.mode == 'angle': if self.mode == 'angle':
# compute angle such that robot faces to target point # compute angle such that robot faces to target point
@ -108,9 +106,11 @@ class PIDController(ControllerBase):
self.e_pos_old = e_pos self.e_pos_old = e_pos
self.e_angle_old = e_angle self.e_angle_old = e_angle
else: else:
u1 = 0.0 u1 = 0.0
u2 = 0.0 u2 = 0.0
self.t = time.time() # save time when the most recent control was applied
return u1, u2 return u1, u2
def apply_control(self, control):
self.t = time.time() # save time when the most recent control was applied
super(PIDController, self).apply_control(control)

View File

@ -1,7 +1,9 @@
import socket import socket
import threading import threading
import json import json
import numpy as np
import time
import math
class Robot: class Robot:
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)): def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
@ -123,8 +125,26 @@ class ControlledRobot(Robot):
self.controller = controller self.controller = controller
self.controller.attach_robot(self) self.controller.attach_robot(self)
def move_to_pos(self, target_pos): def move_to_pos(self, target_pos, blocking=False):
if self.controller is not None: if self.controller is not None:
if not blocking:
self.controller.set_target_position(target_pos) self.controller.set_target_position(target_pos)
else: # only return after the robot has reached the target
self.stop_control()
self.controller.set_target_position(target_pos)
self.start_control()
close_to_target = False
while not close_to_target:
current_pos = np.array([self.x, self.y, self.angle])
v = target_pos - current_pos
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1
time.sleep(0.1)
print("target reached!")
self.stop_control()
else: else:
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!") raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")

View File

@ -0,0 +1,85 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Mensa-Abend Telepräsenz</title>
<style>
div.meerschweinchen {
max-width: 600px;
margin: auto;
text-align: center;
}
div.arena {
height: 400px;
width: 600px;
background-image: url("https://p.0au.de/3d62ea7/foo.png");
border: none;
text-align: center;
position: relative
}
div.arena p {
margin: 0;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%)
}
</style>
<script type=text/javascript src="{{url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
</head>
<body>
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
<div style="text-align: center">
<p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
<br>
Please let us know about your study background.
</p>
</div>
<div class="meerschweinchen">
<div class=arena style="font-size: large;">
</div>
<form method="post">
<p>Choose your robot:</p>
<div>
<input type="radio" id="r11" name="robot_id" value="11">
<label for="r11">Robot 11</label>
</div>
<div>
<input type="radio" id="r12" name="robot_id" value="12">
<label for="r12">Robot 12</label>
</div>
<div>
<input type="radio" id="r13" name="robot_id" value="13">
<label for="r13">Robot 13</label>
</div>
<div>
<input type="radio" id="r14" name="robot_id" value="14">
<label for="r14">Robot 14</label>
</div>
</form>
</div>
</body>
<script>
function fireThis(e) {
var parentOffset = $(this).parent().offset();
var x = e.pageX - parentOffset.left;
var y = e.pageY - parentOffset.top;
var robot = document.querySelector('input[name = "robot_id"]:checked').value;
$.post( "/", { 'x': x, 'y': y , 'robot': robot} );
}
$('.arena').on('mousedown', fireThis);
function callRandom() {
var array = document.getElementsByName('robot_id');
array[{{ oldest_control }}].checked = true;
}
callRandom();
</script>
</html>