Compare commits

..

37 Commits

Author SHA1 Message Date
a872232880 enabled backwards driving and adjusted parameters a bit 2020-11-09 22:07:09 +01:00
91cde26908 working PID controller for driving forwards 2020-11-09 21:34:43 +01:00
993d4c0141 added options to run detection on inverted markers and to disable auto-exposure 2020-11-08 16:29:48 +01:00
119d857531 general refactoring 2020-10-24 21:16:17 +02:00
8728db82e8 general refactoring 2020-10-24 21:12:03 +02:00
a0f701fe83 general refactoring 2020-10-24 21:11:22 +02:00
7ded3bee79 general refactoring 2020-10-24 21:08:20 +02:00
dd162018d8 improved grid drawing code to also work when markers are occluded and added documentation 2020-10-24 20:54:47 +02:00
a22a3fdea3 adjusted to rename of aruco estimator and changed order for server commands 2020-10-24 20:11:32 +02:00
ab6cc79eea added kick-start for small motor control signals 2020-10-24 20:05:39 +02:00
7f194f57f2 added python3 support 2020-10-24 20:05:00 +02:00
409a9980bd added exception for lost connection 2020-10-24 20:01:16 +02:00
9e1ce9b512 renamed aruco estimator file 2020-10-24 19:59:56 +02:00
3bd11a53ce client and server for robot position estimate 2020-10-24 19:56:18 +02:00
02a83f405f added option to draw marker coordinate system and modified dict for corner marker estimates 2020-10-24 19:55:04 +02:00
ec6b2bbf4a removed unnecessary inheritance 2020-10-23 20:56:41 +02:00
9bb4f2920f rename ArucoEstimator file 2020-10-23 20:55:08 +02:00
0acc59cc53 replaced cv2.aruco with python bindings for new aruco version 2020-10-23 20:44:20 +02:00
280fb10427 replaced cv2.aruco with python bindings for new aruco version 2020-10-23 20:43:45 +02:00
0bc6b68a20 playing around with different markers and sizes 2020-10-21 21:10:04 +02:00
e93ae65e0f added option to dynamically initialize grid and robots and added support for arbitrary grid orientation w.r.t. camera 2020-10-18 18:03:33 +02:00
056d91da52 fixed error with turn around command 2020-10-18 16:58:32 +02:00
3ab8fadc7b robot driving to position determined by mouse click 2020-10-18 15:16:28 +02:00
49645765d7 input handling using opencv window 2020-10-15 22:41:30 +02:00
e3753f7644 improved grid logic for robots + general cleanup 2020-10-15 20:35:32 +02:00
32892e5dcf added option to compute angle for marker based grid 2020-10-14 23:08:19 +02:00
149e9b3536 coupled robot control with new position estimator 2020-10-14 21:15:43 +02:00
36f3ccbd6c fixed a problem where robot could drive diagonally caused by int casting error and improved the socket buffer size which could lead to problems for longer messages 2020-09-15 15:23:01 +02:00
e7ab92dfe2 added more commands and improved error checking 2020-09-09 20:21:34 +02:00
c98760a2c7 working version with two robots 2020-09-09 18:12:12 +02:00
280aee75ee parameter tuning. works well with control scaling of 0.3 2020-09-09 17:33:13 +02:00
826fa4be0f fixed problem with target angle of -pi, pi which was caused by discontinuities in the angle measurements 2020-09-07 16:18:35 +02:00
6fb3cd5484 so-so working version for mpc control 2020-09-04 16:53:39 +02:00
d170524370 added 8th waypoint 2020-01-16 10:58:33 +01:00
0c59a77b28 changed marker ids 2020-01-16 07:51:54 +01:00
c1b7640d03 fixed bug with initialization of i2c option 2020-01-12 16:11:58 +01:00
aab1b78e8f added accessability features (choosing ip, robot id at startup, changing max speed), remove obstacle code 2020-01-12 16:11:26 +01:00
13 changed files with 1387 additions and 79 deletions

View File

@@ -10,6 +10,10 @@ m0.speed(5000)
"""
import ustruct
import utime
import math
sign = lambda x: math.copysign(1, x)
_STATE_BRAKE = const(0)
@@ -49,6 +53,10 @@ class Motor:
def speed(self, speed=None):
if speed is None:
return self._speed
if abs(speed) < 30 and abs(speed) > 10: # kick-start for small speeds
self.speed(int(100 * sign(speed)))
utime.sleep_ms(5)
if speed > 0:
self._speed = min(10000, max(1, speed))
self._state = _STATE_RIGHT

View File

@@ -2,7 +2,7 @@ import machine
import sys
from machine import I2C, Pin
i2c = True
i2c = False
if i2c:
import d1motor
else:
@@ -100,5 +100,5 @@ class Robot:
del comm_socket
del socket
wall_e = Robot()
wall_e = Robot(i2c=i2c)
wall_e.remote_control()

View File

@@ -0,0 +1,400 @@
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import time
from shapely.geometry import LineString
from queue import Queue
import aruco
class ArucoEstimator:
corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.grid_columns = grid_columns
self.grid_rows = grid_rows
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids])
self.draw_grid = False
self.event_queue = Queue()
if use_realsense: # check if realsense camera is connected
# Configure depth and color streams
self.pipeline = rs.pipeline()
config = rs.config()
#config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# Start streaming
self.pipeline.start(config)
# disable auto exposure
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, False)
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
self.camera_matrix = np.zeros((3, 3))
self.camera_matrix[0][0] = camera_intrinsics.fx
self.camera_matrix[1][1] = camera_intrinsics.fy
self.camera_matrix[0][2] = camera_intrinsics.ppx
self.camera_matrix[1][2] = camera_intrinsics.ppy
self.dist_coeffs = np.array(camera_intrinsics.coeffs)
# more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
else:
# use other camera
self.cv_camera = cv2.VideoCapture(0)
self.pipeline = None
# create detector and get parameters
self.detector = aruco.MarkerDetector()
# self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
# self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters
print("detector params:")
for val in dir(self.detector_params):
if not val.startswith("__"):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
if use_realsense:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
print(self.camparam)
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
x1 = self.corner_estimates['a']['x']
x3 = self.corner_estimates['c']['x']
y1 = self.corner_estimates['a']['y']
y3 = self.corner_estimates['c']['y']
alpha = (px - px1) / (px3 - px1)
beta = (py - py1) / (py3 - py1)
print(f"alpha = {alpha}, beta = {beta}")
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
else:
print("not all markers have been detected!")
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
try:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if invert_grayscale:
cv2.bitwise_not(gray, gray)
# run aruco marker detection
detected_markers = self.detector.detect(gray)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id >= 0:
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if marker.id >= 0: # draw markers onto the image
if draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
if key > 0:
self.event_queue.put(('key', key))
if key == ord('g'):
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('a'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
print("auto exposure OFF")
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
if marker_id in self.corner_marker_ids.values():
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
# (we assume the markers do not move)
# get corresponding corner to the detected marker
corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys()))
old_estimate_x = self.corner_estimates[corner]['x']
old_estimate_y = self.corner_estimates[corner]['y']
n_estimates = self.corner_estimates[corner]['n_estimates']
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
if not any([old_estimate_x is None, old_estimate_y is None]):
new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update
new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
else:
new_estimate_x = x # first estimate
new_estimate_y = y
self.corner_estimates[corner]['t'] = t_image
self.corner_estimates[corner]['x'] = new_estimate_x
self.corner_estimates[corner]['y'] = new_estimate_y
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center
elif marker_id in self.robot_marker_ids:
# for robot markers we extract x and y position as well as the angle
# here we could also implement a filter
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
# compute angle
rot_mat, _ = cv2.Rodrigues(rvec)
pose_mat = cv2.hconcat((rot_mat, tvec))
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
def all_robots_detected(self):
# checks if all robot markers have been detected at least once
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, orientation=None):
"""
returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid (0 <= x < number of grid columns)
:param y: y position on the grid (0 <= y < number of grid rows)
:param orientation: (optional) orientation in the given grid cell (one of ^, >, v, <)
:return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation
"""
assert 0 <= x < self.grid_columns
assert 0 <= y < self.grid_rows
assert self.all_corners_detected()
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])
if orientation is not None:
# compute angle corresponding to the orientation w.r.t. the grid
# TODO: test this code
angle_ab = np.arctan2(vab[1], vab[0])
angle_dc = np.arctan2(vdc[1], vdc[0])
angle_ad = np.arctan2(vad[1], vad[0])
angle_bc = np.arctan2(vbc[1], vbc[0])
angle = 0.0
if orientation == '>':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
elif orientation == '<':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
elif orientation == 'v':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
elif orientation == '^':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi
return np.array((point_of_intersection[0], point_of_intersection[1], angle))
else:
return point_of_intersection
def get_grid_point_from_pos(self):
# TODO return the nearest grid point for the given position estimate
pass
def print_corner_estimates(self):
for key, value in self.corner_estimates.items():
if value['n_estimates'] != 0:
print(f"corner marker {key} at pos ({value['x']},{value['y']})")
def draw_corner_line(self, frame, corner_1, corner_2):
# draws a line between the given markers onto the given frame
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
# draws a grid onto the given frame
frame = self.draw_corner_line(frame, 'a', 'b')
frame = self.draw_corner_line(frame, 'b', 'c')
frame = self.draw_corner_line(frame, 'c', 'd')
frame = self.draw_corner_line(frame, 'd', 'a')
if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]):
# compute outlines of board
a = self.corner_estimates['a']['pixel_coordinate']
b = self.corner_estimates['b']['pixel_coordinate']
c = self.corner_estimates['c']['pixel_coordinate']
d = self.corner_estimates['d']['pixel_coordinate']
# draw vertical lines
vab = b - a
vdc = c - d
for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
vad = d - a
vbc = c - b
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
thickness=1)
return frame
def get_robot_state_estimate(self, marker_id):
if marker_id in self.robot_marker_estimates:
if self.robot_marker_estimates[marker_id]['t'] is not None:
return np.array([self.robot_marker_estimates[marker_id]['x'],
self.robot_marker_estimates[marker_id]['y'],
self.robot_marker_estimates[marker_id]['angle']])
else:
print(f"error: no estimate available for robot {marker_id}")
return None
else:
print(f"error: invalid robot id {marker_id}")
return None
def draw_robot_pos(self, frame, detected_marker_data):
# draws information about the robot positions onto the given frame
robot_corners_pixel_coords = {}
for marker_id, estimate in self.robot_marker_estimates.items():
if marker_id in detected_marker_data.keys():
robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['marker_center'])
for marker_id, coord in robot_corners_pixel_coords.items():
x = self.robot_marker_estimates[marker_id]['x']
y = self.robot_marker_estimates[marker_id]['y']
angle = self.robot_marker_estimates[marker_id]['angle']
#frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord,
# cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
frame = cv2.putText(frame, f"{marker_id}", coord, cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), thickness=4)
return frame
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.run_tracking(draw_markers=True, invert_grayscale=True)
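A hedged usage sketch for the estimator added above, run from a second thread as the later controller files do; the marker id 11 and the queried grid cell are placeholders for illustration:

import threading
from aruco_estimator import ArucoEstimator

estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11])
tracking_thread = threading.Thread(target=estimator.run_tracking)
tracking_thread.start()

# wait until all four corner markers and the robot marker have been seen at least once
while not (estimator.all_corners_detected() and estimator.all_robots_detected()):
    pass

state = estimator.get_robot_state_estimate(11)       # np.array([x, y, angle]) or None
pose = estimator.get_pos_from_grid_point(3, 4, '>')  # world pose of grid cell (3, 4), facing '>'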

View File

@@ -3,6 +3,9 @@ import time
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
# TODO for roborally
# implement essential movements: forward, backward, turn left, turn right on grid-based level
class OpenLoopSolver:
def __init__(self, N=20, T=4.0):
self.T = T
@@ -40,7 +43,8 @@ class OpenLoopSolver:
# cost functional
target = (-0.0, 0.0)
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
#L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
# Fixed step Runge-Kutta 4 integrator
M = 4 # RK4 steps per interval
@@ -138,7 +142,12 @@ class OpenLoopSolver:
#return
def solve(self, x0, target, obstacles, track):
def solve(self, x0, target):
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
x0[2] = angles_unwrapped[0]
target[2] = angles_unwrapped[1]
tstart = time.time()
# alternative solution using multiple shooting (way faster!)
self.opti = Opti() # Optimization problem
@@ -186,7 +195,11 @@ class OpenLoopSolver:
theta)) * omegal * omega_max
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
xdot = vertcat(f1, f2, f3)
L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
#L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (theta - target[2])**2 + 1e-2 * (omegar ** 2 + omegal ** 2)
L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (target[2] - theta) ** 2 + 1e-2 * (
omegar ** 2 + omegal ** 2)
#L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
self.f = Function('f', [state, control], [xdot, L])
# ---- solve NLP ------
@@ -222,31 +235,13 @@ class OpenLoopSolver:
self.opti.subject_to(self.Q[:, 0] == 0.0)
solver = self.opti.solver("ipopt", {}, {"print_level": 0})
solver = self.opti.solver("ipopt", {'print_time': False}, {"print_level": 0})
# ---- misc. constraints ----------
# self.opti.subject_to(X[1,:]>=0) # Time must be positive
# self.opti.subject_to(X[2,:]<=4) # Time must be positive
# self.opti.subject_to(X[2,:]>=-2) # Time must be positive
# avoid obstacle
for o in obstacles:
p = obstacles[o].pos
r = obstacles[o].radius
if p is not None:
for k in range(1,self.N):
self.opti.subject_to((self.X[0,k]-p[0])**2 + (self.X[1,k]-p[1])**2 + self.slack > r**2)
# keep inside track
# TODO
# track_ids = track.inner.keys()
# a = track.outer[track_ids[0]]
# b = track.outer[track_ids[1]]
# c = track.outer[track_ids[2]]
# d = track.outer[track_ids[3]]
#
# for k in range(1, self.N):
# self.opti.subject_to(self.opti.subject_to(self.X))
posx = self.X[0, :]
posy = self.X[1, :]
angle = self.X[2, :]
@@ -256,7 +251,7 @@ class OpenLoopSolver:
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
tend = time.time()
print("setting up problem took {} seconds".format(tend - tstart))
#print("setting up problem took {} seconds".format(tend - tstart))
tstart = time.time()
if self.use_warmstart and self.opti_x0 is not None:
@@ -267,7 +262,7 @@ class OpenLoopSolver:
print("could not set warmstart")
sol = self.opti.solve() # actual solve
tend = time.time()
print("solving the problem took {} seconds".format(tend - tstart))
#print("solving the problem took {} seconds".format(tend - tstart))
tstart = time.time()
self.opti_x0 = sol.value(self.opti.x)
@@ -278,7 +273,7 @@ class OpenLoopSolver:
u_opt_1 = sol.value(self.U[0,:])
u_opt_2 = sol.value(self.U[1,:])
tend = time.time()
print("postprocessing took {} seconds".format(tend - tstart))
#print("postprocessing took {} seconds".format(tend - tstart))
return (u_opt_1, u_opt_2, sol.value(posx), sol.value(posy))
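The np.unwrap call added at the top of solve() is the same fix the earlier commit describes for target angles near -pi/pi: without it, two headings on either side of the seam look almost a full turn apart. A small standalone check (plain NumPy, illustration only):

import numpy as np

x0_angle, target_angle = 3.10, -3.10              # nearly identical headings on either side of +/-pi
naive_error = target_angle - x0_angle             # -6.20 rad, would command a near-full turn
unwrapped = np.unwrap([x0_angle, target_angle])   # second value becomes -3.10 + 2*pi, about 3.18
true_error = unwrapped[1] - unwrapped[0]          # about 0.08 rad, the actual angular difference
print(naive_error, true_error)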

View File

@@ -28,27 +28,27 @@ while running:
if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print("turn left: ({},{})".format(u1, u2))
print(f"turn left: ({u1},{u2})")
elif event.key == pygame.K_RIGHT:
u1 = vmax
u2 = -vmax
print("turn right: ({},{})".format(u1, u2))
print(f"turn right: ({u1},{u2})")
elif event.key == pygame.K_UP:
u1 = vmax
u2 = vmax
print("forward: ({},{})".format(u1, u2))
print(f"forward: ({u1},{u2})")
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print("backward: ({},{})".format(u1, u2))
print(f"backward: ({u1},{u2})")
elif event.key == pygame.K_ESCAPE:
print("quit")
running = False
u1 = 0.0
u2 = 0.0
rc_socket.send('({},{})\n'.format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())
elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send('({},{})\n'.format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())

View File

@@ -0,0 +1,13 @@
import socket
HOST, PORT = "localhost", 42424
robot_id = 15
# SOCK_DGRAM is the socket type to use for UDP sockets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
while True:
received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received))

View File

@@ -0,0 +1,46 @@
import socketserver
import threading
import time
from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
data = self.request[0]
socket = self.request[1]
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
try:
marker_id = int(data)
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
self.client_address)
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
socket.sendto("error: unknown robot marker id\n".encode(),
self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking)
estimator_thread.start()
with MeasurementServer(('127.0.0.1', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter

View File

@@ -0,0 +1,175 @@
import numpy as np
import time
from casadi_opt import OpenLoopSolver
class MPCController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator
self.omega_max = 5.0
self.control_scaling = 0.4
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)
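With the constructor values above (N=20, T=1.0, mstep=2) the receding-horizon timing works out as below; simple arithmetic, not part of the diff:

T, N, mstep = 1.0, 20, 2
dt = T / N             # 0.05 s between control intervals of the open-loop solution
applied = mstep * dt   # 0.10 s of each solution is executed before re-solving
print(dt, applied)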

View File

@@ -0,0 +1,55 @@
import sys
import socket
import threading
import time
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
def main(args):
rc = RemoteController()
rc.run()
if __name__ == '__main__':
main(sys.argv)

View File

@@ -0,0 +1,267 @@
import numpy as np
import math
import time
class PIDController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
self.I_angle = 0.35
self.D_angle = 0.1
self.P_pos = 0.50
self.I_pos = 0.3
self.D_pos = 0.1
self.mode = None
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
ts = []
angles = []
target_pos = np.array([0.0, 0.0, 0.0])
e_angle_old = 0.0
e_pos_old = 0.0
i = 0.0
i_angle = 0.0
i_pos = 0.0
t0 = time.time()
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
i_angle = 0
i_pos = 0
e_pos_old = 0
e_angle_old = 0
self.mode = 'combined'
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
self.mode = None # reset control mode
else:
print("enable control")
i = 0
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 97: # a
self.mode = 'angle'
e_angle_old = 0
i = 0
self.t = time.time()
elif key == 99: # c
self.mode = 'combined'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.t = time.time()
elif key == 112: # p
self.mode = 'position'
e_pos_old = 0
i = 0
self.t = time.time()
elif key == 43: # +
self.P_pos += 0.1
print("P = ", self.P_angle)
elif key == 45: # -
self.P_pos -= 0.1
print("P = ", self.P_angle)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
dt = self.t - time.time()
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
if self.mode == 'angle':
# compute angle such that robot faces to target point
target_angle = target_pos[2]
ts.append(time.time() - t0)
angles.append(x_pred[2])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
# i += self.I * dt * e # right Riemann sum
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - e_angle_old)/dt
u1 = p - i - d
u2 = - u1
e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v)
if e_pos < 0.05:
self.mode = 'angle'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
u1 = 0
u2 = 0
else:
forward = abs(e_angle) < np.pi/2.0
if not forward:
if e_angle > np.pi/2.0:
e_angle -= np.pi
else:
e_angle += np.pi
p_angle = self.P_angle * e_angle
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
p_pos = self.P_pos * e_pos
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
if forward:
print("forward")
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
else:
print("backward")
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
e_pos_old = e_pos
e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
#print(f"u = ({u1}, {u2})")
robot.send_cmd(u1, u2)
self.t = time.time() # save time when the most recent control was applied
time.sleep(0.1)
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)
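The gains at the top of this file cite Ziegler-Nichols tuning with a measured K_crit = 1.4 and T_crit = 1.5. For reference, the classic ultimate-sensitivity rules would give roughly the values below, so the constants in the file appear to have been softened further by hand; simple arithmetic, illustration only:

K_crit, T_crit = 1.4, 1.5        # measured ultimate gain and oscillation period
Kp = 0.6 * K_crit                # 0.84
Ki = 1.2 * K_crit / T_crit       # 1.12
Kd = 0.075 * K_crit * T_crit     # ~0.16
print(Kp, Ki, Kd)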

View File

@@ -30,8 +30,12 @@ from marker_pos_angle.msg import id_pos_angle
from collections import OrderedDict
from argparse import ArgumentParser
class Robot:
def __init__(self, id, ip=None):
def __init__(self, id, ip):
self.pos = None
self.orient = None
@@ -52,16 +56,16 @@ class Track:
def __init__(self):
# ids in clockwise direction
self.inner = OrderedDict()
self.inner[0] = None
self.inner[1] = None
self.inner[2] = None
self.inner[3] = None
self.inner[4] = None
self.outer = OrderedDict()
self.outer[4] = None
self.outer[5] = None
self.outer[6] = None
self.outer[7] = None
self.outer[8] = None
self.track_complete = False
@@ -122,20 +126,18 @@ def f_ode(t, x, u):
return dx
class RemoteController:
def __init__(self):
def __init__(self, id, ip):
self.robots = [Robot(11, '192.168.1.103')]
#self.robots = [Robot(14, '192.168.1.102')]
self.anim_stopped = False
#self.robots = [Robot(14, '192.168.1.103')]
#self.robots = [Robot(15, '192.168.1.102')]
self.robots = [Robot(id, ip)]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
obst = [Obstacle(12, 0.275), Obstacle(10, 0.275), Obstacle(14, 0.275), Obstacle(15, 0.275)]
self.obstacles = {}
for r in obst:
self.obstacles[r.id] = r
self.track = Track()
@@ -198,13 +200,6 @@ class RemoteController:
self.track_line_inner, = self.ax.plot([], [])
self.track_line_outer, = self.ax.plot([], [])
self.circles = []
for o in self.obstacles:
self.circles.append(patch.Circle((0.0, 0.0), radius=0.1, fill=False, color='red', linestyle='--'))
for s in self.circles:
self.ax.add_artist(s)
self.ax.set_xlabel('x-position')
self.ax.set_ylabel('y-position')
self.ax.grid()
@@ -231,6 +226,7 @@ class RemoteController:
def ani(self):
print("starting animation")
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion()
@@ -251,10 +247,13 @@ class RemoteController:
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y, self.circles[0], self.circles[1],\
self.circles[2], self.circles[3],
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y,
def ani_update(self, frame):
if self.anim_stopped:
self.ani.event_source.stop()
sys.exit(0)
#print("plotting")
self.mutex.acquire()
try:
@@ -306,21 +305,11 @@ class RemoteController:
else:
self.line_ol.set_data([],[])
i = 0
obst_keys = self.obstacles.keys()
for s in self.circles:
o = self.obstacles[obst_keys[i]]
i = i + 1
if o.pos is not None:
s.center = o.pos
s.radius = o.radius
finally:
self.mutex.release()
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
self.line_x, self.line_y, self.circles[0], self.circles[1],self.circles[2],self.circles[3],
self.line_x, self.line_y,
def measurement_callback(self, data):
#print(data)
@@ -352,11 +341,6 @@ class RemoteController:
finally:
self.mutex.release()
# detect obstacles
if data.id in self.obstacles.keys():
obst = (data.x, data.y)
self.obstacles[data.id].pos = obst
# detect track
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
self.track.set_id(data)
@@ -369,7 +353,8 @@ class RemoteController:
markers_out = self.track.outer.values()
# find targets:
lamb = 0.2
# generate waypoints
lamb = 0.5
j = 0
for i in range(0,4):
p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
@@ -382,9 +367,18 @@ class RemoteController:
targets[j] = (mean[0], mean[1], 0.0)
j += 1
# final connection between first and last marker
mean_in = (np.array(markers_in[3]) + np.array(markers_in[0])) * 0.5
mean_out = (np.array(markers_out[3]) + np.array(markers_out[0])) * 0.5
mean = mean_in + (1.0 - lamb) * (mean_out - mean_in)
targets[j] = (mean[0], mean[1], 0.0)
print("targets = {}".format(targets.keys()))
auto_control = False
current_target = 0
control_scaling = 0.3
while True:
# open loop controller
@@ -418,12 +412,32 @@ class RemoteController:
self.target = targets[5]
elif event.key == pygame.K_7:
self.target = targets[6]
elif event.key == pygame.K_8:
self.target = targets[7]
elif event.key == pygame.K_SPACE:
auto_control = not auto_control
if auto_control:
self.target = targets[current_target]
elif event.key == pygame.K_PLUS:
control_scaling += 0.1
control_scaling = min(control_scaling, 1.0)
elif event.key == pygame.K_MINUS:
control_scaling -= 0.1
control_scaling = max(control_scaling, 0.3)
elif event.key == pygame.K_ESCAPE:
print("quit!")
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
elif event.key == pygame.QUIT:
sys.exit(0)
print("quit!")
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
if self.controlling:
@@ -432,17 +446,20 @@ class RemoteController:
if auto_control:
if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3:
print("close to target!")
current_target = (current_target + 1) % 7
current_target = (current_target + 1) % 8
self.target = targets[current_target]
print("new target = {}".format(current_target))
tmpc_start = time.time()
# solve mpc open loop problem
res = self.ols.solve(x_pred, self.target, self.obstacles, self.track)
res = self.ols.solve(x_pred, self.target, [], self.track)
us1 = res[0]
us2 = res[1]
#us1 = res[0]
#us2 = res[1]
us1 = res[0] * control_scaling
us2 = res[1] * control_scaling
print("u = {}", (us1, us2))
# save open loop trajectories for plotting
self.mutex.acquire()
@@ -493,9 +510,18 @@ class RemoteController:
def main(args):
parser = ArgumentParser()
parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot')
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
marker_id = int(args.id)
ip = args.ip
rospy.init_node('controller_node', anonymous=True)
rc = RemoteController()
rc = RemoteController(marker_id, ip)
pygame.init()
@@ -508,7 +534,8 @@ def main(args):
pass
#threading.Thread(target=rc.input_handling).start()
threading.Thread(target=rc.controller).start()
controller_thread = threading.Thread(target=rc.controller)
controller_thread.start()
rc.ani()

remote_control/roborally.py (new file, 285 lines)
View File

@@ -0,0 +1,285 @@
import sys
import socket
import threading
import time
from mpc_controller import MPCController
from robot import Robot
from aruco_estimator import ArucoEstimator
MSGLEN = 64
def myreceive(sock):
chunks = []
bytes_recd = 0
while bytes_recd < MSGLEN:
chunk = sock.recv(1)
if chunk == b'':
raise RuntimeError("socket connection broken")
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
if chunks[-1] == b'\n':
break
return b''.join(chunks)
class RoboRallyRobot(Robot):
# dictionary mapping the current orientation and a turn command to the resulting orientation
resulting_orientation = {
'^': {'turn left': '<', 'turn right': '>', 'turn around': 'v'},
'>': {'turn left': '^', 'turn right': 'v', 'turn around': '<'},
'v': {'turn left': '>', 'turn right': '<', 'turn around': '^'},
'<': {'turn left': 'v', 'turn right': '^', 'turn around': '>'},
}
# dictionary mapping an orientation to its opposite
opposites = {'^': 'v', '>': '<', 'v': '^', '<': '>'}
def __init__(self, id, ip, x, y, orientation):
super().__init__(id, ip)
self.grid_x = x
self.grid_y = y
self.grid_orientation = orientation
def get_neighbor_coordinates(self, direction):
# get the coordinates of the neighboring tile in the given direction
if direction == '^':
return self.grid_x, self.grid_y - 1
elif direction == '>':
return self.grid_x + 1, self.grid_y
elif direction == 'v':
return self.grid_x, self.grid_y + 1
elif direction == '<':
return self.grid_x - 1, self.grid_y
else:
print("error: unknown direction")
sys.exit(1)
def move(self, move_type):
if move_type == 'forward':
target_tile = self.get_neighbor_coordinates(self.grid_orientation)
self.grid_x = target_tile[0]
self.grid_y = target_tile[1]
elif move_type == 'backward':
opposite_orientation = RoboRallyRobot.opposites[self.grid_orientation]
target_tile = self.get_neighbor_coordinates(opposite_orientation)
self.grid_x = target_tile[0]
self.grid_y = target_tile[1]
elif 'turn' in move_type:
self.grid_orientation = RoboRallyRobot.resulting_orientation[self.grid_orientation][move_type]
elif 'nop' in move_type:
pass # nop command -> robot grid position does not change (used e.g. for driving the robot to initial
# position)
else:
print(f"error: invalid move: {move_type}")
sys.exit(1)
def __str__(self):
return self.__repr__()
def __repr__(self):
return f"grid x: {self.grid_x}, grid y: {self.grid_y}, grid orientation: {self.grid_orientation}"
class RemoteController:
valid_cmds = ['forward', 'backward', 'turn left', 'turn right', 'turn around', 'nop', 'get position',
'set position', 'initialize_robot', 'initialize_grid']
def __init__(self):
self.robots = []
#self.robots = [RoboRallyRobot(12, '192.168.1.12', x=1, y=1, orientation='>')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
print("waiting for corner and robot detection..")
while not self.estimator.all_robots_detected() or not self.estimator.all_corners_detected():
pass
print("everything detected!")
# drive robots to initial position
for robot_id in self.robot_ids:
self.grid_control(robot_id, 'nop')
def run(self):
print("waiting until all markers are detected...")
while not self.estimator.all_corners_detected():
pass
print("starting control")
running = True
while running:
(clientsocket, address) = self.comm_socket.accept()
clientsocket.settimeout(None)
connected = True
while connected:
try:
data = myreceive(clientsocket)
print("data received: ", data)
inputs = data.split(b',')
cmd = inputs[0]
cmd = cmd.strip().decode()
if len(inputs) > 1:
if cmd in RemoteController.valid_cmds:
if cmd == 'initialize_grid':
try:
grid_columns = int(inputs[1])
grid_rows = int(inputs[2])
self.estimator.grid_columns = grid_columns
self.estimator.grid_rows = grid_rows
clientsocket.sendall(b'OK\n')
except ValueError:
print("could not initialize grid!")
clientsocket.sendall(b'"could not initialize grid!\n'
b'expected: initialize_robot, <grid columns>, <grid rows>')
except IndexError:
print("could not initialize grid!")
clientsocket.sendall(b'"could not initialize grid!\n'
b'expected: initialize_robot, <grid columns>, <grid rows>')
else: # robot command
try:
robot_id = int(inputs[1])
except ValueError:
robot_id = None
print("could not read robot id!")
clientsocket.sendall(b'Could not read robot id!\n')
if cmd == 'initialize_robot':
# add a new robot to the game
try:
id = int(inputs[1])
ip = inputs[2].decode().strip()
x = int(inputs[3])
y = int(inputs[4])
orientation = inputs[5].decode().strip()
print(f"initializing new robot with id {id} and ip {ip} at pos ({x},{y}) with "
f"orientation '{orientation}'")
if id not in self.robot_ids:
new_robot = RoboRallyRobot(id=id, ip=ip, x=x, y=y, orientation=orientation)
new_robot.connect()
# store the new robot in the list of robots
self.robots.append(new_robot)
self.robot_ids[new_robot.id] = new_robot # this also means the estimator
# will track the new robot because
# it got a reference to the list of
# robot ids to keep an eye out for
else: # robot is already created -> just re-initialize its position
new_robot = self.robot_ids[id]
new_robot.grid_x = x
new_robot.grid_y = y
new_robot.orientation = orientation
if new_robot.connected:
print("created new robot and successfully connected to it!")
while not self.estimator.all_robots_detected(): # wait until the robot gets detected
pass
# drive the new robot to its starting position
self.grid_control(new_robot.id, 'nop')
clientsocket.sendall(b'OK\n')
else:
clientsocket.sendall(f"error: could not connect to new robot {new_robot}".encode())
except IndexError:
print("could not initialize a new robot")
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
except ValueError:
print("could not initialize a new robot")
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
elif robot_id in self.robot_ids:
if cmd == b'get position':
clientsocket.sendall(bytes('{}\n'.format(self.robot_ids[robot_id].grid_pos)))
elif cmd == b'set position':
try:
pos_data = ",".join(inputs[2:])
new_grid_pos = tuple(map(lambda x: int(x[1]) if x[0] < 2 else float(x[1]),
enumerate(pos_data.strip().strip('()').split(','))))
self.robot_ids[robot_id].grid_pos = new_grid_pos
clientsocket.sendall(b'OK\n')
except IndexError as e:
print("could not set grid position!")
clientsocket.sendall(bytes(
'could not set grid position! (invalid format)\n'.format(
self.robot_ids[robot_id].grid_pos)))
except ValueError as e:
print("could not set grid position!")
clientsocket.sendall(bytes(
'could not set grid position! (invalid format)\n'.format(
self.robot_ids[robot_id].grid_pos)))
else:
self.grid_control(robot_id, cmd)
clientsocket.sendall(b'OK\n')
else:
print("invalid robot id!")
clientsocket.sendall(b'Invalid robot id!\n')
else:
clientsocket.sendall(b'Invalid command!\n')
else: # len(inputs) <= 1
if b'quit' in inputs[0]:
clientsocket.close()
self.comm_socket.close()
connected = False
running = False
else:
print("could not process command!")
clientsocket.sendall(b'Could not process request!\n')
except RuntimeError:
print("disconnected")
connected = False
clientsocket.close()
def grid_control(self, robot_id, cmd):
robot = self.robot_ids[robot_id] # get robot to be controlled
print("robot grid pos before move: ", robot)
robot.move(cmd)
print("robot grid pos after move: ", robot)
target = self.estimator.get_pos_from_grid_point(robot.grid_x, robot.grid_y, robot.grid_orientation)
self.controller.move_to_pos(target, robot)
def main(args):
rc = RemoteController()
rc.run()
if __name__ == '__main__':
main(sys.argv)
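The server above accepts newline-terminated, comma-separated commands on TCP port 1337 and answers each request with a short status line such as OK. A hedged client sketch; the address, robot id/ip and grid size are placeholders, not part of the diff:

import socket

sock = socket.socket()
sock.connect(('127.0.0.1', 1337))

def send_cmd(line):
    # each command is a single newline-terminated line of at most 64 bytes
    sock.sendall((line + '\n').encode())
    return sock.recv(64)

print(send_cmd('initialize_grid, 8, 8'))
print(send_cmd('initialize_robot, 12, 192.168.1.12, 1, 1, >'))
print(send_cmd('forward, 12'))
print(send_cmd('turn left, 12'))
sock.sendall(b'quit\n')
sock.close()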

remote_control/robot.py (new file, 37 lines)
View File

@@ -0,0 +1,37 @@
import socket
class Robot:
def __init__(self, marker_id, ip):
self.id = marker_id
self.pos = None
self.euler = None
self.ip = ip
self.socket = socket.socket()
# currently active control
self.u1 = 0.0
self.u2 = 0.0
self.connected = False
def connect(self):
# connect to robot
try:
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
self.socket.connect((self.ip, 1234)) # connect to robot
print("connected!")
self.connected = True
except socket.error:
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
def send_cmd(self, u1=0.0, u2=0.0):
if self.socket:
try:
self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
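A minimal usage sketch for this class; the marker id and ip are placeholders taken from the commented-out examples earlier in the diff:

from robot import Robot

wall_e = Robot(11, '192.168.1.11')  # marker id and robot ip are placeholders
wall_e.connect()                    # opens a TCP connection to port 1234 on the robot
wall_e.send_cmd(0.5, 0.5)           # drive both wheels
wall_e.send_cmd()                   # defaults to (0.0, 0.0), i.e. stop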