Compare commits

..

No commits in common. "d0c17c0b9108459a6baa7702e63a199df8ab65ff" and "17637f427b26844b97f1b78e8e5a89d63a7e1162" have entirely different histories.

11 changed files with 329 additions and 688 deletions

View File

@ -4,163 +4,39 @@ import cv2
import os
import time
import math
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
import pyqtgraph as pg
from shapely.geometry import LineString
from queue import Queue
import aruco
class FPSCounter:
def __init__(self):
self.fps_start_time = time.time()
self.fps_counter = 0
self.fps_display_rate = 1.0
def get_fps(self):
self.fps_counter += 1
delta_t = time.time() - self.fps_start_time
if delta_t > self.fps_display_rate:
self.fps_counter = 0
self.fps_start_time = time.time()
return self.fps_counter / delta_t
## test add/remove
## this group includes a menu allowing the user to add new parameters into its child list
class RobotMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Robot']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_robots = self.children()
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
new_index = len(current_indices) + 1
val = 0
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
renamable=True))
class CornerMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Corner']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_corners = self.children()
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
new_char = chr(ord(current_chars[-1]) + 1)
val = 0
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
class ArucoEstimator:
corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.app = QtGui.QApplication([])
## Create window with GraphicsView widget
self.win = QtGui.QWidget()
self.layout = QtGui.QGridLayout()
self.win.setLayout(self.layout)
self.win.keyPressEvent = self.handleKeyPressEvent
self.win.setWindowTitle('ArucoEstimator')
self.plotwidget = pg.PlotWidget()
self.layout.addWidget(self.plotwidget)
## lock the aspect ratio so pixels are always square
self.plotwidget.setAspectLocked(True)
self.plotwidget.getPlotItem().getAxis('left').hide()
self.plotwidget.getPlotItem().getAxis('bottom').hide()
## Create image item
self.img = pg.ImageItem(border='w')
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
self.img.mouseClickEvent = self.handleMouseEvent
self.plotwidget.addItem(self.img)
# fps display
self.fps_counter = FPSCounter()
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
self.plotwidget.addItem(self.fps_overlay)
self.invert_grayscale = True
self.draw_grid = False
self.draw_markers = True
self.draw_marker_coordinate_system = False
self.corner_marker_size = 0.075
self.corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
self.corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
self.grid_columns = grid_columns
self.grid_rows = grid_rows
self.robot_marker_size = 0.07
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids])
robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in
enumerate(self.robot_marker_ids, 1)]
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
self.corner_marker_ids.items()]
self.threshold = 10.0
# parameter
params_spec = [
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
self.params = Parameter.create(name='params', type='group', children=params_spec)
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)
self.layout.addWidget(self.paramtree)
self.win.show() ## show widget alone in its own window
self.draw_grid = False
self.event_queue = Queue()
@ -174,8 +50,9 @@ class ArucoEstimator:
# Start streaming
self.pipeline.start(config)
# enable auto exposure
self.set_autoexposure(self.params['Autoexposure'])
# disable auto exposure
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, False)
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
@ -192,13 +69,16 @@ class ArucoEstimator:
self.pipeline = None
# create detector and get parameters
self.detector = aruco.MarkerDetector()
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters
# print("detector params:")
# print(self.detector_params)
# for val in dir(self.detector_params):
# if not val.startswith("__"):
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
print("detector params:")
for val in dir(self.detector_params):
if not val.startswith("__"):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
if use_realsense:
@ -210,12 +90,6 @@ class ArucoEstimator:
self.drag_line_start = None
self.previous_click = None
def set_autoexposure(self, v):
if self.pipeline is not None:
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, v)
print(color_sensor.get_option(rs.option.enable_auto_exposure))
def compute_clicked_position(self, px, py):
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
@ -242,16 +116,21 @@ class ArucoEstimator:
target = np.array([px, -py])
return target
def handleMouseEvent(self, event):
# get click position as distance to top-left corner of the image
px = event.pos().x()
py = self.img.height() - event.pos().y()
print(f"px = {px}, py = {py}")
if event.button() == QtCore.Qt.MouseButton.LeftButton:
# self.drag_line_start = (px, py)
# elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
target = None
if event == cv2.EVENT_LBUTTONDOWN:
self.drag_line_start = (px, py)
elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position
@ -276,91 +155,105 @@ class ArucoEstimator:
if self.drag_line_start is not None:
self.drag_line_end = (px, py)
def process_frame(self):
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if self.invert_grayscale:
cv2.bitwise_not(gray, gray)
detector = aruco.MarkerDetector()
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01)
#detector_params = detector.getParameters()
# run aruco marker detection
detected_markers = detector.detect(gray)
# detected_markers2 = detector.detect(gray)
#gray = detector.getThresholdedImage()
display_mode = self.params.param('Display mode').value()
#print(f"detected_markers = {[marker.id for marker in detected_markers]}")
#print("threshold = ", self.threshold)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
if marker.id >= 0:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(self.corner_marker_size, self.camparam)
fps_display_rate = 1 # displays the frame rate every 1 second
fps_counter = 0
start_time = time.time()
try:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
marker.calculateExtrinsics(self.robot_marker_size, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
if self.draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if invert_grayscale:
cv2.bitwise_not(gray, gray)
if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# run aruco marker detection
detected_markers = self.detector.detect(gray)
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
if marker.id >= 0:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.075, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
if draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
# compute frame rate
self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
if draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# Show images
color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
if display_mode == 'color':
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
elif display_mode == 'grayscale':
self.img.setImage(np.transpose(np.flipud(gray)))
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
QtCore.QTimer.singleShot(1, self.process_frame)
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
def handleKeyPressEvent(self, ev):
key = ev.key()
self.event_queue.put(('key', key))
if key == QtCore.Qt.Key_Q:
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
# compute frame rate
fps_counter += 1
delta_t = time.time() - start_time
if delta_t > fps_display_rate:
fps_counter = 0
start_time = time.time()
color_image = cv2.putText(color_image, f"fps = {(fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
(0, 255, 255),
thickness=2)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
if key > 0:
self.event_queue.put(('key', key))
if key == ord('g'):
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('x'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
print("auto exposure OFF")
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
if key == ord('i'):
invert_grayscale = not invert_grayscale
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
self.app.quit()
elif key == QtCore.Qt.Key_I:
self.invert_grayscale = not self.invert_grayscale
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
@ -484,7 +377,7 @@ class ArucoEstimator:
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2)
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
@ -507,7 +400,7 @@ class ArucoEstimator:
for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0),
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
@ -516,7 +409,7 @@ class ArucoEstimator:
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0),
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
thickness=1)
return frame
@ -553,9 +446,4 @@ class ArucoEstimator:
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.process_frame()
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()
estimator.run_tracking(draw_markers=True, invert_grayscale=True)

View File

@ -3,40 +3,19 @@ import time
from robot import ControlledRobot
from pid_controller import PIDController
from mpc_controller import MPCController
from event_listener import EventListener
class CommanderBase:
valid_controller_types = {'pid': PIDController,
'mpc': MPCController}
def __init__(self):
self.robots = []
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
def __init__(self, id_ip_dict, controller_type='pid'):
"""
:param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' }
:param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'}
"""
self.robots = {}
for id, ip in id_ip_dict.items():
self.robots[id] = ControlledRobot(id, ip)
for id, r in self.robots.items():
for r in self.robots:
r.connect()
if type(controller_type) == dict:
for id, ctype in controller_type.items():
if ctype in CommanderBase.valid_controller_types:
self.robots[id].attach_controller(CommanderBase.valid_controller_types[ctype]())
else:
raise Exception(f"invalid controller type {ctype} specified for robot {id}. "
f"valid controller types are {list(CommanderBase.valid_controller_types.keys())}")
elif controller_type in CommanderBase.valid_controller_types:
for id, r in self.robots.items():
r.attach_controller(CommanderBase.valid_controller_types[controller_type]())
else:
raise Exception(f"invalid controller type {controller_type} specified. valid controller types are "
f"{list(CommanderBase.valid_controller_types.keys())}")
r.attach_controller(PIDController())
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
@ -45,14 +24,14 @@ class CommanderBase:
self.running = False
def run(self):
unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values()))
unconnected_robots = list(filter(lambda r: not r.connected, self.robots))
if len(unconnected_robots) > 0:
print(f"warning: could not connect to the following robots: {unconnected_robots}")
return
all_detected = False
while not all_detected:
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values()))
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots))
all_detected = len(undetected_robots) == 0
if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}")
@ -71,25 +50,23 @@ class CommanderBase:
if event[0] == 'click':
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
self.robots[controlled_robot_id].move_to_pos(target_pos)
self.robots[self.current_robot_index].move_to_pos(target_pos)
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
for _, r in self.robots.items():
for r in self.robots:
r.stop_control()
else:
print("enable control")
for _, r in self.robots.items():
for r in self.robots:
r.start_control()
elif key == 9: # TAB
# switch controlled robot
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
robot = self.robots[controlled_robot_id]
robot = self.robots[self.current_robot_index]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
@ -99,15 +76,5 @@ class CommanderBase:
if __name__ == '__main__':
id_ip_dict = {
#11: '10.10.11.88',
#12: '10.10.11.91',
#13: '10.10.11.90',
14: '10.10.11.89',
}
# controller_type = {12: 'mpc', 13: 'pid'}
controller_type = 'mpc'
rc = CommanderBase(id_ip_dict, controller_type=controller_type)
rc = CommanderBase()
rc.run()

View File

@ -18,11 +18,10 @@ class ControllerBase:
control = self.compute_control(state)
if self.controlling:
self.apply_control(control)
self.apply_control(control)
if self.robot is not None:
self.robot.send_cmd(u1=0, u2=0) # stop robot
time.sleep(self.control_rate)
self.apply_control((0.0, 0.0)) # stop robot
def set_target_position(self, target_pos):
self.target_pos = target_pos
@ -43,7 +42,6 @@ class ControllerBase:
else:
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
time.sleep(self.control_rate)
def attach_robot(self, robot):
self.robot = robot

View File

@ -1,62 +0,0 @@
import socket
import pygame
from argparse import ArgumentParser
class ManualController:
def __init__(self, estimator):
self.estimator = estimator
self.estimator.external_keyboard_callback = self.handle_keypress
self.controlling = False
def handle_keypress(self, key):
pass
parser = ArgumentParser()
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
ip = args.ip
pygame.init()
pygame.display.set_mode((640, 480))
rc_socket = socket.socket()
try:
rc_socket.connect((ip, 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
running = True
while running:
u1 = 0
u2 = 0
vmax = 1.0
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print(f"turn left: ({u1},{u2})")
elif event.key == pygame.K_RIGHT:
u1 = vmax
u2 = -vmax
print(f"turn right: ({u1},{u2})")
elif event.key == pygame.K_UP:
u1 = vmax
u2 = vmax
print(f"forward: ({u1},{u2})")
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print(f"backward: ({u1},{u2})")
elif event.key == pygame.K_ESCAPE:
print("quit")
running = False
u1 = 0.0
u2 = 0.0
rc_socket.send(f'({u1},{u2})\n'.encode())
elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())

View File

@ -22,13 +22,7 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second)
elif 'corners' in data.decode():
# send positions of the board markers
corner_estimates = self.server.estimator.corner_estimates
response = {}
for corner, data in corner_estimates.items():
response[corner] = {'x': data['x'], 'y': data['y']}
self.request.sendall((json.dumps(response) + '\n').encode())
else:
# send robot position
try:
@ -63,23 +57,12 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True})
estimator_thread.start()
# first we start thread for the measurement server
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30)
server_thread = threading.Thread(target=measurement_server.serve_forever)
server_thread.start()
# now we start the Aruco estimator GUI
aruco_estimator.process_frame()
import sys
from pyqtgraph.Qt import QtCore, QtGui
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()
#with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
# max_measurements_per_second=30) as measurement_server:
# measurement_server.serve_forever()
with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -1,117 +0,0 @@
from flask import Flask, render_template, request
import socket
import threading
import time
import numpy as np
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
from shapely.geometry import LineString
width = 600
height = 400
robot_ids = [11, 12, 13, 14]
robot_ids_control_history = [11, 12, 13, 14]
app = Flask(__name__)
def calc_target_pos(px, py):
if rc.estimator.all_corners_detected():
# compute column line
a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])
x_frac = (px + 0.5) / width
y_frac = (py + 0.5) / height
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])
return point_of_intersection
@app.route('/', methods=('GET', 'POST'))
def presence():
global robot_ids_control_history
if request.method == 'GET':
return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
elif request.method == 'POST':
x = int(float(request.form.get('x')))
y = int(float(request.form.get('y')))
print(x, y)
margin = 0.1
x = min(max(width * margin, x), width * (1 - margin))
y = min(max(height * margin, y), height * (1 - margin))
id = int(request.form.get('robot'))
robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
target_pos = calc_target_pos(x, y)
rc.estimator.event_queue.put(('robot', id, target_pos))
return 'OK'
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False})
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
if __name__ == "__main__":
rc = RemoteController()
rc_thread = threading.Thread(target=rc.run)
rc_thread.start()
app.run('0.0.0.0')

View File

@ -1,62 +1,175 @@
import numpy as np
import time
from controller import ControllerBase
from casadi_opt import OpenLoopSolver
class MPCController(ControllerBase):
def __init__(self, N=20, T=1.0):
super().__init__()
self.t = None
class MPCController:
def __init__(self, estimator):
self.t = time.time()
self.ols = OpenLoopSolver(N=N, T=T)
self.ols.setup()
self.control_rate = self.ols.T / self.ols.N
self.estimator = estimator
self.controlling = False
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator
self.omega_max = 5.0
self.control_scaling = 0.5
self.control_scaling = 0.4
def set_target_position(self, target_pos):
super(MPCController, self).set_target_position(target_pos)
self.t = time.time()
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
def compute_control(self, state):
x_pred = np.array(state[1:])
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
x_pred = self.get_measurement(robot.id)
if error_pos > 0.05 or error_ang > 0.2:
#if error_pos > 0.05:
# solve mpc open loop problem
res = self.ols.solve(x_pred, self.target_pos)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
dt_mpc = time.time() - self.t
if dt_mpc < self.control_rate: # wait until next control can be applied
time.sleep(self.control_rate - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
return us1, us2
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
def apply_control(self, control):
if self.robot is not None:
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = control[0][i]
u2 = control[1][i]
self.robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.control_rate)
self.t = time.time() # save time the most recent control was applied
else:
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -3,32 +3,17 @@ import socket
import threading
import time
from robot import Robot, ControlledRobot
from robot import Robot
from mpc_controller import MPCController
# HOST, PORT = "localhost", 42424
#
# robot_id = 12
#
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.connect((HOST, PORT))
# sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# #sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
from aruco_estimator import ArucoEstimator
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
#self.robots = [Robot(14, '10.10.11.89')]
self.robots = [ControlledRobot(13, '192.168.1.13')]
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robot_ids = {}
for r in self.robots:
@ -47,17 +32,18 @@ class RemoteController:
self.t = time.time()
# start thread for marker position detection
self.controller = MPCController()
self.robots[0].attach_controller(self.controller)
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
#print("waiting until all markers are detected...")
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
# pass
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
#self.controller.interactive_control(robots=self.robots)
self.controller.start()
pass
self.controller.interactive_control(robots=self.robots)
def main(args):
@ -67,13 +53,3 @@ def main(args):
if __name__ == '__main__':
main(sys.argv)
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
# marker positions and GUI events via socket
# next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start()
# you can now set the target position for the controlled robot via move_to_pos(), e.g.
# self.robots[0].move_to_pos([0.0,0,0], True)
# TODO
# figure out how to get clicked positions from measurement server and drive robot there
# also document and improve program structure

View File

@ -10,6 +10,8 @@ class PIDController(ControllerBase):
super().__init__()
self.t = None
self.controlling = False
self.P_angle = 0.4
self.I_angle = 0.35
self.D_angle = 0.1
@ -44,7 +46,7 @@ class PIDController(ControllerBase):
if self.t is None:
dt = 0.1
else:
dt = time.time() - self.t # time since last control was applied
dt = time.time() - self.t
if self.mode == 'angle':
# compute angle such that robot faces to target point
@ -106,11 +108,9 @@ class PIDController(ControllerBase):
self.e_pos_old = e_pos
self.e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
return u1, u2
def apply_control(self, control):
self.t = time.time() # save time when the most recent control was applied
super(PIDController, self).apply_control(control)
return u1, u2

View File

@ -1,9 +1,7 @@
import socket
import threading
import json
import numpy as np
import time
import math
class Robot:
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
@ -125,26 +123,8 @@ class ControlledRobot(Robot):
self.controller = controller
self.controller.attach_robot(self)
def move_to_pos(self, target_pos, blocking=False):
def move_to_pos(self, target_pos):
if self.controller is not None:
if not blocking:
self.controller.set_target_position(target_pos)
else: # only return after the robot has reached the target
self.stop_control()
self.controller.set_target_position(target_pos)
self.start_control()
close_to_target = False
while not close_to_target:
current_pos = np.array([self.x, self.y, self.angle])
v = target_pos - current_pos
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1
time.sleep(0.1)
print("target reached!")
self.stop_control()
self.controller.set_target_position(target_pos)
else:
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")

View File

@ -1,85 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Mensa-Abend Telepräsenz</title>
<style>
div.meerschweinchen {
max-width: 600px;
margin: auto;
text-align: center;
}
div.arena {
height: 400px;
width: 600px;
background-image: url("https://p.0au.de/3d62ea7/foo.png");
border: none;
text-align: center;
position: relative
}
div.arena p {
margin: 0;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%)
}
</style>
<script type=text/javascript src="{{url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
</head>
<body>
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
<div style="text-align: center">
<p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
<br>
Please let us know about your study background.
</p>
</div>
<div class="meerschweinchen">
<div class=arena style="font-size: large;">
</div>
<form method="post">
<p>Choose your robot:</p>
<div>
<input type="radio" id="r11" name="robot_id" value="11">
<label for="r11">Robot 11</label>
</div>
<div>
<input type="radio" id="r12" name="robot_id" value="12">
<label for="r12">Robot 12</label>
</div>
<div>
<input type="radio" id="r13" name="robot_id" value="13">
<label for="r13">Robot 13</label>
</div>
<div>
<input type="radio" id="r14" name="robot_id" value="14">
<label for="r14">Robot 14</label>
</div>
</form>
</div>
</body>
<script>
function fireThis(e) {
var parentOffset = $(this).parent().offset();
var x = e.pageX - parentOffset.left;
var y = e.pageY - parentOffset.top;
var robot = document.querySelector('input[name = "robot_id"]:checked').value;
$.post( "/", { 'x': x, 'y': y , 'robot': robot} );
}
$('.arena').on('mousedown', fireThis);
function callRandom() {
var array = document.getElementsByName('robot_id');
array[{{ oldest_control }}].checked = true;
}
callRandom();
</script>
</html>