Compare commits
37 Commits
c2fad13698
...
a872232880
Author | SHA1 | Date | |
---|---|---|---|
a872232880 | |||
91cde26908 | |||
993d4c0141 | |||
119d857531 | |||
8728db82e8 | |||
a0f701fe83 | |||
7ded3bee79 | |||
dd162018d8 | |||
a22a3fdea3 | |||
ab6cc79eea | |||
7f194f57f2 | |||
409a9980bd | |||
9e1ce9b512 | |||
3bd11a53ce | |||
02a83f405f | |||
ec6b2bbf4a | |||
9bb4f2920f | |||
0acc59cc53 | |||
280fb10427 | |||
0bc6b68a20 | |||
e93ae65e0f | |||
056d91da52 | |||
3ab8fadc7b | |||
49645765d7 | |||
e3753f7644 | |||
32892e5dcf | |||
149e9b3536 | |||
36f3ccbd6c | |||
e7ab92dfe2 | |||
c98760a2c7 | |||
280aee75ee | |||
826fa4be0f | |||
6fb3cd5484 | |||
d170524370 | |||
0c59a77b28 | |||
c1b7640d03 | |||
aab1b78e8f |
|
@ -10,6 +10,10 @@ m0.speed(5000)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import ustruct
|
import ustruct
|
||||||
|
import utime
|
||||||
|
import math
|
||||||
|
|
||||||
|
sign = lambda x: math.copysign(1, x)
|
||||||
|
|
||||||
|
|
||||||
_STATE_BRAKE = const(0)
|
_STATE_BRAKE = const(0)
|
||||||
|
@ -49,6 +53,10 @@ class Motor:
|
||||||
def speed(self, speed=None):
|
def speed(self, speed=None):
|
||||||
if speed is None:
|
if speed is None:
|
||||||
return self._speed
|
return self._speed
|
||||||
|
if abs(speed) < 30 and abs(speed) > 10: # kick-start for small speeds
|
||||||
|
self.speed(int(100 * sign(speed)))
|
||||||
|
utime.sleep_ms(5)
|
||||||
|
|
||||||
if speed > 0:
|
if speed > 0:
|
||||||
self._speed = min(10000, max(1, speed))
|
self._speed = min(10000, max(1, speed))
|
||||||
self._state = _STATE_RIGHT
|
self._state = _STATE_RIGHT
|
||||||
|
|
|
@ -2,7 +2,7 @@ import machine
|
||||||
import sys
|
import sys
|
||||||
from machine import I2C, Pin
|
from machine import I2C, Pin
|
||||||
|
|
||||||
i2c = True
|
i2c = False
|
||||||
if i2c:
|
if i2c:
|
||||||
import d1motor
|
import d1motor
|
||||||
else:
|
else:
|
||||||
|
@ -100,5 +100,5 @@ class Robot:
|
||||||
del comm_socket
|
del comm_socket
|
||||||
del socket
|
del socket
|
||||||
|
|
||||||
wall_e = Robot()
|
wall_e = Robot(i2c=i2c)
|
||||||
wall_e.remote_control()
|
wall_e.remote_control()
|
||||||
|
|
400
remote_control/aruco_estimator.py
Normal file
400
remote_control/aruco_estimator.py
Normal file
|
@ -0,0 +1,400 @@
|
||||||
|
import pyrealsense2 as rs
|
||||||
|
import numpy as np
|
||||||
|
import cv2
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
from shapely.geometry import LineString
|
||||||
|
from queue import Queue
|
||||||
|
|
||||||
|
import aruco
|
||||||
|
|
||||||
|
class ArucoEstimator:
|
||||||
|
corner_marker_ids = {
|
||||||
|
'a': 0,
|
||||||
|
'b': 1,
|
||||||
|
'c': 2,
|
||||||
|
'd': 3
|
||||||
|
}
|
||||||
|
|
||||||
|
corner_estimates = {
|
||||||
|
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||||
|
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||||
|
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||||
|
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
|
||||||
|
self.grid_columns = grid_columns
|
||||||
|
self.grid_rows = grid_rows
|
||||||
|
|
||||||
|
if robot_marker_ids is None:
|
||||||
|
robot_marker_ids = []
|
||||||
|
self.robot_marker_ids = robot_marker_ids
|
||||||
|
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
|
||||||
|
for marker_id in self.robot_marker_ids])
|
||||||
|
|
||||||
|
self.draw_grid = False
|
||||||
|
|
||||||
|
self.event_queue = Queue()
|
||||||
|
|
||||||
|
if use_realsense: # check if realsense camera is connected
|
||||||
|
# Configure depth and color streams
|
||||||
|
self.pipeline = rs.pipeline()
|
||||||
|
config = rs.config()
|
||||||
|
#config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
|
||||||
|
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
|
||||||
|
|
||||||
|
# Start streaming
|
||||||
|
self.pipeline.start(config)
|
||||||
|
|
||||||
|
# disable auto exposure
|
||||||
|
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
|
||||||
|
color_sensor.set_option(rs.option.enable_auto_exposure, False)
|
||||||
|
|
||||||
|
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
|
||||||
|
rs.stream.color).as_video_stream_profile().get_intrinsics()
|
||||||
|
self.camera_matrix = np.zeros((3, 3))
|
||||||
|
self.camera_matrix[0][0] = camera_intrinsics.fx
|
||||||
|
self.camera_matrix[1][1] = camera_intrinsics.fy
|
||||||
|
self.camera_matrix[0][2] = camera_intrinsics.ppx
|
||||||
|
self.camera_matrix[1][2] = camera_intrinsics.ppy
|
||||||
|
self.dist_coeffs = np.array(camera_intrinsics.coeffs)
|
||||||
|
# more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
|
||||||
|
else:
|
||||||
|
# use other camera
|
||||||
|
self.cv_camera = cv2.VideoCapture(0)
|
||||||
|
self.pipeline = None
|
||||||
|
|
||||||
|
# create detector and get parameters
|
||||||
|
self.detector = aruco.MarkerDetector()
|
||||||
|
# self.detector.setDictionary('ARUCO_MIP_36h12')
|
||||||
|
#self.detector.setDictionary('ARUCO_MIP_16h3')
|
||||||
|
# self.detector.setDictionary('ARUCO')
|
||||||
|
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
|
||||||
|
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
|
||||||
|
|
||||||
|
self.detector_params = self.detector.getParameters()
|
||||||
|
|
||||||
|
# print detector parameters
|
||||||
|
print("detector params:")
|
||||||
|
for val in dir(self.detector_params):
|
||||||
|
if not val.startswith("__"):
|
||||||
|
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
|
||||||
|
|
||||||
|
self.camparam = aruco.CameraParameters()
|
||||||
|
if use_realsense:
|
||||||
|
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
|
||||||
|
else:
|
||||||
|
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
|
||||||
|
print(self.camparam)
|
||||||
|
|
||||||
|
def mouse_callback(self, event, px, py, flags, param):
|
||||||
|
"""
|
||||||
|
This function is called for each mouse event inside the opencv window.
|
||||||
|
If there are reference markers available we compute the real world position corresponding to the clicked
|
||||||
|
position and put it in an event queue.
|
||||||
|
:param event: type of event (mouse movement, left click, right click, etc.)
|
||||||
|
:param px: x-position of event
|
||||||
|
:param py: y-position of event
|
||||||
|
"""
|
||||||
|
if event == cv2.EVENT_LBUTTONDOWN:
|
||||||
|
if self.all_corners_detected():
|
||||||
|
# inter/extrapolate from clicked point to marker position
|
||||||
|
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
|
||||||
|
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
|
||||||
|
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
|
||||||
|
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
|
||||||
|
|
||||||
|
x1 = self.corner_estimates['a']['x']
|
||||||
|
x3 = self.corner_estimates['c']['x']
|
||||||
|
y1 = self.corner_estimates['a']['y']
|
||||||
|
y3 = self.corner_estimates['c']['y']
|
||||||
|
|
||||||
|
alpha = (px - px1) / (px3 - px1)
|
||||||
|
beta = (py - py1) / (py3 - py1)
|
||||||
|
|
||||||
|
print(f"alpha = {alpha}, beta = {beta}")
|
||||||
|
|
||||||
|
target_x = x1 + alpha * (x3 - x1)
|
||||||
|
target_y = y1 + beta * (y3 - y1)
|
||||||
|
target = np.array([target_x, target_y, 0])
|
||||||
|
|
||||||
|
print(f"target = ({target_x},{target_y})")
|
||||||
|
self.event_queue.put(('click', target))
|
||||||
|
else:
|
||||||
|
print("not all markers have been detected!")
|
||||||
|
|
||||||
|
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
|
||||||
|
"""
|
||||||
|
Run the marker tracking in a loop
|
||||||
|
"""
|
||||||
|
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
|
||||||
|
cv2.setMouseCallback('RoboRally', self.mouse_callback)
|
||||||
|
|
||||||
|
try:
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
if self.pipeline:
|
||||||
|
frames = self.pipeline.wait_for_frames()
|
||||||
|
color_frame = frames.get_color_frame()
|
||||||
|
if not color_frame:
|
||||||
|
continue
|
||||||
|
# Convert images to numpy arrays
|
||||||
|
color_image = np.asanyarray(color_frame.get_data())
|
||||||
|
else:
|
||||||
|
# Capture frame-by-frame
|
||||||
|
ret, color_image = self.cv_camera.read()
|
||||||
|
t_image = time.time()
|
||||||
|
|
||||||
|
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
|
||||||
|
if invert_grayscale:
|
||||||
|
cv2.bitwise_not(gray, gray)
|
||||||
|
|
||||||
|
# run aruco marker detection
|
||||||
|
detected_markers = self.detector.detect(gray)
|
||||||
|
|
||||||
|
# extract data for detected markers
|
||||||
|
detected_marker_data = {}
|
||||||
|
for marker in detected_markers:
|
||||||
|
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
|
||||||
|
if marker.id >= 0:
|
||||||
|
if marker.id in self.corner_marker_ids.values():
|
||||||
|
marker.calculateExtrinsics(0.1, self.camparam)
|
||||||
|
else:
|
||||||
|
marker.calculateExtrinsics(0.07, self.camparam)
|
||||||
|
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
|
||||||
|
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
|
||||||
|
|
||||||
|
if marker.id >= 0: # draw markers onto the image
|
||||||
|
if draw_markers:
|
||||||
|
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
|
||||||
|
|
||||||
|
if draw_marker_coordinate_system:
|
||||||
|
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
|
||||||
|
|
||||||
|
# store data
|
||||||
|
for marker_id, data in detected_marker_data.items():
|
||||||
|
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
|
||||||
|
|
||||||
|
# draw grid
|
||||||
|
if self.draw_grid:
|
||||||
|
color_image = self.draw_grid_lines(color_image, detected_marker_data)
|
||||||
|
color_image = self.draw_robot_pos(color_image, detected_marker_data)
|
||||||
|
|
||||||
|
# Show images
|
||||||
|
cv2.imshow('RoboRally', color_image)
|
||||||
|
key = cv2.waitKey(1)
|
||||||
|
|
||||||
|
if key > 0:
|
||||||
|
self.event_queue.put(('key', key))
|
||||||
|
if key == ord('g'):
|
||||||
|
self.draw_grid = not self.draw_grid
|
||||||
|
if key == ord('q'):
|
||||||
|
running = False
|
||||||
|
if key == ord('a'):
|
||||||
|
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
|
||||||
|
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
|
||||||
|
color_sensor.set_option(rs.option.enable_auto_exposure, False)
|
||||||
|
print("auto exposure OFF")
|
||||||
|
else:
|
||||||
|
color_sensor.set_option(rs.option.enable_auto_exposure, True)
|
||||||
|
print("auto exposure ON")
|
||||||
|
finally:
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
if self.pipeline is not None:
|
||||||
|
# Stop streaming
|
||||||
|
self.pipeline.stop()
|
||||||
|
|
||||||
|
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
|
||||||
|
# update the marker estimate with new data
|
||||||
|
if marker_id in self.corner_marker_ids.values():
|
||||||
|
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
|
||||||
|
# (we assume the markers do not move)
|
||||||
|
|
||||||
|
# get corresponding corner to the detected marker
|
||||||
|
corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys()))
|
||||||
|
old_estimate_x = self.corner_estimates[corner]['x']
|
||||||
|
old_estimate_y = self.corner_estimates[corner]['y']
|
||||||
|
n_estimates = self.corner_estimates[corner]['n_estimates']
|
||||||
|
|
||||||
|
x = tvec[0][0]
|
||||||
|
y = -tvec[1][0] # flip y coordinate
|
||||||
|
if not any([old_estimate_x is None, old_estimate_y is None]):
|
||||||
|
new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update
|
||||||
|
new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
|
||||||
|
else:
|
||||||
|
new_estimate_x = x # first estimate
|
||||||
|
new_estimate_y = y
|
||||||
|
self.corner_estimates[corner]['t'] = t_image
|
||||||
|
self.corner_estimates[corner]['x'] = new_estimate_x
|
||||||
|
self.corner_estimates[corner]['y'] = new_estimate_y
|
||||||
|
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
|
||||||
|
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center
|
||||||
|
|
||||||
|
elif marker_id in self.robot_marker_ids:
|
||||||
|
# for robot markers we extract x and y position as well as the angle
|
||||||
|
# here we could also implement a filter
|
||||||
|
x = tvec[0][0]
|
||||||
|
y = -tvec[1][0] # flip y coordinate
|
||||||
|
|
||||||
|
# compute angle
|
||||||
|
rot_mat, _ = cv2.Rodrigues(rvec)
|
||||||
|
pose_mat = cv2.hconcat((rot_mat, tvec))
|
||||||
|
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
|
||||||
|
angle = -euler_angles[2][0] * np.pi / 180.0
|
||||||
|
|
||||||
|
self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
|
||||||
|
|
||||||
|
def all_corners_detected(self):
|
||||||
|
# checks if all corner markers have been detected at least once
|
||||||
|
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
|
||||||
|
|
||||||
|
def all_robots_detected(self):
|
||||||
|
# checks if all robot markers have been detected at least once
|
||||||
|
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
|
||||||
|
|
||||||
|
def get_pos_from_grid_point(self, x, y, orientation=None):
|
||||||
|
"""
|
||||||
|
returns the position for the given grid point based on the current corner estimates
|
||||||
|
:param x: x position on the grid ( 0 &le x < number of grid columns)
|
||||||
|
:param y: y position on the grid ( 0 &le x < number of grid rows)
|
||||||
|
:param orientation: (optional) orientation in the given grid cell (one of ^, >, v, < )
|
||||||
|
:return: numpy array with corresponding real world x- and y-position
|
||||||
|
if orientation was specified the array also contains the matching angle for the orientation
|
||||||
|
"""
|
||||||
|
assert 0 <= x < self.grid_columns
|
||||||
|
assert 0 <= y < self.grid_rows
|
||||||
|
assert self.all_corners_detected()
|
||||||
|
|
||||||
|
# compute column line
|
||||||
|
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
|
||||||
|
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
|
||||||
|
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
|
||||||
|
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
|
||||||
|
x_frac = (x + 0.5) / self.grid_columns
|
||||||
|
y_frac = (y + 0.5) / self.grid_rows
|
||||||
|
|
||||||
|
vab = b - a
|
||||||
|
vdc = c - d
|
||||||
|
column_line_top = a + x_frac * vab
|
||||||
|
column_line_bottom = d + x_frac * vdc
|
||||||
|
|
||||||
|
vad = d - a
|
||||||
|
vbc = c - b
|
||||||
|
row_line_top = a + y_frac * vad
|
||||||
|
row_line_bottom = b + y_frac * vbc
|
||||||
|
|
||||||
|
column_line = LineString([column_line_top, column_line_bottom])
|
||||||
|
row_line = LineString([row_line_top, row_line_bottom])
|
||||||
|
|
||||||
|
int_pt = column_line.intersection(row_line)
|
||||||
|
point_of_intersection = np.array([int_pt.x, int_pt.y])
|
||||||
|
|
||||||
|
if orientation is not None:
|
||||||
|
# compute angle corresponding to the orientation w.r.t. the grid
|
||||||
|
# TODO: test this code
|
||||||
|
angle_ab = np.arctan2(vab[1], vab[0])
|
||||||
|
angle_dc = np.arctan2(vdc[1], vdc[0])
|
||||||
|
|
||||||
|
angle_ad = np.arctan2(vad[1], vad[0])
|
||||||
|
angle_bc = np.arctan2(vbc[1], vbc[0])
|
||||||
|
|
||||||
|
angle = 0.0
|
||||||
|
if orientation == '>':
|
||||||
|
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
|
||||||
|
elif orientation == '<':
|
||||||
|
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
|
||||||
|
elif orientation == 'v':
|
||||||
|
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
|
||||||
|
elif orientation == '^':
|
||||||
|
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi
|
||||||
|
|
||||||
|
return np.array((point_of_intersection[0], point_of_intersection[1], angle))
|
||||||
|
else:
|
||||||
|
return point_of_intersection
|
||||||
|
|
||||||
|
def get_grid_point_from_pos(self):
|
||||||
|
# TODO return the nearest grid point for the given position estimate
|
||||||
|
pass
|
||||||
|
|
||||||
|
def print_corner_estimates(self):
|
||||||
|
for key, value in self.corner_estimates.items():
|
||||||
|
if value['n_estimates'] != 0:
|
||||||
|
print(f"corner marker {key} at pos ({value['x']},{value['y']})")
|
||||||
|
|
||||||
|
def draw_corner_line(self, frame, corner_1, corner_2):
|
||||||
|
# draws a line between the given markers onto the given frame
|
||||||
|
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
|
||||||
|
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
|
||||||
|
if corner_1_center is not None and corner_2_center is not None:
|
||||||
|
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def draw_grid_lines(self, frame, detected_marker_data):
|
||||||
|
# draws a grid onto the given frame
|
||||||
|
frame = self.draw_corner_line(frame, 'a', 'b')
|
||||||
|
frame = self.draw_corner_line(frame, 'b', 'c')
|
||||||
|
frame = self.draw_corner_line(frame, 'c', 'd')
|
||||||
|
frame = self.draw_corner_line(frame, 'd', 'a')
|
||||||
|
|
||||||
|
if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]):
|
||||||
|
# compute outlines of board
|
||||||
|
a = self.corner_estimates['a']['pixel_coordinate']
|
||||||
|
b = self.corner_estimates['b']['pixel_coordinate']
|
||||||
|
c = self.corner_estimates['c']['pixel_coordinate']
|
||||||
|
d = self.corner_estimates['d']['pixel_coordinate']
|
||||||
|
|
||||||
|
# draw vertical lines
|
||||||
|
vab = b - a
|
||||||
|
vdc = c - d
|
||||||
|
for x in range(1, self.grid_columns):
|
||||||
|
column_line_top = a + x / self.grid_columns * vab
|
||||||
|
column_line_bottom = d + x / self.grid_columns * vdc
|
||||||
|
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
|
||||||
|
thickness=1)
|
||||||
|
|
||||||
|
# draw horizontal lines
|
||||||
|
vad = d - a
|
||||||
|
vbc = c - b
|
||||||
|
for y in range(1, self.grid_rows):
|
||||||
|
row_line_top = a + y / self.grid_rows * vad
|
||||||
|
row_line_bottom = b + y / self.grid_rows * vbc
|
||||||
|
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
|
||||||
|
thickness=1)
|
||||||
|
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def get_robot_state_estimate(self, marker_id):
|
||||||
|
if marker_id in self.robot_marker_estimates:
|
||||||
|
if self.robot_marker_estimates[marker_id]['t'] is not None:
|
||||||
|
return np.array([self.robot_marker_estimates[marker_id]['x'],
|
||||||
|
self.robot_marker_estimates[marker_id]['y'],
|
||||||
|
self.robot_marker_estimates[marker_id]['angle']])
|
||||||
|
else:
|
||||||
|
print(f"error: no estimate available for robot {marker_id}")
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
print(f"error: invalid robot id {marker_id}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def draw_robot_pos(self, frame, detected_marker_data):
|
||||||
|
# draws information about the robot positions onto the given frame
|
||||||
|
robot_corners_pixel_coords = {}
|
||||||
|
for marker_id, estimate in self.robot_marker_estimates.items():
|
||||||
|
if marker_id in detected_marker_data.keys():
|
||||||
|
robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['marker_center'])
|
||||||
|
|
||||||
|
for marker_id, coord in robot_corners_pixel_coords.items():
|
||||||
|
x = self.robot_marker_estimates[marker_id]['x']
|
||||||
|
y = self.robot_marker_estimates[marker_id]['y']
|
||||||
|
angle = self.robot_marker_estimates[marker_id]['angle']
|
||||||
|
#frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord,
|
||||||
|
# cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
|
||||||
|
frame = cv2.putText(frame, f"{marker_id}", coord, cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), thickness=4)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
|
||||||
|
estimator.run_tracking(draw_markers=True, invert_grayscale=True)
|
|
@ -3,6 +3,9 @@ import time
|
||||||
|
|
||||||
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
|
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
|
||||||
|
|
||||||
|
# TODO for roborally
|
||||||
|
# implement essential movements: forward, backward, turn left, turn right on grid-based level
|
||||||
|
|
||||||
class OpenLoopSolver:
|
class OpenLoopSolver:
|
||||||
def __init__(self, N=20, T=4.0):
|
def __init__(self, N=20, T=4.0):
|
||||||
self.T = T
|
self.T = T
|
||||||
|
@ -40,7 +43,8 @@ class OpenLoopSolver:
|
||||||
|
|
||||||
# cost functional
|
# cost functional
|
||||||
target = (-0.0, 0.0)
|
target = (-0.0, 0.0)
|
||||||
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
||||||
|
#L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
||||||
|
|
||||||
# Fixed step Runge-Kutta 4 integrator
|
# Fixed step Runge-Kutta 4 integrator
|
||||||
M = 4 # RK4 steps per interval
|
M = 4 # RK4 steps per interval
|
||||||
|
@ -138,7 +142,12 @@ class OpenLoopSolver:
|
||||||
#return
|
#return
|
||||||
|
|
||||||
|
|
||||||
def solve(self, x0, target, obstacles, track):
|
def solve(self, x0, target):
|
||||||
|
|
||||||
|
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
|
||||||
|
x0[2] = angles_unwrapped[0]
|
||||||
|
target[2] = angles_unwrapped[1]
|
||||||
|
|
||||||
tstart = time.time()
|
tstart = time.time()
|
||||||
# alternative solution using multiple shooting (way faster!)
|
# alternative solution using multiple shooting (way faster!)
|
||||||
self.opti = Opti() # Optimization problem
|
self.opti = Opti() # Optimization problem
|
||||||
|
@ -186,7 +195,11 @@ class OpenLoopSolver:
|
||||||
theta)) * omegal * omega_max
|
theta)) * omegal * omega_max
|
||||||
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
|
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
|
||||||
xdot = vertcat(f1, f2, f3)
|
xdot = vertcat(f1, f2, f3)
|
||||||
L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
|
||||||
|
#L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (theta - target[2])**2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
||||||
|
L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (target[2] - theta) ** 2 + 1e-2 * (
|
||||||
|
omegar ** 2 + omegal ** 2)
|
||||||
|
#L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
|
||||||
self.f = Function('f', [state, control], [xdot, L])
|
self.f = Function('f', [state, control], [xdot, L])
|
||||||
# ---- solve NLP ------
|
# ---- solve NLP ------
|
||||||
|
|
||||||
|
@ -222,31 +235,13 @@ class OpenLoopSolver:
|
||||||
self.opti.subject_to(self.Q[:, 0] == 0.0)
|
self.opti.subject_to(self.Q[:, 0] == 0.0)
|
||||||
|
|
||||||
|
|
||||||
solver = self.opti.solver("ipopt", {}, {"print_level": 0})
|
solver = self.opti.solver("ipopt", {'print_time': False}, {"print_level": 0})
|
||||||
|
|
||||||
# ---- misc. constraints ----------
|
# ---- misc. constraints ----------
|
||||||
# self.opti.subject_to(X[1,:]>=0) # Time must be positive
|
# self.opti.subject_to(X[1,:]>=0) # Time must be positive
|
||||||
# self.opti.subject_to(X[2,:]<=4) # Time must be positive
|
# self.opti.subject_to(X[2,:]<=4) # Time must be positive
|
||||||
# self.opti.subject_to(X[2,:]>=-2) # Time must be positive
|
# self.opti.subject_to(X[2,:]>=-2) # Time must be positive
|
||||||
|
|
||||||
# avoid obstacle
|
|
||||||
for o in obstacles:
|
|
||||||
p = obstacles[o].pos
|
|
||||||
r = obstacles[o].radius
|
|
||||||
if p is not None:
|
|
||||||
for k in range(1,self.N):
|
|
||||||
self.opti.subject_to((self.X[0,k]-p[0])**2 + (self.X[1,k]-p[1])**2 + self.slack > r**2)
|
|
||||||
# keep inside track
|
|
||||||
# TODO
|
|
||||||
# track_ids = track.inner.keys()
|
|
||||||
# a = track.outer[track_ids[0]]
|
|
||||||
# b = track.outer[track_ids[1]]
|
|
||||||
# c = track.outer[track_ids[2]]
|
|
||||||
# d = track.outer[track_ids[3]]
|
|
||||||
#
|
|
||||||
# for k in range(1, self.N):
|
|
||||||
# self.opti.subject_to(self.opti.subject_to(self.X))
|
|
||||||
|
|
||||||
posx = self.X[0, :]
|
posx = self.X[0, :]
|
||||||
posy = self.X[1, :]
|
posy = self.X[1, :]
|
||||||
angle = self.X[2, :]
|
angle = self.X[2, :]
|
||||||
|
@ -256,7 +251,7 @@ class OpenLoopSolver:
|
||||||
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
|
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
|
||||||
tend = time.time()
|
tend = time.time()
|
||||||
|
|
||||||
print("setting up problem took {} seconds".format(tend - tstart))
|
#print("setting up problem took {} seconds".format(tend - tstart))
|
||||||
|
|
||||||
tstart = time.time()
|
tstart = time.time()
|
||||||
if self.use_warmstart and self.opti_x0 is not None:
|
if self.use_warmstart and self.opti_x0 is not None:
|
||||||
|
@ -267,7 +262,7 @@ class OpenLoopSolver:
|
||||||
print("could not set warmstart")
|
print("could not set warmstart")
|
||||||
sol = self.opti.solve() # actual solve
|
sol = self.opti.solve() # actual solve
|
||||||
tend = time.time()
|
tend = time.time()
|
||||||
print("solving the problem took {} seconds".format(tend - tstart))
|
#print("solving the problem took {} seconds".format(tend - tstart))
|
||||||
|
|
||||||
tstart = time.time()
|
tstart = time.time()
|
||||||
self.opti_x0 = sol.value(self.opti.x)
|
self.opti_x0 = sol.value(self.opti.x)
|
||||||
|
@ -278,7 +273,7 @@ class OpenLoopSolver:
|
||||||
u_opt_1 = sol.value(self.U[0,:])
|
u_opt_1 = sol.value(self.U[0,:])
|
||||||
u_opt_2 = sol.value(self.U[1,:])
|
u_opt_2 = sol.value(self.U[1,:])
|
||||||
tend = time.time()
|
tend = time.time()
|
||||||
print("postprocessing took {} seconds".format(tend - tstart))
|
#print("postprocessing took {} seconds".format(tend - tstart))
|
||||||
|
|
||||||
return (u_opt_1, u_opt_2, sol.value(posx), sol.value(posy))
|
return (u_opt_1, u_opt_2, sol.value(posx), sol.value(posy))
|
||||||
|
|
||||||
|
|
|
@ -28,27 +28,27 @@ while running:
|
||||||
if event.key == pygame.K_LEFT:
|
if event.key == pygame.K_LEFT:
|
||||||
u1 = -vmax
|
u1 = -vmax
|
||||||
u2 = vmax
|
u2 = vmax
|
||||||
print("turn left: ({},{})".format(u1, u2))
|
print(f"turn left: ({u1},{u2})")
|
||||||
elif event.key == pygame.K_RIGHT:
|
elif event.key == pygame.K_RIGHT:
|
||||||
u1 = vmax
|
u1 = vmax
|
||||||
u2 = -vmax
|
u2 = -vmax
|
||||||
print("turn right: ({},{})".format(u1, u2))
|
print(f"turn right: ({u1},{u2})")
|
||||||
elif event.key == pygame.K_UP:
|
elif event.key == pygame.K_UP:
|
||||||
u1 = vmax
|
u1 = vmax
|
||||||
u2 = vmax
|
u2 = vmax
|
||||||
print("forward: ({},{})".format(u1, u2))
|
print(f"forward: ({u1},{u2})")
|
||||||
elif event.key == pygame.K_DOWN:
|
elif event.key == pygame.K_DOWN:
|
||||||
u1 = -vmax
|
u1 = -vmax
|
||||||
u2 = -vmax
|
u2 = -vmax
|
||||||
print("backward: ({},{})".format(u1, u2))
|
print(f"backward: ({u1},{u2})")
|
||||||
elif event.key == pygame.K_ESCAPE:
|
elif event.key == pygame.K_ESCAPE:
|
||||||
print("quit")
|
print("quit")
|
||||||
running = False
|
running = False
|
||||||
u1 = 0.0
|
u1 = 0.0
|
||||||
u2 = 0.0
|
u2 = 0.0
|
||||||
rc_socket.send('({},{})\n'.format(u1, u2))
|
rc_socket.send(f'({u1},{u2})\n'.encode())
|
||||||
elif event.type == pygame.KEYUP:
|
elif event.type == pygame.KEYUP:
|
||||||
print("key released, resetting: ({},{})".format(u1, u2))
|
print("key released, resetting: ({},{})".format(u1, u2))
|
||||||
rc_socket.send('({},{})\n'.format(u1, u2))
|
rc_socket.send(f'({u1},{u2})\n'.encode())
|
||||||
|
|
||||||
|
|
||||||
|
|
13
remote_control/measurement_client.py
Normal file
13
remote_control/measurement_client.py
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
import socket
|
||||||
|
|
||||||
|
HOST, PORT = "localhost", 42424
|
||||||
|
|
||||||
|
robot_id = 15
|
||||||
|
|
||||||
|
# SOCK_DGRAM is the socket type to use for UDP sockets
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
|
||||||
|
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
|
||||||
|
while True:
|
||||||
|
received = str(sock.recv(1024), "utf-8")
|
||||||
|
print("Received: {}".format(received))
|
46
remote_control/measurement_server.py
Normal file
46
remote_control/measurement_server.py
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
import socketserver
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from aruco_estimator import ArucoEstimator
|
||||||
|
|
||||||
|
|
||||||
|
class MeasurementHandler(socketserver.BaseRequestHandler):
|
||||||
|
def handle(self) -> None:
|
||||||
|
data = self.request[0]
|
||||||
|
socket = self.request[1]
|
||||||
|
cur_thread = threading.current_thread()
|
||||||
|
print(f"current thread {cur_thread}")
|
||||||
|
try:
|
||||||
|
marker_id = int(data)
|
||||||
|
if marker_id in self.server.estimator.robot_marker_estimates:
|
||||||
|
while True:
|
||||||
|
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
|
||||||
|
self.client_address)
|
||||||
|
time.sleep(1.0 / self.server.max_measurements_per_second)
|
||||||
|
else:
|
||||||
|
socket.sendto("error: unknown robot marker id\n".encode(),
|
||||||
|
self.client_address)
|
||||||
|
except ValueError:
|
||||||
|
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
|
||||||
|
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
|
||||||
|
super().__init__(server_address, RequestHandlerClass)
|
||||||
|
self.estimator = estimator
|
||||||
|
self.max_measurements_per_second = max_measurements_per_second
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
|
||||||
|
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking)
|
||||||
|
estimator_thread.start()
|
||||||
|
|
||||||
|
with MeasurementServer(('127.0.0.1', 42424), MeasurementHandler, aruco_estimator,
|
||||||
|
max_measurements_per_second=30) as measurement_server:
|
||||||
|
measurement_server.serve_forever()
|
||||||
|
|
||||||
|
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter
|
175
remote_control/mpc_controller.py
Normal file
175
remote_control/mpc_controller.py
Normal file
|
@ -0,0 +1,175 @@
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
|
||||||
|
from casadi_opt import OpenLoopSolver
|
||||||
|
|
||||||
|
|
||||||
|
class MPCController:
|
||||||
|
def __init__(self, estimator):
|
||||||
|
self.t = time.time()
|
||||||
|
|
||||||
|
self.estimator = estimator
|
||||||
|
self.controlling = False
|
||||||
|
|
||||||
|
self.mstep = 2
|
||||||
|
self.ols = OpenLoopSolver(N=20, T=1.0)
|
||||||
|
self.ols.setup()
|
||||||
|
self.dt = self.ols.T / self.ols.N
|
||||||
|
|
||||||
|
# integrator
|
||||||
|
self.omega_max = 5.0
|
||||||
|
self.control_scaling = 0.4
|
||||||
|
|
||||||
|
def move_to_pos(self, target_pos, robot, near_target_counter=5):
|
||||||
|
near_target = 0
|
||||||
|
while near_target < near_target_counter:
|
||||||
|
while not self.estimator.event_queue.empty():
|
||||||
|
event = self.estimator.event_queue.get()
|
||||||
|
print("event: ", event)
|
||||||
|
if event[0] == 'click':
|
||||||
|
pass
|
||||||
|
elif event[0] == 'key':
|
||||||
|
key = event[1]
|
||||||
|
|
||||||
|
if key == 84: # arrow up
|
||||||
|
self.controlling = True
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 82: # arrow down
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
elif key == 48: # 0
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0])
|
||||||
|
elif key == 43: # +
|
||||||
|
self.control_scaling += 0.1
|
||||||
|
self.control_scaling = min(self.control_scaling, 1.0)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 45: # -
|
||||||
|
self.control_scaling -= 0.1
|
||||||
|
self.control_scaling = max(self.control_scaling, 0.1)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 113:
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
elif key == 27: # escape
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
|
||||||
|
x_pred = self.get_measurement(robot.id)
|
||||||
|
|
||||||
|
if x_pred is not None:
|
||||||
|
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
|
||||||
|
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
||||||
|
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
|
||||||
|
# print("error pos = ", error_pos)
|
||||||
|
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
|
||||||
|
|
||||||
|
# if error_pos > 0.075 or error_ang > 0.35:
|
||||||
|
if error_pos > 0.05 or error_ang > 0.1:
|
||||||
|
# solve mpc open loop problem
|
||||||
|
res = self.ols.solve(x_pred, target_pos)
|
||||||
|
|
||||||
|
# us1 = res[0]
|
||||||
|
# us2 = res[1]
|
||||||
|
us1 = res[0] * self.control_scaling
|
||||||
|
us2 = res[1] * self.control_scaling
|
||||||
|
# print("u = {}", (us1, us2))
|
||||||
|
|
||||||
|
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
|
||||||
|
dt_mpc = time.time() - self.t
|
||||||
|
if dt_mpc < self.dt: # wait until next control can be applied
|
||||||
|
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
|
||||||
|
time.sleep(self.dt - dt_mpc)
|
||||||
|
else:
|
||||||
|
us1 = [0] * self.mstep
|
||||||
|
us2 = [0] * self.mstep
|
||||||
|
near_target += 1
|
||||||
|
|
||||||
|
# send controls to the robot
|
||||||
|
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||||
|
u1 = us1[i]
|
||||||
|
u2 = us2[i]
|
||||||
|
robot.send_cmd(u1, u2)
|
||||||
|
if i < self.mstep:
|
||||||
|
time.sleep(self.dt)
|
||||||
|
self.t = time.time() # save time the most recent control was applied
|
||||||
|
else:
|
||||||
|
print("robot not detected yet!")
|
||||||
|
|
||||||
|
def interactive_control(self, robots):
|
||||||
|
controlled_robot_number = 0
|
||||||
|
robot = robots[controlled_robot_number]
|
||||||
|
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0])
|
||||||
|
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
# handle events from opencv window
|
||||||
|
while not self.estimator.event_queue.empty():
|
||||||
|
event = self.estimator.event_queue.get()
|
||||||
|
print("event: ", event)
|
||||||
|
if event[0] == 'click':
|
||||||
|
target_pos = event[1]
|
||||||
|
elif event[0] == 'key':
|
||||||
|
key = event[1]
|
||||||
|
|
||||||
|
if key == 32: # arrow up
|
||||||
|
self.controlling = not self.controlling
|
||||||
|
if not self.controlling:
|
||||||
|
print("disable control")
|
||||||
|
robot.send_cmd() # stop robot
|
||||||
|
else:
|
||||||
|
print("enable control")
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 48: # 0
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
|
||||||
|
elif key == 43: # +
|
||||||
|
self.control_scaling += 0.1
|
||||||
|
self.control_scaling = min(self.control_scaling, 1.0)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 45: # -
|
||||||
|
self.control_scaling -= 0.1
|
||||||
|
self.control_scaling = max(self.control_scaling, 0.1)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 9: # TAB
|
||||||
|
# switch controlled robot
|
||||||
|
robot.send_cmd() # stop current robot
|
||||||
|
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
|
||||||
|
robot = robots[controlled_robot_number]
|
||||||
|
print(f"controlled robot: {robot.id}")
|
||||||
|
elif key == 113 or key == 27: # q or ESCAPE
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.controlling:
|
||||||
|
# measure state
|
||||||
|
x_pred = self.get_measurement(robot.id)
|
||||||
|
|
||||||
|
# print(np.linalg.norm(x_pred-target_pos))
|
||||||
|
|
||||||
|
# solve mpc open loop problem
|
||||||
|
res = self.ols.solve(x_pred, target_pos)
|
||||||
|
|
||||||
|
us1 = res[0] * self.control_scaling
|
||||||
|
us2 = res[1] * self.control_scaling
|
||||||
|
|
||||||
|
dt_mpc = time.time() - self.t
|
||||||
|
if dt_mpc < self.dt: # wait until next control can be applied
|
||||||
|
time.sleep(self.dt - dt_mpc)
|
||||||
|
|
||||||
|
# send controls to the robot
|
||||||
|
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||||
|
u1 = us1[i]
|
||||||
|
u2 = us2[i]
|
||||||
|
robot.send_cmd(u1, u2)
|
||||||
|
if i < self.mstep:
|
||||||
|
time.sleep(self.dt)
|
||||||
|
self.t = time.time() # save time the most recent control was applied
|
||||||
|
|
||||||
|
def get_measurement(self, robot_id):
|
||||||
|
return self.estimator.get_robot_state_estimate(robot_id)
|
55
remote_control/mpc_test.py
Normal file
55
remote_control/mpc_test.py
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from robot import Robot
|
||||||
|
|
||||||
|
from mpc_controller import MPCController
|
||||||
|
|
||||||
|
from aruco_estimator import ArucoEstimator
|
||||||
|
|
||||||
|
|
||||||
|
class RemoteController:
|
||||||
|
def __init__(self):
|
||||||
|
self.robots = []
|
||||||
|
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||||
|
|
||||||
|
self.robot_ids = {}
|
||||||
|
for r in self.robots:
|
||||||
|
self.robot_ids[r.id] = r
|
||||||
|
|
||||||
|
# socket for movement commands
|
||||||
|
self.comm_socket = socket.socket()
|
||||||
|
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
|
||||||
|
for robot in self.robots:
|
||||||
|
robot.connect()
|
||||||
|
|
||||||
|
self.comm_socket.bind(('', 1337))
|
||||||
|
self.comm_socket.listen(5)
|
||||||
|
|
||||||
|
self.t = time.time()
|
||||||
|
|
||||||
|
# start thread for marker position detection
|
||||||
|
self.estimator = ArucoEstimator(self.robot_ids.keys())
|
||||||
|
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
|
||||||
|
self.estimator_thread.start()
|
||||||
|
|
||||||
|
self.controller = MPCController(self.estimator)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
print("waiting until all markers are detected...")
|
||||||
|
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||||
|
pass
|
||||||
|
print("starting control")
|
||||||
|
self.controller.interactive_control(robots=self.robots)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args):
|
||||||
|
rc = RemoteController()
|
||||||
|
rc.run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main(sys.argv)
|
267
remote_control/pid_controller.py
Normal file
267
remote_control/pid_controller.py
Normal file
|
@ -0,0 +1,267 @@
|
||||||
|
import numpy as np
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class PIDController:
|
||||||
|
def __init__(self, estimator):
|
||||||
|
self.t = time.time()
|
||||||
|
|
||||||
|
self.estimator = estimator
|
||||||
|
self.controlling = False
|
||||||
|
|
||||||
|
self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
|
||||||
|
self.I_angle = 0.35
|
||||||
|
self.D_angle = 0.1
|
||||||
|
|
||||||
|
self.P_pos = 0.50
|
||||||
|
self.I_pos = 0.3
|
||||||
|
self.D_pos = 0.1
|
||||||
|
|
||||||
|
self.mode = None
|
||||||
|
|
||||||
|
def move_to_pos(self, target_pos, robot, near_target_counter=5):
|
||||||
|
near_target = 0
|
||||||
|
while near_target < near_target_counter:
|
||||||
|
while not self.estimator.event_queue.empty():
|
||||||
|
event = self.estimator.event_queue.get()
|
||||||
|
print("event: ", event)
|
||||||
|
if event[0] == 'click':
|
||||||
|
pass
|
||||||
|
elif event[0] == 'key':
|
||||||
|
key = event[1]
|
||||||
|
|
||||||
|
if key == 84: # arrow up
|
||||||
|
self.controlling = True
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 82: # arrow down
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
elif key == 48: # 0
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0])
|
||||||
|
elif key == 43: # +
|
||||||
|
self.control_scaling += 0.1
|
||||||
|
self.control_scaling = min(self.control_scaling, 1.0)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 45: # -
|
||||||
|
self.control_scaling -= 0.1
|
||||||
|
self.control_scaling = max(self.control_scaling, 0.1)
|
||||||
|
print("control scaling = ", self.control_scaling)
|
||||||
|
elif key == 113:
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
elif key == 27: # escape
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
|
||||||
|
x_pred = self.get_measurement(robot.id)
|
||||||
|
|
||||||
|
if x_pred is not None:
|
||||||
|
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
|
||||||
|
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
||||||
|
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
|
||||||
|
# print("error pos = ", error_pos)
|
||||||
|
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
|
||||||
|
|
||||||
|
# if error_pos > 0.075 or error_ang > 0.35:
|
||||||
|
if error_pos > 0.05 or error_ang > 0.1:
|
||||||
|
# solve mpc open loop problem
|
||||||
|
res = self.ols.solve(x_pred, target_pos)
|
||||||
|
|
||||||
|
# us1 = res[0]
|
||||||
|
# us2 = res[1]
|
||||||
|
us1 = res[0] * self.control_scaling
|
||||||
|
us2 = res[1] * self.control_scaling
|
||||||
|
# print("u = {}", (us1, us2))
|
||||||
|
|
||||||
|
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
|
||||||
|
dt_mpc = time.time() - self.t
|
||||||
|
if dt_mpc < self.dt: # wait until next control can be applied
|
||||||
|
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
|
||||||
|
time.sleep(self.dt - dt_mpc)
|
||||||
|
else:
|
||||||
|
us1 = [0] * self.mstep
|
||||||
|
us2 = [0] * self.mstep
|
||||||
|
near_target += 1
|
||||||
|
|
||||||
|
# send controls to the robot
|
||||||
|
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||||
|
u1 = us1[i]
|
||||||
|
u2 = us2[i]
|
||||||
|
robot.send_cmd(u1, u2)
|
||||||
|
if i < self.mstep:
|
||||||
|
time.sleep(self.dt)
|
||||||
|
self.t = time.time() # save time the most recent control was applied
|
||||||
|
else:
|
||||||
|
print("robot not detected yet!")
|
||||||
|
|
||||||
|
def interactive_control(self, robots):
|
||||||
|
controlled_robot_number = 0
|
||||||
|
robot = robots[controlled_robot_number]
|
||||||
|
|
||||||
|
ts = []
|
||||||
|
angles = []
|
||||||
|
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0])
|
||||||
|
e_angle_old = 0.0
|
||||||
|
e_pos_old = 0.0
|
||||||
|
|
||||||
|
i = 0.0
|
||||||
|
i_angle = 0.0
|
||||||
|
i_pos = 0.0
|
||||||
|
|
||||||
|
t0 = time.time()
|
||||||
|
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
# handle events from opencv window
|
||||||
|
while not self.estimator.event_queue.empty():
|
||||||
|
event = self.estimator.event_queue.get()
|
||||||
|
print("event: ", event)
|
||||||
|
if event[0] == 'click':
|
||||||
|
target_pos = event[1]
|
||||||
|
i_angle = 0
|
||||||
|
i_pos = 0
|
||||||
|
e_pos_old = 0
|
||||||
|
e_angle_old = 0
|
||||||
|
self.mode = 'combined'
|
||||||
|
elif event[0] == 'key':
|
||||||
|
key = event[1]
|
||||||
|
|
||||||
|
if key == 32: # arrow up
|
||||||
|
self.controlling = not self.controlling
|
||||||
|
if not self.controlling:
|
||||||
|
print("disable control")
|
||||||
|
robot.send_cmd() # stop robot
|
||||||
|
self.mode = None # reset control mode
|
||||||
|
else:
|
||||||
|
print("enable control")
|
||||||
|
i = 0
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 48: # 0
|
||||||
|
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
|
||||||
|
elif key == 97: # a
|
||||||
|
self.mode = 'angle'
|
||||||
|
e_angle_old = 0
|
||||||
|
i = 0
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 99: # c
|
||||||
|
self.mode = 'combined'
|
||||||
|
e_angle_old = 0
|
||||||
|
e_pos_old = 0
|
||||||
|
i_angle = 0
|
||||||
|
i_pos = 0
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 112: # p
|
||||||
|
self.mode = 'position'
|
||||||
|
e_pos_old = 0
|
||||||
|
i = 0
|
||||||
|
self.t = time.time()
|
||||||
|
elif key == 43: # +
|
||||||
|
self.P_pos += 0.1
|
||||||
|
print("P = ", self.P_angle)
|
||||||
|
elif key == 45: # -
|
||||||
|
self.P_pos -= 0.1
|
||||||
|
print("P = ", self.P_angle)
|
||||||
|
elif key == 9: # TAB
|
||||||
|
# switch controlled robot
|
||||||
|
robot.send_cmd() # stop current robot
|
||||||
|
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
|
||||||
|
robot = robots[controlled_robot_number]
|
||||||
|
print(f"controlled robot: {robot.id}")
|
||||||
|
elif key == 113 or key == 27: # q or ESCAPE
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
robot.send_cmd()
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.controlling:
|
||||||
|
# measure state
|
||||||
|
x_pred = self.get_measurement(robot.id)
|
||||||
|
dt = self.t - time.time()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
|
||||||
|
if self.mode == 'angle':
|
||||||
|
# compute angle such that robot faces to target point
|
||||||
|
target_angle = target_pos[2]
|
||||||
|
|
||||||
|
ts.append(time.time() - t0)
|
||||||
|
angles.append(x_pred[2])
|
||||||
|
|
||||||
|
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
|
||||||
|
|
||||||
|
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
||||||
|
p = self.P_angle * e_angle
|
||||||
|
# i += self.I * dt * e # right Riemann sum
|
||||||
|
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
|
||||||
|
d = self.D_angle * (e_angle - e_angle_old)/dt
|
||||||
|
|
||||||
|
u1 = p - i - d
|
||||||
|
u2 = - u1
|
||||||
|
|
||||||
|
e_angle_old = e_angle
|
||||||
|
|
||||||
|
elif self.mode == 'combined':
|
||||||
|
# compute angle such that robot faces to target point
|
||||||
|
v = target_pos[0:2] - x_pred[0:2]
|
||||||
|
target_angle = math.atan2(v[1], v[0])
|
||||||
|
|
||||||
|
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
|
||||||
|
|
||||||
|
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
||||||
|
e_pos = np.linalg.norm(v)
|
||||||
|
|
||||||
|
if e_pos < 0.05:
|
||||||
|
self.mode = 'angle'
|
||||||
|
e_angle_old = 0
|
||||||
|
e_pos_old = 0
|
||||||
|
i_angle = 0
|
||||||
|
i_pos = 0
|
||||||
|
u1 = 0
|
||||||
|
u2 = 0
|
||||||
|
else:
|
||||||
|
forward = abs(e_angle) < np.pi/2.0
|
||||||
|
|
||||||
|
if not forward:
|
||||||
|
if e_angle > np.pi/2.0:
|
||||||
|
e_angle -= np.pi
|
||||||
|
else:
|
||||||
|
e_angle += np.pi
|
||||||
|
|
||||||
|
p_angle = self.P_angle * e_angle
|
||||||
|
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
|
||||||
|
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
|
||||||
|
|
||||||
|
p_pos = self.P_pos * e_pos
|
||||||
|
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
|
||||||
|
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
|
||||||
|
|
||||||
|
if forward:
|
||||||
|
print("forward")
|
||||||
|
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
|
||||||
|
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
|
||||||
|
else:
|
||||||
|
print("backward")
|
||||||
|
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
|
||||||
|
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
|
||||||
|
|
||||||
|
e_pos_old = e_pos
|
||||||
|
e_angle_old = e_angle
|
||||||
|
|
||||||
|
else:
|
||||||
|
u1 = 0.0
|
||||||
|
u2 = 0.0
|
||||||
|
#print(f"u = ({u1}, {u2})")
|
||||||
|
robot.send_cmd(u1, u2)
|
||||||
|
self.t = time.time() # save time when the most recent control was applied
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
def get_measurement(self, robot_id):
|
||||||
|
return self.estimator.get_robot_state_estimate(robot_id)
|
|
@ -30,8 +30,12 @@ from marker_pos_angle.msg import id_pos_angle
|
||||||
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Robot:
|
class Robot:
|
||||||
def __init__(self, id, ip=None):
|
def __init__(self, id, ip):
|
||||||
self.pos = None
|
self.pos = None
|
||||||
self.orient = None
|
self.orient = None
|
||||||
|
|
||||||
|
@ -52,16 +56,16 @@ class Track:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# ids in clockwise direction
|
# ids in clockwise direction
|
||||||
self.inner = OrderedDict()
|
self.inner = OrderedDict()
|
||||||
|
self.inner[0] = None
|
||||||
self.inner[1] = None
|
self.inner[1] = None
|
||||||
self.inner[2] = None
|
self.inner[2] = None
|
||||||
self.inner[3] = None
|
self.inner[3] = None
|
||||||
self.inner[4] = None
|
|
||||||
|
|
||||||
self.outer = OrderedDict()
|
self.outer = OrderedDict()
|
||||||
|
self.outer[4] = None
|
||||||
self.outer[5] = None
|
self.outer[5] = None
|
||||||
self.outer[6] = None
|
self.outer[6] = None
|
||||||
self.outer[7] = None
|
self.outer[7] = None
|
||||||
self.outer[8] = None
|
|
||||||
|
|
||||||
self.track_complete = False
|
self.track_complete = False
|
||||||
|
|
||||||
|
@ -122,20 +126,18 @@ def f_ode(t, x, u):
|
||||||
return dx
|
return dx
|
||||||
|
|
||||||
class RemoteController:
|
class RemoteController:
|
||||||
def __init__(self):
|
def __init__(self, id, ip):
|
||||||
|
|
||||||
self.robots = [Robot(11, '192.168.1.103')]
|
self.anim_stopped = False
|
||||||
#self.robots = [Robot(14, '192.168.1.102')]
|
|
||||||
|
#self.robots = [Robot(14, '192.168.1.103')]
|
||||||
|
#self.robots = [Robot(15, '192.168.1.102')]
|
||||||
|
self.robots = [Robot(id, ip)]
|
||||||
|
|
||||||
self.robot_ids = {}
|
self.robot_ids = {}
|
||||||
for r in self.robots:
|
for r in self.robots:
|
||||||
self.robot_ids[r.id] = r
|
self.robot_ids[r.id] = r
|
||||||
|
|
||||||
obst = [Obstacle(12, 0.275), Obstacle(10, 0.275), Obstacle(14, 0.275), Obstacle(15, 0.275)]
|
|
||||||
|
|
||||||
self.obstacles = {}
|
|
||||||
for r in obst:
|
|
||||||
self.obstacles[r.id] = r
|
|
||||||
|
|
||||||
self.track = Track()
|
self.track = Track()
|
||||||
|
|
||||||
|
@ -198,13 +200,6 @@ class RemoteController:
|
||||||
self.track_line_inner, = self.ax.plot([], [])
|
self.track_line_inner, = self.ax.plot([], [])
|
||||||
self.track_line_outer, = self.ax.plot([], [])
|
self.track_line_outer, = self.ax.plot([], [])
|
||||||
|
|
||||||
self.circles = []
|
|
||||||
for o in self.obstacles:
|
|
||||||
self.circles.append(patch.Circle((0.0, 0.0), radius=0.1, fill=False, color='red', linestyle='--'))
|
|
||||||
|
|
||||||
for s in self.circles:
|
|
||||||
self.ax.add_artist(s)
|
|
||||||
|
|
||||||
self.ax.set_xlabel('x-position')
|
self.ax.set_xlabel('x-position')
|
||||||
self.ax.set_ylabel('y-position')
|
self.ax.set_ylabel('y-position')
|
||||||
self.ax.grid()
|
self.ax.grid()
|
||||||
|
@ -231,6 +226,7 @@ class RemoteController:
|
||||||
|
|
||||||
|
|
||||||
def ani(self):
|
def ani(self):
|
||||||
|
|
||||||
print("starting animation")
|
print("starting animation")
|
||||||
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
|
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
|
||||||
plt.ion()
|
plt.ion()
|
||||||
|
@ -251,10 +247,13 @@ class RemoteController:
|
||||||
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
|
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
|
||||||
|
|
||||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
|
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
|
||||||
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y, self.circles[0], self.circles[1],\
|
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y,
|
||||||
self.circles[2], self.circles[3],
|
|
||||||
|
|
||||||
def ani_update(self, frame):
|
def ani_update(self, frame):
|
||||||
|
if self.anim_stopped:
|
||||||
|
self.ani.event_source.stop()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
#print("plotting")
|
#print("plotting")
|
||||||
self.mutex.acquire()
|
self.mutex.acquire()
|
||||||
try:
|
try:
|
||||||
|
@ -306,21 +305,11 @@ class RemoteController:
|
||||||
else:
|
else:
|
||||||
self.line_ol.set_data([],[])
|
self.line_ol.set_data([],[])
|
||||||
|
|
||||||
i = 0
|
|
||||||
obst_keys = self.obstacles.keys()
|
|
||||||
for s in self.circles:
|
|
||||||
o = self.obstacles[obst_keys[i]]
|
|
||||||
i = i + 1
|
|
||||||
|
|
||||||
if o.pos is not None:
|
|
||||||
s.center = o.pos
|
|
||||||
s.radius = o.radius
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
self.mutex.release()
|
self.mutex.release()
|
||||||
|
|
||||||
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
|
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
|
||||||
self.line_x, self.line_y, self.circles[0], self.circles[1],self.circles[2],self.circles[3],
|
self.line_x, self.line_y,
|
||||||
|
|
||||||
def measurement_callback(self, data):
|
def measurement_callback(self, data):
|
||||||
#print(data)
|
#print(data)
|
||||||
|
@ -352,11 +341,6 @@ class RemoteController:
|
||||||
finally:
|
finally:
|
||||||
self.mutex.release()
|
self.mutex.release()
|
||||||
|
|
||||||
# detect obstacles
|
|
||||||
if data.id in self.obstacles.keys():
|
|
||||||
obst = (data.x, data.y)
|
|
||||||
self.obstacles[data.id].pos = obst
|
|
||||||
|
|
||||||
# detect track
|
# detect track
|
||||||
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
|
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
|
||||||
self.track.set_id(data)
|
self.track.set_id(data)
|
||||||
|
@ -369,7 +353,8 @@ class RemoteController:
|
||||||
markers_out = self.track.outer.values()
|
markers_out = self.track.outer.values()
|
||||||
|
|
||||||
# find targets:
|
# find targets:
|
||||||
lamb = 0.2
|
# generate waypoints
|
||||||
|
lamb = 0.5
|
||||||
j = 0
|
j = 0
|
||||||
for i in range(0,4):
|
for i in range(0,4):
|
||||||
p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
|
p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
|
||||||
|
@ -382,9 +367,18 @@ class RemoteController:
|
||||||
targets[j] = (mean[0], mean[1], 0.0)
|
targets[j] = (mean[0], mean[1], 0.0)
|
||||||
j += 1
|
j += 1
|
||||||
|
|
||||||
|
# final connection between first and last marker
|
||||||
|
mean_in = (np.array(markers_in[3]) + np.array(markers_in[0])) * 0.5
|
||||||
|
mean_out = (np.array(markers_out[3]) + np.array(markers_out[0])) * 0.5
|
||||||
|
mean = mean_in + (1.0 - lamb) * (mean_out - mean_in)
|
||||||
|
targets[j] = (mean[0], mean[1], 0.0)
|
||||||
|
print("targets = {}".format(targets.keys()))
|
||||||
|
|
||||||
auto_control = False
|
auto_control = False
|
||||||
current_target = 0
|
current_target = 0
|
||||||
|
|
||||||
|
control_scaling = 0.3
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
# open loop controller
|
# open loop controller
|
||||||
|
@ -418,12 +412,32 @@ class RemoteController:
|
||||||
self.target = targets[5]
|
self.target = targets[5]
|
||||||
elif event.key == pygame.K_7:
|
elif event.key == pygame.K_7:
|
||||||
self.target = targets[6]
|
self.target = targets[6]
|
||||||
|
elif event.key == pygame.K_8:
|
||||||
|
self.target = targets[7]
|
||||||
elif event.key == pygame.K_SPACE:
|
elif event.key == pygame.K_SPACE:
|
||||||
auto_control = not auto_control
|
auto_control = not auto_control
|
||||||
if auto_control:
|
if auto_control:
|
||||||
self.target = targets[current_target]
|
self.target = targets[current_target]
|
||||||
|
elif event.key == pygame.K_PLUS:
|
||||||
|
control_scaling += 0.1
|
||||||
|
control_scaling = min(control_scaling, 1.0)
|
||||||
|
elif event.key == pygame.K_MINUS:
|
||||||
|
control_scaling -= 0.1
|
||||||
|
control_scaling = max(control_scaling, 0.3)
|
||||||
|
elif event.key == pygame.K_ESCAPE:
|
||||||
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
if self.rc_socket:
|
||||||
|
self.rc_socket.send('(0.0,0.0)\n')
|
||||||
|
self.anim_stopped = True
|
||||||
|
return
|
||||||
elif event.key == pygame.QUIT:
|
elif event.key == pygame.QUIT:
|
||||||
sys.exit(0)
|
print("quit!")
|
||||||
|
self.controlling = False
|
||||||
|
if self.rc_socket:
|
||||||
|
self.rc_socket.send('(0.0,0.0)\n')
|
||||||
|
self.anim_stopped = True
|
||||||
|
return
|
||||||
|
|
||||||
if self.controlling:
|
if self.controlling:
|
||||||
|
|
||||||
|
@ -432,17 +446,20 @@ class RemoteController:
|
||||||
if auto_control:
|
if auto_control:
|
||||||
if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3:
|
if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3:
|
||||||
print("close to target!")
|
print("close to target!")
|
||||||
current_target = (current_target + 1) % 7
|
current_target = (current_target + 1) % 8
|
||||||
self.target = targets[current_target]
|
self.target = targets[current_target]
|
||||||
print("new target = {}".format(current_target))
|
print("new target = {}".format(current_target))
|
||||||
|
|
||||||
tmpc_start = time.time()
|
tmpc_start = time.time()
|
||||||
|
|
||||||
# solve mpc open loop problem
|
# solve mpc open loop problem
|
||||||
res = self.ols.solve(x_pred, self.target, self.obstacles, self.track)
|
res = self.ols.solve(x_pred, self.target, [], self.track)
|
||||||
|
|
||||||
us1 = res[0]
|
#us1 = res[0]
|
||||||
us2 = res[1]
|
#us2 = res[1]
|
||||||
|
us1 = res[0] * control_scaling
|
||||||
|
us2 = res[1] * control_scaling
|
||||||
|
print("u = {}", (us1, us2))
|
||||||
|
|
||||||
# save open loop trajectories for plotting
|
# save open loop trajectories for plotting
|
||||||
self.mutex.acquire()
|
self.mutex.acquire()
|
||||||
|
@ -493,9 +510,18 @@ class RemoteController:
|
||||||
|
|
||||||
|
|
||||||
def main(args):
|
def main(args):
|
||||||
|
parser = ArgumentParser()
|
||||||
|
parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot')
|
||||||
|
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
marker_id = int(args.id)
|
||||||
|
ip = args.ip
|
||||||
|
|
||||||
|
|
||||||
rospy.init_node('controller_node', anonymous=True)
|
rospy.init_node('controller_node', anonymous=True)
|
||||||
|
|
||||||
rc = RemoteController()
|
rc = RemoteController(marker_id, ip)
|
||||||
|
|
||||||
pygame.init()
|
pygame.init()
|
||||||
|
|
||||||
|
@ -508,7 +534,8 @@ def main(args):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
#threading.Thread(target=rc.input_handling).start()
|
#threading.Thread(target=rc.input_handling).start()
|
||||||
threading.Thread(target=rc.controller).start()
|
controller_thread = threading.Thread(target=rc.controller)
|
||||||
|
controller_thread.start()
|
||||||
|
|
||||||
rc.ani()
|
rc.ani()
|
||||||
|
|
||||||
|
|
285
remote_control/roborally.py
Normal file
285
remote_control/roborally.py
Normal file
|
@ -0,0 +1,285 @@
|
||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from mpc_controller import MPCController
|
||||||
|
from robot import Robot
|
||||||
|
from aruco_estimator import ArucoEstimator
|
||||||
|
|
||||||
|
MSGLEN = 64
|
||||||
|
|
||||||
|
def myreceive(sock):
|
||||||
|
chunks = []
|
||||||
|
bytes_recd = 0
|
||||||
|
while bytes_recd < MSGLEN:
|
||||||
|
chunk = sock.recv(1)
|
||||||
|
if chunk == b'':
|
||||||
|
raise RuntimeError("socket connection broken")
|
||||||
|
chunks.append(chunk)
|
||||||
|
bytes_recd = bytes_recd + len(chunk)
|
||||||
|
|
||||||
|
if chunks[-1] == b'\n':
|
||||||
|
break
|
||||||
|
|
||||||
|
return b''.join(chunks)
|
||||||
|
|
||||||
|
|
||||||
|
class RoboRallyRobot(Robot):
|
||||||
|
# dictionary mapping the current orientation and a turn command to the resulting orientation
|
||||||
|
resulting_orientation = {
|
||||||
|
'^': {'turn left': '<', 'turn right': '>', 'turn around': 'v'},
|
||||||
|
'>': {'turn left': '^', 'turn right': 'v', 'turn around': '<'},
|
||||||
|
'v': {'turn left': '>', 'turn right': '<', 'turn around': '^'},
|
||||||
|
'<': {'turn left': 'v', 'turn right': '^', 'turn around': '>'},
|
||||||
|
}
|
||||||
|
|
||||||
|
# dictionary mapping an orientation to its opposite
|
||||||
|
opposites = {'^': 'v', '>': '<', 'v': '^', '<': '>'}
|
||||||
|
|
||||||
|
def __init__(self, id, ip, x, y, orientation):
|
||||||
|
super().__init__(id, ip)
|
||||||
|
|
||||||
|
self.grid_x = x
|
||||||
|
self.grid_y = y
|
||||||
|
self.grid_orientation = orientation
|
||||||
|
|
||||||
|
def get_neighbor_coordinates(self, direction):
|
||||||
|
# get the coordinates of the neighboring tile in the given direction
|
||||||
|
if direction == '^':
|
||||||
|
return self.grid_x, self.grid_y - 1
|
||||||
|
elif direction == '>':
|
||||||
|
return self.grid_x + 1, self.grid_y
|
||||||
|
elif direction == 'v':
|
||||||
|
return self.grid_x, self.grid_y + 1
|
||||||
|
elif direction == '<':
|
||||||
|
return self.grid_x - 1, self.grid_y
|
||||||
|
else:
|
||||||
|
print("error: unknown direction")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def move(self, move_type):
|
||||||
|
if move_type == 'forward':
|
||||||
|
target_tile = self.get_neighbor_coordinates(self.grid_orientation)
|
||||||
|
self.grid_x = target_tile[0]
|
||||||
|
self.grid_y = target_tile[1]
|
||||||
|
elif move_type == 'backward':
|
||||||
|
opposite_orientation = RoboRallyRobot.opposites[self.grid_orientation]
|
||||||
|
target_tile = self.get_neighbor_coordinates(opposite_orientation)
|
||||||
|
self.grid_x = target_tile[0]
|
||||||
|
self.grid_y = target_tile[1]
|
||||||
|
elif 'turn' in move_type:
|
||||||
|
self.grid_orientation = RoboRallyRobot.resulting_orientation[self.grid_orientation][move_type]
|
||||||
|
elif 'nop' in move_type:
|
||||||
|
pass # nop command -> robot grid position does not change (used e.g. for driving the robot to initial
|
||||||
|
# position)
|
||||||
|
else:
|
||||||
|
print(f"error: invalid move: {move_type}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.__repr__()
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"grid x: {self.grid_x}, grid y: {self.grid_y}, grid orientation: {self.grid_orientation}"
|
||||||
|
|
||||||
|
|
||||||
|
class RemoteController:
|
||||||
|
valid_cmds = ['forward', 'backward', 'turn left', 'turn right', 'turn around', 'nop', 'get position',
|
||||||
|
'set position', 'initialize_robot', 'initialize_grid']
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.robots = []
|
||||||
|
#self.robots = [RoboRallyRobot(12, '192.168.1.12', x=1, y=1, orientation='>')]
|
||||||
|
|
||||||
|
self.robot_ids = {}
|
||||||
|
for r in self.robots:
|
||||||
|
self.robot_ids[r.id] = r
|
||||||
|
|
||||||
|
# socket for movement commands
|
||||||
|
self.comm_socket = socket.socket()
|
||||||
|
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
|
||||||
|
for robot in self.robots:
|
||||||
|
robot.connect()
|
||||||
|
|
||||||
|
self.comm_socket.bind(('', 1337))
|
||||||
|
self.comm_socket.listen(5)
|
||||||
|
|
||||||
|
self.t = time.time()
|
||||||
|
|
||||||
|
# start thread for marker position detection
|
||||||
|
self.estimator = ArucoEstimator(self.robot_ids.keys())
|
||||||
|
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
|
||||||
|
self.estimator_thread.start()
|
||||||
|
|
||||||
|
self.controller = MPCController(self.estimator)
|
||||||
|
|
||||||
|
print("waiting for corner and robot detection..")
|
||||||
|
while not self.estimator.all_robots_detected() or not self.estimator.all_corners_detected():
|
||||||
|
pass
|
||||||
|
print("everything detected!")
|
||||||
|
# drive robots to initial position
|
||||||
|
for robot_id in self.robot_ids:
|
||||||
|
self.grid_control(robot_id, 'nop')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
print("waiting until all markers are detected...")
|
||||||
|
while not self.estimator.all_corners_detected():
|
||||||
|
pass
|
||||||
|
print("starting control")
|
||||||
|
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
(clientsocket, address) = self.comm_socket.accept()
|
||||||
|
clientsocket.settimeout(None)
|
||||||
|
|
||||||
|
connected = True
|
||||||
|
while connected:
|
||||||
|
try:
|
||||||
|
data = myreceive(clientsocket)
|
||||||
|
print("data received: ", data)
|
||||||
|
|
||||||
|
inputs = data.split(b',')
|
||||||
|
cmd = inputs[0]
|
||||||
|
cmd = cmd.strip().decode()
|
||||||
|
|
||||||
|
if len(inputs) > 1:
|
||||||
|
if cmd in RemoteController.valid_cmds:
|
||||||
|
if cmd == 'initialize_grid':
|
||||||
|
try:
|
||||||
|
grid_columns = int(inputs[1])
|
||||||
|
grid_rows = int(inputs[2])
|
||||||
|
|
||||||
|
self.estimator.grid_columns = grid_columns
|
||||||
|
self.estimator.grid_rows = grid_rows
|
||||||
|
clientsocket.sendall(b'OK\n')
|
||||||
|
except ValueError:
|
||||||
|
print("could not initialize grid!")
|
||||||
|
clientsocket.sendall(b'"could not initialize grid!\n'
|
||||||
|
b'expected: initialize_robot, <grid columns>, <grid rows>')
|
||||||
|
except IndexError:
|
||||||
|
print("could not initialize grid!")
|
||||||
|
clientsocket.sendall(b'"could not initialize grid!\n'
|
||||||
|
b'expected: initialize_robot, <grid columns>, <grid rows>')
|
||||||
|
else: # robot command
|
||||||
|
try:
|
||||||
|
robot_id = int(inputs[1])
|
||||||
|
except ValueError:
|
||||||
|
robot_id = None
|
||||||
|
print("could not read robot id!")
|
||||||
|
clientsocket.sendall(b'Could not read robot id!\n')
|
||||||
|
|
||||||
|
if cmd == 'initialize_robot':
|
||||||
|
# add a new robot to the game
|
||||||
|
try:
|
||||||
|
id = int(inputs[1])
|
||||||
|
ip = inputs[2].decode().strip()
|
||||||
|
x = int(inputs[3])
|
||||||
|
y = int(inputs[4])
|
||||||
|
orientation = inputs[5].decode().strip()
|
||||||
|
|
||||||
|
print(f"initializing new robot with id {id} and ip {ip} at pos ({x},{y}) with "
|
||||||
|
f"orientation '{orientation}'")
|
||||||
|
if id not in self.robot_ids:
|
||||||
|
new_robot = RoboRallyRobot(id=id, ip=ip, x=x, y=y, orientation=orientation)
|
||||||
|
new_robot.connect()
|
||||||
|
|
||||||
|
# store the new robot in the list of robots
|
||||||
|
self.robots.append(new_robot)
|
||||||
|
self.robot_ids[new_robot.id] = new_robot # this also means the estimator
|
||||||
|
# will track the new robot because
|
||||||
|
# it got a reference to the list of
|
||||||
|
# robot ids to keep an eye out for
|
||||||
|
|
||||||
|
else: # robot is already created -> just re-initialize its position
|
||||||
|
new_robot = self.robot_ids[id]
|
||||||
|
new_robot.grid_x = x
|
||||||
|
new_robot.grid_y = y
|
||||||
|
new_robot.orientation = orientation
|
||||||
|
|
||||||
|
if new_robot.connected:
|
||||||
|
print("created new robot and successfully connected to it!")
|
||||||
|
|
||||||
|
while not self.estimator.all_robots_detected(): # wait until the robot gets detected
|
||||||
|
pass
|
||||||
|
|
||||||
|
# drive the new robot to its starting position
|
||||||
|
self.grid_control(new_robot.id, 'nop')
|
||||||
|
|
||||||
|
clientsocket.sendall(b'OK\n')
|
||||||
|
else:
|
||||||
|
clientsocket.sendall(f"error: could not connect to new robot {new_robot}".encode())
|
||||||
|
|
||||||
|
except IndexError:
|
||||||
|
print("could not initialize a new robot")
|
||||||
|
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
|
||||||
|
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
|
||||||
|
except ValueError:
|
||||||
|
print("could not initialize a new robot")
|
||||||
|
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
|
||||||
|
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
|
||||||
|
|
||||||
|
elif robot_id in self.robot_ids:
|
||||||
|
if cmd == b'get position':
|
||||||
|
clientsocket.sendall(bytes('{}\n'.format(self.robot_ids[robot_id].grid_pos)))
|
||||||
|
elif cmd == b'set position':
|
||||||
|
try:
|
||||||
|
pos_data = ",".join(inputs[2:])
|
||||||
|
new_grid_pos = tuple(map(lambda x: int(x[1]) if x[0] < 2 else float(x[1]),
|
||||||
|
enumerate(pos_data.strip().strip('()').split(','))))
|
||||||
|
self.robot_ids[robot_id].grid_pos = new_grid_pos
|
||||||
|
clientsocket.sendall(b'OK\n')
|
||||||
|
except IndexError as e:
|
||||||
|
print("could not set grid position!")
|
||||||
|
clientsocket.sendall(bytes(
|
||||||
|
'could not set grid position! (invalid format)\n'.format(
|
||||||
|
self.robot_ids[robot_id].grid_pos)))
|
||||||
|
except ValueError as e:
|
||||||
|
print("could not set grid position!")
|
||||||
|
clientsocket.sendall(bytes(
|
||||||
|
'could not set grid position! (invalid format)\n'.format(
|
||||||
|
self.robot_ids[robot_id].grid_pos)))
|
||||||
|
else:
|
||||||
|
self.grid_control(robot_id, cmd)
|
||||||
|
clientsocket.sendall(b'OK\n')
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("invalid robot id!")
|
||||||
|
clientsocket.sendall(b'Invalid robot id!\n')
|
||||||
|
else:
|
||||||
|
clientsocket.sendall(b'Invalid command!\n')
|
||||||
|
else: # len(inputs) <= 1
|
||||||
|
if b'quit' in inputs[0]:
|
||||||
|
clientsocket.close()
|
||||||
|
self.comm_socket.close()
|
||||||
|
connected = False
|
||||||
|
running = False
|
||||||
|
else:
|
||||||
|
print("could not process command!")
|
||||||
|
clientsocket.sendall(b'Could not process request!\n')
|
||||||
|
|
||||||
|
except RuntimeError:
|
||||||
|
print("disconnected")
|
||||||
|
connected = False
|
||||||
|
clientsocket.close()
|
||||||
|
|
||||||
|
def grid_control(self, robot_id, cmd):
|
||||||
|
robot = self.robot_ids[robot_id] # get robot to be controlled
|
||||||
|
|
||||||
|
print("robot grid pos before move: ", robot)
|
||||||
|
robot.move(cmd)
|
||||||
|
print("robot grid pos after move: ", robot)
|
||||||
|
|
||||||
|
target = self.estimator.get_pos_from_grid_point(robot.grid_x, robot.grid_y, robot.grid_orientation)
|
||||||
|
|
||||||
|
self.controller.move_to_pos(target, robot)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args):
|
||||||
|
rc = RemoteController()
|
||||||
|
rc.run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main(sys.argv)
|
37
remote_control/robot.py
Normal file
37
remote_control/robot.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
import socket
|
||||||
|
|
||||||
|
class Robot:
|
||||||
|
def __init__(self, marker_id, ip):
|
||||||
|
self.id = marker_id
|
||||||
|
self.pos = None
|
||||||
|
self.euler = None
|
||||||
|
|
||||||
|
self.ip = ip
|
||||||
|
self.socket = socket.socket()
|
||||||
|
|
||||||
|
# currently active control
|
||||||
|
self.u1 = 0.0
|
||||||
|
self.u2 = 0.0
|
||||||
|
|
||||||
|
self.connected = False
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
# connect to robot
|
||||||
|
try:
|
||||||
|
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
|
||||||
|
self.socket.connect((self.ip, 1234)) # connect to robot
|
||||||
|
print("connected!")
|
||||||
|
self.connected = True
|
||||||
|
except socket.error:
|
||||||
|
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
|
||||||
|
|
||||||
|
def send_cmd(self, u1=0.0, u2=0.0):
|
||||||
|
if self.socket:
|
||||||
|
try:
|
||||||
|
self.socket.send(f'({u1},{u2})\n'.encode())
|
||||||
|
except BrokenPipeError:
|
||||||
|
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
|
||||||
|
pass
|
||||||
|
except ConnectionResetError:
|
||||||
|
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
|
||||||
|
pass
|
Loading…
Reference in New Issue
Block a user