input handling using opencv window

This commit is contained in:
Simon Pirkelmann 2020-10-15 22:41:30 +02:00
parent e3753f7644
commit 49645765d7
3 changed files with 146 additions and 197 deletions

View File

@ -0,0 +1,102 @@
import numpy as np
import time
from casadi_opt import OpenLoopSolver
class MPCController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator
self.omega_max = 5.0
self.control_scaling = 0.2
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0,0.0,0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
self.anim_stopped = True
return
x_pred = self.get_measurement(robot.id)
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
#print("error pos = ", error_pos)
#print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
#if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.2:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
#us1 = res[0]
#us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
#print("u = {}", (us1, us2))
tmpc_end = time.time()
#print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
#print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return np.array(self.estimator.get_robot_state_estimate(robot_id))

View File

@ -3,6 +3,7 @@ import numpy as np
import cv2
from cv2 import aruco
from shapely.geometry import LineString
from queue import Queue
DS5_product_ids = ["0AD1", "0AD2", "0AD3", "0AD4", "0AD5", "0AF6", "0AFE", "0AFF", "0B00", "0B01", "0B03", "0B07","0B3A"]
@ -42,6 +43,8 @@ class ArucoEstimator:
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(id, None) for id in self.robot_marker_ids])
self.event_queue = Queue()
if False: # check if realsense camera is connected
# Configure depth and color streams
self.pipeline = rs.pipeline()
@ -68,9 +71,18 @@ class ArucoEstimator:
self.cv_camera = cv2.VideoCapture(0)
self.pipeline = None
def mouse_callback(self, event, x, y, flags, param):
if event == 1:
print(event)
self.event_queue.put(('click', (x,y)))
def run_tracking(self):
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
try:
while True:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
@ -113,11 +125,16 @@ class ArucoEstimator:
tvec=np.array([1.2, 0.42, 0]))
# Show images
cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)
cv2.imshow('RealSense', frame)
cv2.waitKey(1)
cv2.imshow('RoboRally', frame)
key = cv2.waitKey(1)
if key > 0:
self.event_queue.put(('key', key))
print('key = ', key)
if key == ord('q'):
running = False
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
@ -304,3 +321,5 @@ class ArucoEstimator:
if __name__ == "__main__":
estimator = ArucoEstimator()
estimator.run_tracking()

View File

@ -1,36 +1,9 @@
# startup:
# roscore -> start ros
# rosparam set cv_camera/device_id 0 -> set appropriate camera device
# rosrun cv_camera cv_camera_node -> start the camera
# roslaunch aruco_detect aruco_detect.launch camera:=cv_camera image:=image_raw dictionary:=16 transport:= fiducial_len:=0.1 # aruco marker detection
# python fiducial_to_2d_pos_angle.py -> compute position and angle of markers in 2d plane
import sys
import rospy
import pygame
import numpy as np
import socket
import scipy.integrate
import copy
import threading
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib.animation as anim
import matplotlib.patches as patch
from shapely.geometry import Polygon
import time
from casadi_opt import OpenLoopSolver
#from marker_pos_angle.msg import id_pos_angle
from collections import OrderedDict
from argparse import ArgumentParser
from mpc_controller import MPCController
import opencv_viewer_example
@ -76,12 +49,6 @@ class Robot:
self.ip = ip
self.socket = socket.socket()
# variables for measurements
self.tms_0 = None
self.xm_0 = None
self.tms = None
self.xms = None
# currently active control
self.u1 = 0.0
self.u2 = 0.0
@ -126,36 +93,23 @@ class Robot:
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
sys.exit(1)
def send_cmd(self, u1=0.0, u2=0.0):
if self.socket:
try:
self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError:
#print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
def __str__(self):
return self.__repr__()
def __repr__(self):
return f"x: {self.x}, y: {self.y}, orienation: {self.orientation}"
def f_ode(t, x, u):
# dynamical model of the two-wheeled robot
# TODO: find exact values for these parameters
r = 0.03
R = 0.05
d = 0.02
theta = x[2]
omega_r = u[0]
omega_l = u[1]
dx = np.zeros(3)
dx[0] = (r/2.0 * np.cos(theta) - r*d/(2.0*R) * np.sin(theta)) * omega_r \
+ (r/2.0 * np.cos(theta) + r*d/(2.0 * R) * np.sin(theta)) * omega_l
dx[1] = (r/2.0 * np.sin(theta) + r*d/(2.0*R) * np.cos(theta)) * omega_r \
+ (r/2 * np.sin(theta) - r*d/(2.0*R) * np.cos(theta)) * omega_l
dx[2] = -r/(2.0*R) * (omega_r - omega_l)
return dx
class RemoteController:
def __init__(self, id, ip):
def __init__(self):
# self.robots = #[Robot(11, '192.168.1.11', (6, -3, np.pi)), Robot(12, '192.168.1.12', (6, -3, np.pi)),
# Robot(13, '192.168.1.13', (6, -3, np.pi)), Robot(14, '192.168.1.14', (6, -2, np.pi))]
#self.robots = [Robot(13, '192.168.1.13', (6, -3, np.pi))]
@ -182,38 +136,14 @@ class RemoteController:
self.t = time.time()
# variables for simulated state
self.x0 = None
self.ts = np.array([])
self.xs = []
# variable for mpc open loop
self.ol_x = None
self.ol_y = None
# ROS subscriber for detected markers
# start thread for marker position detection
self.estimator = opencv_viewer_example.ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
# pid parameters
self.controlling = False
self.controller = MPCController(self.estimator)
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
self.target = (0.0, 0.0, 0.0)
# integrator
self.r = scipy.integrate.ode(f_ode)
self.omega_max = 5.0
self.control_scaling = 0.2
#self.omega_max = 13.32
def controller(self):
def run(self):
print("waiting until all markers are detected...")
while not self.estimator.all_corners_detected():
pass
@ -260,7 +190,7 @@ class RemoteController:
print("could not set grid position!")
clientsocket.sendall(bytes('could not set grid position! (invalid format)\n'.format(self.robot_ids[robot_id].grid_pos)))
else:
self.mpc_control(robot_id, cmd)
self.grid_control(robot_id, cmd)
clientsocket.sendall(b'OK\n')
else:
print("invalid robot id!")
@ -283,123 +213,21 @@ class RemoteController:
connected = False
clientsocket.close()
def mpc_control(self, robot_id, cmd):
def grid_control(self, robot_id, cmd):
robot = self.robot_ids[robot_id] # get robot to be controlled
print("robot grid pos before move: ", robot)
robot.move(cmd)
print("robot grid pos after move: ", robot)
self.target = self.estimator.get_pos_from_grid_point(robot.x, robot.y, robot.orientation)
target = self.estimator.get_pos_from_grid_point(robot.x, robot.y, robot.orientation)
self.pid = False
self.mpc = True
near_target = 0
while near_target < 5:
# open loop controller
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP:
self.controlling = True
self.t = time.time()
elif event.key == pygame.K_DOWN:
self.controlling = False
if self.robot_ids[robot_id].socket:
self.robot_ids[robot_id].socket.send('(0.0,0.0)\n')
elif event.key == pygame.K_0:
self.target = np.array([0,0,0])
elif event.key == pygame.K_PLUS:
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif event.key == pygame.K_MINUS:
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif event.key == pygame.K_ESCAPE:
print("quit!")
self.controlling = False
if self.robot_ids[robot_id].socket:
self.robot_ids[robot_id].socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
elif event.key == pygame.QUIT:
print("quit!")
self.controlling = False
if self.robot_ids[robot_id].socket:
self.robot_ids[robot_id].socket.send('(0.0,0.0)\n')
self.anim_stopped = True
return
if self.mpc:
x_pred = self.get_measurement(robot_id)
tmpc_start = time.time()
error_pos = np.linalg.norm(x_pred[0:2] - self.target[0:2])
angles_unwrapped = np.unwrap([x_pred[2], self.target[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
#print("error pos = ", error_pos)
#print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
#if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.2:
# solve mpc open loop problem
res = self.ols.solve(x_pred, self.target)
#us1 = res[0]
#us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
#print("u = {}", (us1, us2))
tmpc_end = time.time()
#print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
#print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
if self.robot_ids[robot_id].socket:
#self.robot_ids[robot_id].socket.send('({},{})\n'.format(u1, u2).encode())
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return np.array(self.estimator.get_robot_state_estimate(robot_id))
self.controller.move_to_pos(target, robot)
def main(args):
parser = ArgumentParser()
parser.add_argument('id', metavar='id', type=str, help='marker id of the controlled robot')
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
rc = RemoteController()
marker_id = int(args.id)
ip = args.ip
rc = RemoteController(marker_id, ip)
pygame.init()
screenheight = 480
screenwidth = 640
pygame.display.set_mode([screenwidth, screenheight])
rc.controller()
rc.run()
if __name__ == '__main__':
main(sys.argv)