forked from Telos4/RoboRally
robot driving to position determined by mouse click
This commit is contained in:
parent
49645765d7
commit
3ab8fadc7b
|
@ -22,7 +22,6 @@ class MPCController:
|
|||
def move_to_pos(self, target_pos, robot, near_target_counter=5):
|
||||
near_target = 0
|
||||
while near_target < near_target_counter:
|
||||
|
||||
while not self.estimator.event_queue.empty():
|
||||
event = self.estimator.event_queue.get()
|
||||
print("event: ", event)
|
||||
|
@ -56,7 +55,6 @@ class MPCController:
|
|||
print("quit!")
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
self.anim_stopped = True
|
||||
return
|
||||
|
||||
x_pred = self.get_measurement(robot.id)
|
||||
|
@ -98,5 +96,77 @@ class MPCController:
|
|||
time.sleep(self.dt)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
|
||||
def interactive_control(self, robots):
|
||||
controlled_robot_number = 0
|
||||
robot = robots[controlled_robot_number]
|
||||
|
||||
target_pos = np.array([0.0, 0.0, 0.0])
|
||||
|
||||
running = True
|
||||
while running:
|
||||
# handle events from opencv window
|
||||
while not self.estimator.event_queue.empty():
|
||||
event = self.estimator.event_queue.get()
|
||||
print("event: ", event)
|
||||
if event[0] == 'click':
|
||||
target_pos = event[1]
|
||||
elif event[0] == 'key':
|
||||
key = event[1]
|
||||
|
||||
if key == 32: # arrow up
|
||||
self.controlling = not self.controlling
|
||||
if not self.controlling:
|
||||
print("disable control")
|
||||
robot.send_cmd() # stop robot
|
||||
else:
|
||||
print("enable control")
|
||||
self.t = time.time()
|
||||
elif key == 48: # 0
|
||||
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
|
||||
elif key == 43: # +
|
||||
self.control_scaling += 0.1
|
||||
self.control_scaling = min(self.control_scaling, 1.0)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 45: # -
|
||||
self.control_scaling -= 0.1
|
||||
self.control_scaling = max(self.control_scaling, 0.1)
|
||||
print("control scaling = ", self.control_scaling)
|
||||
elif key == 9: # TAB
|
||||
# switch controlled robot
|
||||
robot.send_cmd() # stop current robot
|
||||
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
|
||||
robot = robots[controlled_robot_number]
|
||||
print(f"controlled robot: {robot.id}")
|
||||
elif key == 113 or key == 27: # q or ESCAPE
|
||||
print("quit!")
|
||||
self.controlling = False
|
||||
robot.send_cmd()
|
||||
return
|
||||
|
||||
if self.controlling:
|
||||
# measure state
|
||||
x_pred = self.get_measurement(robot.id)
|
||||
|
||||
#print(np.linalg.norm(x_pred-target_pos))
|
||||
|
||||
# solve mpc open loop problem
|
||||
res = self.ols.solve(x_pred, target_pos)
|
||||
|
||||
us1 = res[0] * self.control_scaling
|
||||
us2 = res[1] * self.control_scaling
|
||||
|
||||
dt_mpc = time.time() - self.t
|
||||
if dt_mpc < self.dt: # wait until next control can be applied
|
||||
time.sleep(self.dt - dt_mpc)
|
||||
|
||||
# send controls to the robot
|
||||
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
|
||||
u1 = us1[i]
|
||||
u2 = us2[i]
|
||||
robot.send_cmd(u1, u2)
|
||||
if i < self.mstep:
|
||||
time.sleep(self.dt)
|
||||
self.t = time.time() # save time the most recent control was applied
|
||||
|
||||
def get_measurement(self, robot_id):
|
||||
return np.array(self.estimator.get_robot_state_estimate(robot_id))
|
||||
return np.array(self.estimator.get_robot_state_estimate(robot_id))
|
53
remote_control/mpc_test.py
Normal file
53
remote_control/mpc_test.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
import sys
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
from robot import Robot
|
||||
|
||||
from mpc_controller import MPCController
|
||||
|
||||
import opencv_viewer_example
|
||||
|
||||
|
||||
class RemoteController:
|
||||
def __init__(self):
|
||||
self.robots = [Robot(12, '192.168.1.12')]
|
||||
|
||||
self.robot_ids = {}
|
||||
for r in self.robots:
|
||||
self.robot_ids[r.id] = r
|
||||
|
||||
# socket for movement commands
|
||||
self.comm_socket = socket.socket()
|
||||
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
for robot in self.robots:
|
||||
robot.connect()
|
||||
|
||||
self.comm_socket.bind(('', 1337))
|
||||
self.comm_socket.listen(5)
|
||||
|
||||
self.t = time.time()
|
||||
|
||||
# start thread for marker position detection
|
||||
self.estimator = opencv_viewer_example.ArucoEstimator(self.robot_ids.keys())
|
||||
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
|
||||
self.estimator_thread.start()
|
||||
|
||||
self.controller = MPCController(self.estimator)
|
||||
|
||||
def run(self):
|
||||
print("waiting until all markers are detected...")
|
||||
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||
pass
|
||||
print("starting control")
|
||||
self.controller.interactive_control(robots=self.robots)
|
||||
|
||||
def main(args):
|
||||
rc = RemoteController()
|
||||
rc.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv)
|
|
@ -5,21 +5,6 @@ from cv2 import aruco
|
|||
from shapely.geometry import LineString
|
||||
from queue import Queue
|
||||
|
||||
DS5_product_ids = ["0AD1", "0AD2", "0AD3", "0AD4", "0AD5", "0AF6", "0AFE", "0AFF", "0B00", "0B01", "0B03", "0B07","0B3A"]
|
||||
|
||||
|
||||
def find_device_that_supports_advanced_mode():
|
||||
ctx = rs.context()
|
||||
ds5_dev = rs.device()
|
||||
devices = ctx.query_devices()
|
||||
for dev in devices:
|
||||
if dev.supports(rs.camera_info.product_id) and str(dev.get_info(rs.camera_info.product_id)) in DS5_product_ids:
|
||||
if dev.supports(rs.camera_info.name):
|
||||
print("Found device that supports advanced mode:", dev.get_info(rs.camera_info.name))
|
||||
return dev
|
||||
raise Exception("No device that supports advanced mode was found")
|
||||
|
||||
|
||||
class ArucoEstimator:
|
||||
grid_columns = 10
|
||||
grid_rows = 8
|
||||
|
@ -33,19 +18,21 @@ class ArucoEstimator:
|
|||
angles = []
|
||||
|
||||
corner_estimates = {
|
||||
'a': (None, 0), # (estimate, n_estimates)
|
||||
'b': (None, 0),
|
||||
'c': (None, 0),
|
||||
'd': (None, 0)
|
||||
'a': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0 },
|
||||
'b': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0 },
|
||||
'c': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0 },
|
||||
'd': {'pixel_coordinate': None, 'real_world_estimate': None, 'n_estimates': 0 },
|
||||
}
|
||||
|
||||
def __init__(self, robot_marker_ids=[]):
|
||||
def __init__(self, robot_marker_ids=None, use_realsense=True):
|
||||
if robot_marker_ids is None:
|
||||
robot_marker_ids = []
|
||||
self.robot_marker_ids = robot_marker_ids
|
||||
self.robot_marker_estimates = dict([(id, None) for id in self.robot_marker_ids])
|
||||
|
||||
self.event_queue = Queue()
|
||||
|
||||
if False: # check if realsense camera is connected
|
||||
if use_realsense: # check if realsense camera is connected
|
||||
# Configure depth and color streams
|
||||
self.pipeline = rs.pipeline()
|
||||
config = rs.config()
|
||||
|
@ -71,10 +58,33 @@ class ArucoEstimator:
|
|||
self.cv_camera = cv2.VideoCapture(0)
|
||||
self.pipeline = None
|
||||
|
||||
def mouse_callback(self, event, x, y, flags, param):
|
||||
def mouse_callback(self, event, px, py, flags, param):
|
||||
if event == 1:
|
||||
print(event)
|
||||
self.event_queue.put(('click', (x,y)))
|
||||
if self.all_corners_detected():
|
||||
# inter/extrapolate from clicked point to marker position
|
||||
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
|
||||
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
|
||||
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
|
||||
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
|
||||
|
||||
x1 = self.corner_estimates['a']['real_world_estimate'][0]
|
||||
x3 = self.corner_estimates['c']['real_world_estimate'][0]
|
||||
y1 = self.corner_estimates['a']['real_world_estimate'][1]
|
||||
y3 = self.corner_estimates['c']['real_world_estimate'][1]
|
||||
|
||||
alpha = (px - px1) / (px3 - px1)
|
||||
beta = (py - py1) / (py3 - py1)
|
||||
|
||||
print(f"alpha = {alpha}, beta = {beta}")
|
||||
|
||||
target_x = x1 + alpha * (x3 - x1)
|
||||
target_y = y1 + beta * (y3 - y1)
|
||||
target = np.array([target_x, target_y, 0])
|
||||
|
||||
print(f"target = ({target_x},{target_y})")
|
||||
self.event_queue.put(('click', target))
|
||||
else:
|
||||
print("not all markers have been detected!")
|
||||
|
||||
def run_tracking(self):
|
||||
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
|
||||
|
@ -102,20 +112,21 @@ class ArucoEstimator:
|
|||
corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
|
||||
frame = aruco.drawDetectedMarkers(color_image.copy(), corners, ids)
|
||||
|
||||
if False:#ids is not None:
|
||||
for id, c in zip(ids, corners):
|
||||
res = aruco.estimatePoseSingleMarkers(c, 0.10, self.camera_matrix, self.dist_coeffs)
|
||||
if ids is not None:
|
||||
for id, corner in zip(ids, corners):
|
||||
corner_pixel_coord = np.mean(corner[0], axis=0)
|
||||
res = aruco.estimatePoseSingleMarkers(corner, 0.10, self.camera_matrix, self.dist_coeffs)
|
||||
rvecs = res[0][0][0]
|
||||
tvecs = res[1][0][0]
|
||||
|
||||
self.update_estimate(id, rvecs, tvecs)
|
||||
self.update_estimate(id[0], corner_pixel_coord, rvecs, tvecs)
|
||||
|
||||
frame = self.draw_grid_lines(frame, corners, ids)
|
||||
frame = self.draw_robot_pos(frame, corners, ids)
|
||||
else:
|
||||
# pretent we detected some markers
|
||||
pass
|
||||
self.update_estimate(0, rvec=np.array([0,0,0]), tvec=np.array([-1, 1, 0]))
|
||||
self.update_estimate(0, rvec=np.array([0, 0, 0]), tvec=np.array([-1, 1, 0]))
|
||||
self.update_estimate(1, rvec=np.array([0, 0, 0]), tvec=np.array([1, 1.5, 0]))
|
||||
self.update_estimate(2, rvec=np.array([0, 0, 0]), tvec=np.array([1, -1.5, 0]))
|
||||
self.update_estimate(3, rvec=np.array([0, 0, 0]), tvec=np.array([-1, -1, 0]))
|
||||
|
@ -123,6 +134,8 @@ class ArucoEstimator:
|
|||
#self.update_estimate(12, rvec=np.array([0.0, 0.0, 0.0]), tvec=np.array([-1.0 + random.random() * 2, -1.0 + random.random() * 2, 0]))
|
||||
self.update_estimate(12, rvec=np.array([0.0, 0.0, 0.0]),
|
||||
tvec=np.array([1.2, 0.42, 0]))
|
||||
self.update_estimate(13, rvec=np.array([0.0, 0.0, 0.0]),
|
||||
tvec=np.array([-1.2, -0.42, 0]))
|
||||
|
||||
# Show images
|
||||
cv2.imshow('RoboRally', frame)
|
||||
|
@ -139,21 +152,25 @@ class ArucoEstimator:
|
|||
# Stop streaming
|
||||
self.pipeline.stop()
|
||||
|
||||
def update_estimate(self, id, rvec, tvec):
|
||||
def update_estimate(self, id, pixel_coord, rvec, tvec):
|
||||
# update the marker estimate with new data
|
||||
if id in self.corner_marker_ids.values():
|
||||
# for corner markers we compute the mean of all measurements s.t. the position stabilizes over time
|
||||
# (we assume the markers do not move)
|
||||
corner = next(filter(lambda key: self.corner_marker_ids[key] == id, self.corner_marker_ids.keys())) # get corresponding corner to the detected marker
|
||||
old_estimate = self.corner_estimates[corner][0]
|
||||
n_estimates = self.corner_estimates[corner][1]
|
||||
old_estimate = self.corner_estimates[corner]['real_world_estimate']
|
||||
n_estimates = self.corner_estimates[corner]['n_estimates']
|
||||
|
||||
tvec_proj = tvec[0:2] # projection to get rid of z coordinate
|
||||
tvec_proj = np.array((tvec_proj[0], -tvec_proj[1])) # flip y coordinate
|
||||
if old_estimate is not None:
|
||||
new_estimate = (n_estimates * old_estimate + tvec_proj) / (n_estimates + 1) # weighted update
|
||||
else:
|
||||
new_estimate = tvec_proj # first estimate
|
||||
self.corner_estimates[corner] = (new_estimate, n_estimates + 1)
|
||||
self.corner_estimates[corner]['real_world_estimate'] = new_estimate
|
||||
self.corner_estimates[corner]['n_estimates'] = n_estimates + 1
|
||||
self.corner_estimates[corner]['pixel_coordinate'] = pixel_coord
|
||||
|
||||
elif id in self.robot_marker_ids:
|
||||
# for robot markers we extract x and y position as well as the angle
|
||||
# here we could also implement a filter
|
||||
|
@ -171,7 +188,11 @@ class ArucoEstimator:
|
|||
|
||||
def all_corners_detected(self):
|
||||
# checks if all corner markers have been detected at least once
|
||||
return not any([estimate[0] is None for estimate in self.corner_estimates.values()])
|
||||
return not any([estimate['n_estimates'] == 0 for estimate in self.corner_estimates.values()])
|
||||
|
||||
def all_robots_detected(self):
|
||||
# checks if all robot markers have been detected at least once
|
||||
return not any([estimate is None for estimate in self.robot_marker_estimates.values()])
|
||||
|
||||
def get_pos_from_grid_point(self, x, y, orientation=None):
|
||||
"""
|
||||
|
@ -187,10 +208,10 @@ class ArucoEstimator:
|
|||
assert self.all_corners_detected()
|
||||
|
||||
# compute column line
|
||||
a = self.corner_estimates['a'][0]
|
||||
b = self.corner_estimates['b'][0]
|
||||
c = self.corner_estimates['c'][0]
|
||||
d = self.corner_estimates['d'][0]
|
||||
a = self.corner_estimates['a']['real_world_estimate']
|
||||
b = self.corner_estimates['b']['real_world_estimate']
|
||||
c = self.corner_estimates['c']['real_world_estimate']
|
||||
d = self.corner_estimates['d']['real_world_estimate']
|
||||
x_frac = (x + 0.5) / self.grid_columns
|
||||
y_frac = (y + 0.5) / self.grid_rows
|
||||
|
||||
|
@ -239,8 +260,8 @@ class ArucoEstimator:
|
|||
|
||||
def print_corner_estimates(self):
|
||||
for key, value in self.corner_estimates.items():
|
||||
if value[0] is not None:
|
||||
print("corner marker {} at pos {}".format(key, value[0]))
|
||||
if value['n_estimates'] != 0:
|
||||
print("corner marker {} at pos {}".format(key, value['real_world_estimate']))
|
||||
print()
|
||||
|
||||
def draw_corner_line(self, frame, corner_1, corner_2, corner_coords_dict):
|
||||
|
@ -278,7 +299,7 @@ class ArucoEstimator:
|
|||
for x in range(1,self.grid_columns):
|
||||
column_line_top = a + x / self.grid_columns * vab
|
||||
column_line_bottom = d + x / self.grid_columns * vdc
|
||||
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(255, 0, 0),
|
||||
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
|
||||
thickness=1)
|
||||
|
||||
vad = d - a
|
||||
|
@ -286,7 +307,7 @@ class ArucoEstimator:
|
|||
for y in range(1, self.grid_rows):
|
||||
row_line_top = a + y / self.grid_rows * vad
|
||||
row_line_bottom = b + y / self.grid_rows * vbc
|
||||
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(255, 0, 0),
|
||||
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
|
||||
thickness=1)
|
||||
|
||||
return frame
|
||||
|
|
35
remote_control/robot.py
Normal file
35
remote_control/robot.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import socket
|
||||
import sys
|
||||
|
||||
|
||||
class Robot:
|
||||
def __init__(self, marker_id, ip):
|
||||
self.id = marker_id
|
||||
self.pos = None
|
||||
self.euler = None
|
||||
|
||||
self.ip = ip
|
||||
self.socket = socket.socket()
|
||||
|
||||
# currently active control
|
||||
self.u1 = 0.0
|
||||
self.u2 = 0.0
|
||||
|
||||
def connect(self):
|
||||
# connect to robot
|
||||
try:
|
||||
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
|
||||
self.socket.connect((self.ip, 1234)) # connect to robot
|
||||
print("connected!")
|
||||
except socket.error:
|
||||
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
|
||||
sys.exit(1)
|
||||
|
||||
def send_cmd(self, u1=0.0, u2=0.0):
|
||||
if self.socket:
|
||||
try:
|
||||
self.socket.send(f'({u1},{u2})\n'.encode())
|
||||
except BrokenPipeError:
|
||||
# print(f"error: connection to robot {self.id} with ip {self.ip} lost")
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user