Compare commits

..

No commits in common. "a8722328801264f0434253401265a97f8f292806" and "c2fad136989e0fd013d7a607b76eef4a942d6aae" have entirely different histories.

13 changed files with 79 additions and 1387 deletions

View File

@ -10,10 +10,6 @@ m0.speed(5000)
""" """
import ustruct import ustruct
import utime
import math
sign = lambda x: math.copysign(1, x)
_STATE_BRAKE = const(0) _STATE_BRAKE = const(0)
@ -53,10 +49,6 @@ class Motor:
def speed(self, speed=None): def speed(self, speed=None):
if speed is None: if speed is None:
return self._speed return self._speed
if abs(speed) < 30 and abs(speed) > 10: # kick-start for small speeds
self.speed(int(100 * sign(speed)))
utime.sleep_ms(5)
if speed > 0: if speed > 0:
self._speed = min(10000, max(1, speed)) self._speed = min(10000, max(1, speed))
self._state = _STATE_RIGHT self._state = _STATE_RIGHT

View File

@ -2,7 +2,7 @@ import machine
import sys import sys
from machine import I2C, Pin from machine import I2C, Pin
i2c = False i2c = True
if i2c: if i2c:
import d1motor import d1motor
else: else:
@ -100,5 +100,5 @@ class Robot:
del comm_socket del comm_socket
del socket del socket
wall_e = Robot(i2c=i2c) wall_e = Robot()
wall_e.remote_control() wall_e.remote_control()

View File

@ -1,400 +0,0 @@
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import time
from shapely.geometry import LineString
from queue import Queue
import aruco
class ArucoEstimator:
corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.grid_columns = grid_columns
self.grid_rows = grid_rows
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids])
self.draw_grid = False
self.event_queue = Queue()
if use_realsense: # check if realsense camera is connected
# Configure depth and color streams
self.pipeline = rs.pipeline()
config = rs.config()
#config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# Start streaming
self.pipeline.start(config)
# disable auto exposure
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, False)
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
self.camera_matrix = np.zeros((3, 3))
self.camera_matrix[0][0] = camera_intrinsics.fx
self.camera_matrix[1][1] = camera_intrinsics.fy
self.camera_matrix[0][2] = camera_intrinsics.ppx
self.camera_matrix[1][2] = camera_intrinsics.ppy
self.dist_coeffs = np.array(camera_intrinsics.coeffs)
# more info: https://dev.intelrealsense.com/docs/projection-in-intel-realsense-sdk-20
else:
# use other camera
self.cv_camera = cv2.VideoCapture(0)
self.pipeline = None
# create detector and get parameters
self.detector = aruco.MarkerDetector()
# self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
# self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters
print("detector params:")
for val in dir(self.detector_params):
if not val.startswith("__"):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
if use_realsense:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
print(self.camparam)
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
x1 = self.corner_estimates['a']['x']
x3 = self.corner_estimates['c']['x']
y1 = self.corner_estimates['a']['y']
y3 = self.corner_estimates['c']['y']
alpha = (px - px1) / (px3 - px1)
beta = (py - py1) / (py3 - py1)
print(f"alpha = {alpha}, beta = {beta}")
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
else:
print("not all markers have been detected!")
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
try:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
# Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if invert_grayscale:
cv2.bitwise_not(gray, gray)
# run aruco marker detection
detected_markers = self.detector.detect(gray)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id >= 0:
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if marker.id >= 0: # draw markers onto the image
if draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
if key > 0:
self.event_queue.put(('key', key))
if key == ord('g'):
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('a'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
print("auto exposure OFF")
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
if marker_id in self.corner_marker_ids.values():
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
# (we assume the markers do not move)
# get corresponding corner to the detected marker
corner = next(filter(lambda key: self.corner_marker_ids[key] == marker_id, self.corner_marker_ids.keys()))
old_estimate_x = self.corner_estimates[corner]['x']
old_estimate_y = self.corner_estimates[corner]['y']
n_estimates = self.corner_estimates[corner]['n_estimates']
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
if not any([old_estimate_x is None, old_estimate_y is None]):
new_estimate_x = (n_estimates * old_estimate_x + x) / (n_estimates + 1) # weighted update
new_estimate_y = (n_estimates * old_estimate_y + y) / (n_estimates + 1)
else:
new_estimate_x = x # first estimate
new_estimate_y = y
self.corner_estimates[corner]['t'] = t_image
self.corner_estimates[corner]['x'] = new_estimate_x
self.corner_estimates[corner]['y'] = new_estimate_y
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord_center
elif marker_id in self.robot_marker_ids:
# for robot markers we extract x and y position as well as the angle
# here we could also implement a filter
x = tvec[0][0]
y = -tvec[1][0] # flip y coordinate
# compute angle
rot_mat, _ = cv2.Rodrigues(rvec)
pose_mat = cv2.hconcat((rot_mat, tvec))
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
def all_robots_detected(self):
# checks if all robot markers have been detected at least once
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, orientation=None):
"""
returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid ( 0 &le x &lt number of grid columns)
:param y: y position on the grid ( 0 &le x &lt number of grid rows)
:param orientation: (optional) orientation in the given grid cell (one of ^, >, v, &lt )
:return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation
"""
assert 0 <= x < self.grid_columns
assert 0 <= y < self.grid_rows
assert self.all_corners_detected()
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])
if orientation is not None:
# compute angle corresponding to the orientation w.r.t. the grid
# TODO: test this code
angle_ab = np.arctan2(vab[1], vab[0])
angle_dc = np.arctan2(vdc[1], vdc[0])
angle_ad = np.arctan2(vad[1], vad[0])
angle_bc = np.arctan2(vbc[1], vbc[0])
angle = 0.0
if orientation == '>':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc
elif orientation == '<':
angle = y_frac * angle_ab + (1 - y_frac) * angle_dc + np.pi
elif orientation == 'v':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc
elif orientation == '^':
angle = x_frac * angle_ad + (1 - x_frac) * angle_bc + np.pi
return np.array((point_of_intersection[0], point_of_intersection[1], angle))
else:
return point_of_intersection
def get_grid_point_from_pos(self):
# TODO return the nearest grid point for the given position estimate
pass
def print_corner_estimates(self):
for key, value in self.corner_estimates.items():
if value['n_estimates'] != 0:
print(f"corner marker {key} at pos ({value['x']},{value['y']})")
def draw_corner_line(self, frame, corner_1, corner_2):
# draws a line between the given markers onto the given frame
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
# draws a grid onto the given frame
frame = self.draw_corner_line(frame, 'a', 'b')
frame = self.draw_corner_line(frame, 'b', 'c')
frame = self.draw_corner_line(frame, 'c', 'd')
frame = self.draw_corner_line(frame, 'd', 'a')
if not any([estimate['pixel_coordinate'] is None for estimate in self.corner_estimates.values()]):
# compute outlines of board
a = self.corner_estimates['a']['pixel_coordinate']
b = self.corner_estimates['b']['pixel_coordinate']
c = self.corner_estimates['c']['pixel_coordinate']
d = self.corner_estimates['d']['pixel_coordinate']
# draw vertical lines
vab = b - a
vdc = c - d
for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
vad = d - a
vbc = c - b
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
thickness=1)
return frame
def get_robot_state_estimate(self, marker_id):
if marker_id in self.robot_marker_estimates:
if self.robot_marker_estimates[marker_id]['t'] is not None:
return np.array([self.robot_marker_estimates[marker_id]['x'],
self.robot_marker_estimates[marker_id]['y'],
self.robot_marker_estimates[marker_id]['angle']])
else:
print(f"error: no estimate available for robot {marker_id}")
return None
else:
print(f"error: invalid robot id {marker_id}")
return None
def draw_robot_pos(self, frame, detected_marker_data):
# draws information about the robot positions onto the given frame
robot_corners_pixel_coords = {}
for marker_id, estimate in self.robot_marker_estimates.items():
if marker_id in detected_marker_data.keys():
robot_corners_pixel_coords[marker_id] = tuple(detected_marker_data[marker_id]['marker_center'])
for marker_id, coord in robot_corners_pixel_coords.items():
x = self.robot_marker_estimates[marker_id]['x']
y = self.robot_marker_estimates[marker_id]['y']
angle = self.robot_marker_estimates[marker_id]['angle']
#frame = cv2.putText(frame, "pos = ({:5.3f}, {:5.3f}), ang = {:5.3f}".format(x, y, angle), coord,
# cv2.FONT_HERSHEY_SIMPLEX, 0.50, (0, 255, 0))
frame = cv2.putText(frame, f"{marker_id}", coord, cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), thickness=4)
return frame
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.run_tracking(draw_markers=True, invert_grayscale=True)

View File

@ -3,9 +3,6 @@ import time
# look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py # look at: https://github.com/casadi/casadi/blob/master/docs/examples/python/vdp_indirect_multiple_shooting.py
# TODO for roborally
# implement essential movements: forward, backward, turn left, turn right on grid-based level
class OpenLoopSolver: class OpenLoopSolver:
def __init__(self, N=20, T=4.0): def __init__(self, N=20, T=4.0):
self.T = T self.T = T
@ -43,8 +40,7 @@ class OpenLoopSolver:
# cost functional # cost functional
target = (-0.0, 0.0) target = (-0.0, 0.0)
L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2) L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
#L = (x-target[0]) ** 2 + (y-target[1]) ** 2 + 1e-2 * theta ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
# Fixed step Runge-Kutta 4 integrator # Fixed step Runge-Kutta 4 integrator
M = 4 # RK4 steps per interval M = 4 # RK4 steps per interval
@ -142,12 +138,7 @@ class OpenLoopSolver:
#return #return
def solve(self, x0, target): def solve(self, x0, target, obstacles, track):
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
x0[2] = angles_unwrapped[0]
target[2] = angles_unwrapped[1]
tstart = time.time() tstart = time.time()
# alternative solution using multiple shooting (way faster!) # alternative solution using multiple shooting (way faster!)
self.opti = Opti() # Optimization problem self.opti = Opti() # Optimization problem
@ -195,11 +186,7 @@ class OpenLoopSolver:
theta)) * omegal * omega_max theta)) * omegal * omega_max
f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max f3 = -(r / (2 * R) * omegar - r / (2 * R) * omegal) * omega_max
xdot = vertcat(f1, f2, f3) xdot = vertcat(f1, f2, f3)
L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
#L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (theta - target[2])**2 + 1e-2 * (omegar ** 2 + omegal ** 2)
L = 100 * (x - target[0]) ** 2 + 100 * (y - target[1]) ** 2 + 1e-1 * (target[2] - theta) ** 2 + 1e-2 * (
omegar ** 2 + omegal ** 2)
#L = (x - target[0]) ** 2 + (y - target[1]) ** 2 + 1e-2 * (theta - target[2]) ** 2 + 1e-2 * (omegar ** 2 + omegal ** 2)
self.f = Function('f', [state, control], [xdot, L]) self.f = Function('f', [state, control], [xdot, L])
# ---- solve NLP ------ # ---- solve NLP ------
@ -235,13 +222,31 @@ class OpenLoopSolver:
self.opti.subject_to(self.Q[:, 0] == 0.0) self.opti.subject_to(self.Q[:, 0] == 0.0)
solver = self.opti.solver("ipopt", {'print_time': False}, {"print_level": 0}) solver = self.opti.solver("ipopt", {}, {"print_level": 0})
# ---- misc. constraints ---------- # ---- misc. constraints ----------
# self.opti.subject_to(X[1,:]>=0) # Time must be positive # self.opti.subject_to(X[1,:]>=0) # Time must be positive
# self.opti.subject_to(X[2,:]<=4) # Time must be positive # self.opti.subject_to(X[2,:]<=4) # Time must be positive
# self.opti.subject_to(X[2,:]>=-2) # Time must be positive # self.opti.subject_to(X[2,:]>=-2) # Time must be positive
# avoid obstacle
for o in obstacles:
p = obstacles[o].pos
r = obstacles[o].radius
if p is not None:
for k in range(1,self.N):
self.opti.subject_to((self.X[0,k]-p[0])**2 + (self.X[1,k]-p[1])**2 + self.slack > r**2)
# keep inside track
# TODO
# track_ids = track.inner.keys()
# a = track.outer[track_ids[0]]
# b = track.outer[track_ids[1]]
# c = track.outer[track_ids[2]]
# d = track.outer[track_ids[3]]
#
# for k in range(1, self.N):
# self.opti.subject_to(self.opti.subject_to(self.X))
posx = self.X[0, :] posx = self.X[0, :]
posy = self.X[1, :] posy = self.X[1, :]
angle = self.X[2, :] angle = self.X[2, :]
@ -251,7 +256,7 @@ class OpenLoopSolver:
self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1 self.opti.subject_to(angle[0] == x0[2]) # finish line at position 1
tend = time.time() tend = time.time()
#print("setting up problem took {} seconds".format(tend - tstart)) print("setting up problem took {} seconds".format(tend - tstart))
tstart = time.time() tstart = time.time()
if self.use_warmstart and self.opti_x0 is not None: if self.use_warmstart and self.opti_x0 is not None:
@ -262,7 +267,7 @@ class OpenLoopSolver:
print("could not set warmstart") print("could not set warmstart")
sol = self.opti.solve() # actual solve sol = self.opti.solve() # actual solve
tend = time.time() tend = time.time()
#print("solving the problem took {} seconds".format(tend - tstart)) print("solving the problem took {} seconds".format(tend - tstart))
tstart = time.time() tstart = time.time()
self.opti_x0 = sol.value(self.opti.x) self.opti_x0 = sol.value(self.opti.x)
@ -273,7 +278,7 @@ class OpenLoopSolver:
u_opt_1 = sol.value(self.U[0,:]) u_opt_1 = sol.value(self.U[0,:])
u_opt_2 = sol.value(self.U[1,:]) u_opt_2 = sol.value(self.U[1,:])
tend = time.time() tend = time.time()
#print("postprocessing took {} seconds".format(tend - tstart)) print("postprocessing took {} seconds".format(tend - tstart))
return (u_opt_1, u_opt_2, sol.value(posx), sol.value(posy)) return (u_opt_1, u_opt_2, sol.value(posx), sol.value(posy))

View File

@ -28,27 +28,27 @@ while running:
if event.key == pygame.K_LEFT: if event.key == pygame.K_LEFT:
u1 = -vmax u1 = -vmax
u2 = vmax u2 = vmax
print(f"turn left: ({u1},{u2})") print("turn left: ({},{})".format(u1, u2))
elif event.key == pygame.K_RIGHT: elif event.key == pygame.K_RIGHT:
u1 = vmax u1 = vmax
u2 = -vmax u2 = -vmax
print(f"turn right: ({u1},{u2})") print("turn right: ({},{})".format(u1, u2))
elif event.key == pygame.K_UP: elif event.key == pygame.K_UP:
u1 = vmax u1 = vmax
u2 = vmax u2 = vmax
print(f"forward: ({u1},{u2})") print("forward: ({},{})".format(u1, u2))
elif event.key == pygame.K_DOWN: elif event.key == pygame.K_DOWN:
u1 = -vmax u1 = -vmax
u2 = -vmax u2 = -vmax
print(f"backward: ({u1},{u2})") print("backward: ({},{})".format(u1, u2))
elif event.key == pygame.K_ESCAPE: elif event.key == pygame.K_ESCAPE:
print("quit") print("quit")
running = False running = False
u1 = 0.0 u1 = 0.0
u2 = 0.0 u2 = 0.0
rc_socket.send(f'({u1},{u2})\n'.encode()) rc_socket.send('({},{})\n'.format(u1, u2))
elif event.type == pygame.KEYUP: elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2)) print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode()) rc_socket.send('({},{})\n'.format(u1, u2))

View File

@ -1,13 +0,0 @@
import socket
HOST, PORT = "localhost", 42424
robot_id = 15
# SOCK_DGRAM is the socket type to use for UDP sockets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
while True:
received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received))

View File

@ -1,46 +0,0 @@
import socketserver
import threading
import time
from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
data = self.request[0]
socket = self.request[1]
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
try:
marker_id = int(data)
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
self.client_address)
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
socket.sendto("error: unknown robot marker id\n".encode(),
self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking)
estimator_thread.start()
with MeasurementServer(('127.0.0.1', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter

View File

@ -1,175 +0,0 @@
import numpy as np
import time
from casadi_opt import OpenLoopSolver
class MPCController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator
self.omega_max = 5.0
self.control_scaling = 0.4
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -1,55 +0,0 @@
import sys
import socket
import threading
import time
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
def main(args):
rc = RemoteController()
rc.run()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,267 +0,0 @@
import numpy as np
import math
import time
class PIDController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
self.I_angle = 0.35
self.D_angle = 0.1
self.P_pos = 0.50
self.I_pos = 0.3
self.D_pos = 0.1
self.mode = None
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
ts = []
angles = []
target_pos = np.array([0.0, 0.0, 0.0])
e_angle_old = 0.0
e_pos_old = 0.0
i = 0.0
i_angle = 0.0
i_pos = 0.0
t0 = time.time()
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
i_angle = 0
i_pos = 0
e_pos_old = 0
e_angle_old = 0
self.mode = 'combined'
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
self.mode = None # reset control mode
else:
print("enable control")
i = 0
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 97: # a
self.mode = 'angle'
e_angle_old = 0
i = 0
self.t = time.time()
elif key == 99: # c
self.mode = 'combined'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.t = time.time()
elif key == 112: # p
self.mode = 'position'
e_pos_old = 0
i = 0
self.t = time.time()
elif key == 43: # +
self.P_pos += 0.1
print("P = ", self.P_angle)
elif key == 45: # -
self.P_pos -= 0.1
print("P = ", self.P_angle)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
dt = self.t - time.time()
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
if self.mode == 'angle':
# compute angle such that robot faces to target point
target_angle = target_pos[2]
ts.append(time.time() - t0)
angles.append(x_pred[2])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
# i += self.I * dt * e # right Riemann sum
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - e_angle_old)/dt
u1 = p - i - d
u2 = - u1
e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v)
if e_pos < 0.05:
self.mode = 'angle'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
u1 = 0
u2 = 0
else:
forward = abs(e_angle) < np.pi/2.0
if not forward:
if e_angle > np.pi/2.0:
e_angle -= np.pi
else:
e_angle += np.pi
p_angle = self.P_angle * e_angle
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
p_pos = self.P_pos * e_pos
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
if forward:
print("forward")
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
else:
print("backward")
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
e_pos_old = e_pos
e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
#print(f"u = ({u1}, {u2})")
robot.send_cmd(u1, u2)
self.t = time.time() # save time when the most recent control was applied
time.sleep(0.1)
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -30,12 +30,8 @@ from marker_pos_angle.msg import id_pos_angle
from collections import OrderedDict from collections import OrderedDict
from argparse import ArgumentParser
class Robot: class Robot:
def __init__(self, id, ip): def __init__(self, id, ip=None):
self.pos = None self.pos = None
self.orient = None self.orient = None
@ -56,16 +52,16 @@ class Track:
def __init__(self): def __init__(self):
# ids in clockwise direction # ids in clockwise direction
self.inner = OrderedDict() self.inner = OrderedDict()
self.inner[0] = None
self.inner[1] = None self.inner[1] = None
self.inner[2] = None self.inner[2] = None
self.inner[3] = None self.inner[3] = None
self.inner[4] = None
self.outer = OrderedDict() self.outer = OrderedDict()
self.outer[4] = None
self.outer[5] = None self.outer[5] = None
self.outer[6] = None self.outer[6] = None
self.outer[7] = None self.outer[7] = None
self.outer[8] = None
self.track_complete = False self.track_complete = False
@ -126,18 +122,20 @@ def f_ode(t, x, u):
return dx return dx
class RemoteController: class RemoteController:
def __init__(self, id, ip): def __init__(self):
self.anim_stopped = False self.robots = [Robot(11, '192.168.1.103')]
#self.robots = [Robot(14, '192.168.1.102')]
#self.robots = [Robot(14, '192.168.1.103')]
#self.robots = [Robot(15, '192.168.1.102')]
self.robots = [Robot(id, ip)]
self.robot_ids = {} self.robot_ids = {}
for r in self.robots: for r in self.robots:
self.robot_ids[r.id] = r self.robot_ids[r.id] = r
obst = [Obstacle(12, 0.275), Obstacle(10, 0.275), Obstacle(14, 0.275), Obstacle(15, 0.275)]
self.obstacles = {}
for r in obst:
self.obstacles[r.id] = r
self.track = Track() self.track = Track()
@ -200,6 +198,13 @@ class RemoteController:
self.track_line_inner, = self.ax.plot([], []) self.track_line_inner, = self.ax.plot([], [])
self.track_line_outer, = self.ax.plot([], []) self.track_line_outer, = self.ax.plot([], [])
self.circles = []
for o in self.obstacles:
self.circles.append(patch.Circle((0.0, 0.0), radius=0.1, fill=False, color='red', linestyle='--'))
for s in self.circles:
self.ax.add_artist(s)
self.ax.set_xlabel('x-position') self.ax.set_xlabel('x-position')
self.ax.set_ylabel('y-position') self.ax.set_ylabel('y-position')
self.ax.grid() self.ax.grid()
@ -226,7 +231,6 @@ class RemoteController:
def ani(self): def ani(self):
print("starting animation") print("starting animation")
self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True) self.ani = anim.FuncAnimation(self.fig, init_func=self.ani_init, func=self.ani_update, interval=10, blit=True)
plt.ion() plt.ion()
@ -247,13 +251,10 @@ class RemoteController:
self.track_line_outer.set_data(self.track.outer_poly.exterior.xy) self.track_line_outer.set_data(self.track.outer_poly.exterior.xy)
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\ return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol,\
self.track_line_inner, self.track_line_outer, self.line_x,self.line_y, self.track_line_inner, self.track_line_outer, self.line_x,self.line_y, self.circles[0], self.circles[1],\
self.circles[2], self.circles[3],
def ani_update(self, frame): def ani_update(self, frame):
if self.anim_stopped:
self.ani.event_source.stop()
sys.exit(0)
#print("plotting") #print("plotting")
self.mutex.acquire() self.mutex.acquire()
try: try:
@ -305,11 +306,21 @@ class RemoteController:
else: else:
self.line_ol.set_data([],[]) self.line_ol.set_data([],[])
i = 0
obst_keys = self.obstacles.keys()
for s in self.circles:
o = self.obstacles[obst_keys[i]]
i = i + 1
if o.pos is not None:
s.center = o.pos
s.radius = o.radius
finally: finally:
self.mutex.release() self.mutex.release()
return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\ return self.line, self.line_sim, self.dirm, self.dirs, self.line_ol, self.track_line_inner, self.track_line_outer,\
self.line_x, self.line_y, self.line_x, self.line_y, self.circles[0], self.circles[1],self.circles[2],self.circles[3],
def measurement_callback(self, data): def measurement_callback(self, data):
#print(data) #print(data)
@ -341,6 +352,11 @@ class RemoteController:
finally: finally:
self.mutex.release() self.mutex.release()
# detect obstacles
if data.id in self.obstacles.keys():
obst = (data.x, data.y)
self.obstacles[data.id].pos = obst
# detect track # detect track
if data.id in self.track.inner.keys() or data.id in self.track.outer.keys(): if data.id in self.track.inner.keys() or data.id in self.track.outer.keys():
self.track.set_id(data) self.track.set_id(data)
@ -353,8 +369,7 @@ class RemoteController:
markers_out = self.track.outer.values() markers_out = self.track.outer.values()
# find targets: # find targets:
# generate waypoints lamb = 0.2
lamb = 0.5
j = 0 j = 0
for i in range(0,4): for i in range(0,4):
p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i])) p = np.array(markers_in[i]) + lamb * (np.array(markers_out[i]) - np.array(markers_in[i]))
@ -367,18 +382,9 @@ class RemoteController:
targets[j] = (mean[0], mean[1], 0.0) targets[j] = (mean[0], mean[1], 0.0)
j += 1 j += 1
# final connection between first and last marker
mean_in = (np.array(markers_in[3]) + np.array(markers_in[0])) * 0.5
mean_out = (np.array(markers_out[3]) + np.array(markers_out[0])) * 0.5
mean = mean_in + (1.0 - lamb) * (mean_out - mean_in)
targets[j] = (mean[0], mean[1], 0.0)
print("targets = {}".format(targets.keys()))
auto_control = False auto_control = False
current_target = 0 current_target = 0
control_scaling = 0.3
while True: while True:
# open loop controller # open loop controller
@ -412,32 +418,12 @@ class RemoteController:
self.target = targets[5] self.target = targets[5]
elif event.key == pygame.K_7: elif event.key == pygame.K_7:
self.target = targets[6] self.target = targets[6]
elif event.key == pygame.K_8:
self.target = targets[7]
elif event.key == pygame.K_SPACE: elif event.key == pygame.K_SPACE:
auto_control = not auto_control auto_control = not auto_control
if auto_control: if auto_control:
self.target = targets[current_target] self.target = targets[current_target]
elif event.key == pygame.K_PLUS:
control_scaling += 0.1
control_scaling = min(control_scaling, 1.0)
elif event.key == pygame.K_MINUS:
control_scaling -= 0.1
control_scaling = max(control_scaling, 0.3)
elif event.key == pygame.K_ESCAPE:
print("quit!")
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
elif event.key == pygame.QUIT: elif event.key == pygame.QUIT:
print("quit!") sys.exit(0)
self.controlling = False
if self.rc_socket:
self.rc_socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
if self.controlling: if self.controlling:
@ -446,20 +432,17 @@ class RemoteController:
if auto_control: if auto_control:
if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3: if np.linalg.norm(x_pred[0:2]-np.array(self.target[0:2])) < 0.3:
print("close to target!") print("close to target!")
current_target = (current_target + 1) % 8 current_target = (current_target + 1) % 7
self.target = targets[current_target] self.target = targets[current_target]
print("new target = {}".format(current_target)) print("new target = {}".format(current_target))
tmpc_start = time.time() tmpc_start = time.time()
# solve mpc open loop problem # solve mpc open loop problem
res = self.ols.solve(x_pred, self.target, [], self.track) res = self.ols.solve(x_pred, self.target, self.obstacles, self.track)
#us1 = res[0] us1 = res[0]
#us2 = res[1] us2 = res[1]
us1 = res[0] * control_scaling
us2 = res[1] * control_scaling
print("u = {}", (us1, us2))
# save open loop trajectories for plotting # save open loop trajectories for plotting
self.mutex.acquire() self.mutex.acquire()
@ -510,18 +493,9 @@ class RemoteController:
def main(args): def main(args):
parser = ArgumentParser()
parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot')
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
marker_id = int(args.id)
ip = args.ip
rospy.init_node('controller_node', anonymous=True) rospy.init_node('controller_node', anonymous=True)
rc = RemoteController(marker_id, ip) rc = RemoteController()
pygame.init() pygame.init()
@ -534,8 +508,7 @@ def main(args):
pass pass
#threading.Thread(target=rc.input_handling).start() #threading.Thread(target=rc.input_handling).start()
controller_thread = threading.Thread(target=rc.controller) threading.Thread(target=rc.controller).start()
controller_thread.start()
rc.ani() rc.ani()

View File

@ -1,285 +0,0 @@
import sys
import socket
import threading
import time
from mpc_controller import MPCController
from robot import Robot
from aruco_estimator import ArucoEstimator
MSGLEN = 64
def myreceive(sock):
chunks = []
bytes_recd = 0
while bytes_recd < MSGLEN:
chunk = sock.recv(1)
if chunk == b'':
raise RuntimeError("socket connection broken")
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
if chunks[-1] == b'\n':
break
return b''.join(chunks)
class RoboRallyRobot(Robot):
# dictionary mapping the current orientation and a turn command to the resulting orientation
resulting_orientation = {
'^': {'turn left': '<', 'turn right': '>', 'turn around': 'v'},
'>': {'turn left': '^', 'turn right': 'v', 'turn around': '<'},
'v': {'turn left': '>', 'turn right': '<', 'turn around': '^'},
'<': {'turn left': 'v', 'turn right': '^', 'turn around': '>'},
}
# dictionary mapping an orientation to its opposite
opposites = {'^': 'v', '>': '<', 'v': '^', '<': '>'}
def __init__(self, id, ip, x, y, orientation):
super().__init__(id, ip)
self.grid_x = x
self.grid_y = y
self.grid_orientation = orientation
def get_neighbor_coordinates(self, direction):
# get the coordinates of the neighboring tile in the given direction
if direction == '^':
return self.grid_x, self.grid_y - 1
elif direction == '>':
return self.grid_x + 1, self.grid_y
elif direction == 'v':
return self.grid_x, self.grid_y + 1
elif direction == '<':
return self.grid_x - 1, self.grid_y
else:
print("error: unknown direction")
sys.exit(1)
def move(self, move_type):
if move_type == 'forward':
target_tile = self.get_neighbor_coordinates(self.grid_orientation)
self.grid_x = target_tile[0]
self.grid_y = target_tile[1]
elif move_type == 'backward':
opposite_orientation = RoboRallyRobot.opposites[self.grid_orientation]
target_tile = self.get_neighbor_coordinates(opposite_orientation)
self.grid_x = target_tile[0]
self.grid_y = target_tile[1]
elif 'turn' in move_type:
self.grid_orientation = RoboRallyRobot.resulting_orientation[self.grid_orientation][move_type]
elif 'nop' in move_type:
pass # nop command -> robot grid position does not change (used e.g. for driving the robot to initial
# position)
else:
print(f"error: invalid move: {move_type}")
sys.exit(1)
def __str__(self):
return self.__repr__()
def __repr__(self):
return f"grid x: {self.grid_x}, grid y: {self.grid_y}, grid orientation: {self.grid_orientation}"
class RemoteController:
valid_cmds = ['forward', 'backward', 'turn left', 'turn right', 'turn around', 'nop', 'get position',
'set position', 'initialize_robot', 'initialize_grid']
def __init__(self):
self.robots = []
#self.robots = [RoboRallyRobot(12, '192.168.1.12', x=1, y=1, orientation='>')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
print("waiting for corner and robot detection..")
while not self.estimator.all_robots_detected() or not self.estimator.all_corners_detected():
pass
print("everything detected!")
# drive robots to initial position
for robot_id in self.robot_ids:
self.grid_control(robot_id, 'nop')
def run(self):
print("waiting until all markers are detected...")
while not self.estimator.all_corners_detected():
pass
print("starting control")
running = True
while running:
(clientsocket, address) = self.comm_socket.accept()
clientsocket.settimeout(None)
connected = True
while connected:
try:
data = myreceive(clientsocket)
print("data received: ", data)
inputs = data.split(b',')
cmd = inputs[0]
cmd = cmd.strip().decode()
if len(inputs) > 1:
if cmd in RemoteController.valid_cmds:
if cmd == 'initialize_grid':
try:
grid_columns = int(inputs[1])
grid_rows = int(inputs[2])
self.estimator.grid_columns = grid_columns
self.estimator.grid_rows = grid_rows
clientsocket.sendall(b'OK\n')
except ValueError:
print("could not initialize grid!")
clientsocket.sendall(b'"could not initialize grid!\n'
b'expected: initialize_robot, <grid columns>, <grid rows>')
except IndexError:
print("could not initialize grid!")
clientsocket.sendall(b'"could not initialize grid!\n'
b'expected: initialize_robot, <grid columns>, <grid rows>')
else: # robot command
try:
robot_id = int(inputs[1])
except ValueError:
robot_id = None
print("could not read robot id!")
clientsocket.sendall(b'Could not read robot id!\n')
if cmd == 'initialize_robot':
# add a new robot to the game
try:
id = int(inputs[1])
ip = inputs[2].decode().strip()
x = int(inputs[3])
y = int(inputs[4])
orientation = inputs[5].decode().strip()
print(f"initializing new robot with id {id} and ip {ip} at pos ({x},{y}) with "
f"orientation '{orientation}'")
if id not in self.robot_ids:
new_robot = RoboRallyRobot(id=id, ip=ip, x=x, y=y, orientation=orientation)
new_robot.connect()
# store the new robot in the list of robots
self.robots.append(new_robot)
self.robot_ids[new_robot.id] = new_robot # this also means the estimator
# will track the new robot because
# it got a reference to the list of
# robot ids to keep an eye out for
else: # robot is already created -> just re-initialize its position
new_robot = self.robot_ids[id]
new_robot.grid_x = x
new_robot.grid_y = y
new_robot.orientation = orientation
if new_robot.connected:
print("created new robot and successfully connected to it!")
while not self.estimator.all_robots_detected(): # wait until the robot gets detected
pass
# drive the new robot to its starting position
self.grid_control(new_robot.id, 'nop')
clientsocket.sendall(b'OK\n')
else:
clientsocket.sendall(f"error: could not connect to new robot {new_robot}".encode())
except IndexError:
print("could not initialize a new robot")
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
except ValueError:
print("could not initialize a new robot")
clientsocket.sendall('could not initialize a new robot: invalid command format\n'
'expected: initialize_robot, <id>, <ip>, <x>, <y>, <orientation>\n'.encode())
elif robot_id in self.robot_ids:
if cmd == b'get position':
clientsocket.sendall(bytes('{}\n'.format(self.robot_ids[robot_id].grid_pos)))
elif cmd == b'set position':
try:
pos_data = ",".join(inputs[2:])
new_grid_pos = tuple(map(lambda x: int(x[1]) if x[0] < 2 else float(x[1]),
enumerate(pos_data.strip().strip('()').split(','))))
self.robot_ids[robot_id].grid_pos = new_grid_pos
clientsocket.sendall(b'OK\n')
except IndexError as e:
print("could not set grid position!")
clientsocket.sendall(bytes(
'could not set grid position! (invalid format)\n'.format(
self.robot_ids[robot_id].grid_pos)))
except ValueError as e:
print("could not set grid position!")
clientsocket.sendall(bytes(
'could not set grid position! (invalid format)\n'.format(
self.robot_ids[robot_id].grid_pos)))
else:
self.grid_control(robot_id, cmd)
clientsocket.sendall(b'OK\n')
else:
print("invalid robot id!")
clientsocket.sendall(b'Invalid robot id!\n')
else:
clientsocket.sendall(b'Invalid command!\n')
else: # len(inputs) <= 1
if b'quit' in inputs[0]:
clientsocket.close()
self.comm_socket.close()
connected = False
running = False
else:
print("could not process command!")
clientsocket.sendall(b'Could not process request!\n')
except RuntimeError:
print("disconnected")
connected = False
clientsocket.close()
def grid_control(self, robot_id, cmd):
robot = self.robot_ids[robot_id] # get robot to be controlled
print("robot grid pos before move: ", robot)
robot.move(cmd)
print("robot grid pos after move: ", robot)
target = self.estimator.get_pos_from_grid_point(robot.grid_x, robot.grid_y, robot.grid_orientation)
self.controller.move_to_pos(target, robot)
def main(args):
rc = RemoteController()
rc.run()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,37 +0,0 @@
import socket
class Robot:
def __init__(self, marker_id, ip):
self.id = marker_id
self.pos = None
self.euler = None
self.ip = ip
self.socket = socket.socket()
# currently active control
self.u1 = 0.0
self.u2 = 0.0
self.connected = False
def connect(self):
# connect to robot
try:
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
self.socket.connect((self.ip, 1234)) # connect to robot
print("connected!")
self.connected = True
except socket.error:
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
def send_cmd(self, u1=0.0, u2=0.0):
if self.socket:
try:
self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass