use qt gui instead of opencv

Simon Pirkelmann 2020-11-25 21:19:56 +01:00
parent f26f958dc7
commit 9c1115d505
4 changed files with 387 additions and 103 deletions


@@ -4,6 +4,9 @@ import cv2
 import os
 import time
 import math
+from pyqtgraph.Qt import QtCore, QtGui
+import pyqtgraph as pg
+
 from shapely.geometry import LineString
 from queue import Queue
@@ -26,7 +29,25 @@ class ArucoEstimator:
         'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
     }

-    def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
+    def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=12, grid_rows=12):
+        self.app = QtGui.QApplication([])
+
+        ## Create window with GraphicsView widget
+        self.win = pg.GraphicsLayoutWidget()
+        self.win.keyPressEvent = self.handleKeyPressEvent
+        self.win.show()  ## show widget alone in its own window
+        self.win.setWindowTitle('ArucoEstimator')
+        self.view = self.win.addViewBox()
+
+        ## lock the aspect ratio so pixels are always square
+        self.view.setAspectLocked(True)
+
+        ## Create image item
+        self.img = pg.ImageItem(border='w')
+        self.img.setLevels([[0, 255], [0, 255], [0, 255]])
+        self.img.mouseClickEvent = self.handleMouseEvent
+        self.view.addItem(self.img)
+
         self.grid_columns = grid_columns
         self.grid_rows = grid_rows
@@ -38,6 +59,9 @@ class ArucoEstimator:
         self.draw_grid = False

+        self.fps_start_time = time.time()
+        self.fps_counter = 0
+
         self.event_queue = Queue()

         if use_realsense:  # check if realsense camera is connected
@@ -89,6 +113,7 @@ class ArucoEstimator:
         self.drag_line_end = None
         self.drag_line_start = None
         self.previous_click = None
+        self.invert_grayscale = False

     def compute_clicked_position(self, px, py):
         if self.all_corners_detected():
@@ -116,21 +141,16 @@ class ArucoEstimator:
         target = np.array([px, -py])
         return target

-    def mouse_callback(self, event, px, py, flags, param):
-        """
-        This function is called for each mouse event inside the opencv window.
-        If there are reference markers available we compute the real world position corresponding to the clicked
-        position and put it in an event queue.
-        :param event: type of event (mouse movement, left click, right click, etc.)
-        :param px: x-position of event
-        :param py: y-position of event
-        """
-        target = None
-        if event == cv2.EVENT_LBUTTONDOWN:
-            self.drag_line_start = (px, py)
-        elif event == cv2.EVENT_LBUTTONUP:
+    def handleMouseEvent(self, event):
+        # get click position as distance to top-left corner of the image
+        px = event.pos().x()
+        py = self.img.height() - event.pos().y()
+        print(f"px = {px}, py = {py}")
+        if event.button() == QtCore.Qt.MouseButton.LeftButton:
+            # self.drag_line_start = (px, py)
+            # elif event == cv2.EVENT_LBUTTONUP:
             self.drag_line_end = (px, py)
+            self.drag_line_start = (px, py)
             target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
             if self.drag_line_start != self.drag_line_end:
                 # compute target angle for clicked position
@@ -155,106 +175,99 @@ class ArucoEstimator:
         if self.drag_line_start is not None:
             self.drag_line_end = (px, py)

-    def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
-        """
-        Run the marker tracking in a loop
-        """
-        cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
-        cv2.setMouseCallback('RoboRally', self.mouse_callback)
-
-        fps_display_rate = 1  # displays the frame rate every 1 second
-        fps_counter = 0
-        start_time = time.time()
-        try:
-            running = True
-            while running:
-                if self.pipeline:
-                    frames = self.pipeline.wait_for_frames()
-                    color_frame = frames.get_color_frame()
-                    if not color_frame:
-                        continue
-                    # Convert images to numpy arrays
-                    color_image = np.asanyarray(color_frame.get_data())
-                else:
-                    # Capture frame-by-frame
-                    ret, color_image = self.cv_camera.read()
-                t_image = time.time()
-
-                gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
-                if invert_grayscale:
-                    cv2.bitwise_not(gray, gray)
-
-                # run aruco marker detection
-                detected_markers = self.detector.detect(gray)
-
-                # extract data for detected markers
-                detected_marker_data = {}
-                for marker in detected_markers:
-                    if marker.id >= 0:
-                        detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
-                        if marker.id in self.corner_marker_ids.values():
-                            marker.calculateExtrinsics(0.075, self.camparam)
-                        else:
-                            marker.calculateExtrinsics(0.07, self.camparam)
-                        detected_marker_data[marker.id]['Rvec'] = marker.Rvec
-                        detected_marker_data[marker.id]['Tvec'] = marker.Tvec
-                    if draw_markers:
-                        marker.draw(color_image, np.array([255, 255, 255]), 2, True)
-                    if draw_marker_coordinate_system:
-                        aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
-
-                # store data
-                for marker_id, data in detected_marker_data.items():
-                    self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
-
-                # draw grid
-                if self.draw_grid:
-                    color_image = self.draw_grid_lines(color_image, detected_marker_data)
-                color_image = self.draw_robot_pos(color_image, detected_marker_data)
-
-                # draw drag
-                if self.drag_line_start is not None and self.drag_line_end is not None:
-                    color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
-
-                # compute frame rate
-                fps_counter += 1
-                delta_t = time.time() - start_time
-                if delta_t > fps_display_rate:
-                    fps_counter = 0
-                    start_time = time.time()
-                color_image = cv2.putText(color_image, f"fps = {(fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
-                                          (0, 255, 255),
-                                          thickness=2)
-
-                # Show images
-                cv2.imshow('RoboRally', color_image)
-                key = cv2.waitKey(1)
-                if key > 0:
-                    self.event_queue.put(('key', key))
-                    if key == ord('g'):
-                        self.draw_grid = not self.draw_grid
-                    if key == ord('q'):
-                        running = False
-                    if key == ord('x'):
-                        color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
-                        if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
-                            color_sensor.set_option(rs.option.enable_auto_exposure, False)
-                            print("auto exposure OFF")
-                        else:
-                            color_sensor.set_option(rs.option.enable_auto_exposure, True)
-                            print("auto exposure ON")
-                    if key == ord('i'):
-                        invert_grayscale = not invert_grayscale
-        finally:
-            cv2.destroyAllWindows()
-            if self.pipeline is not None:
-                # Stop streaming
-                self.pipeline.stop()
+    def process_frame(self):
+        draw_markers=True
+        draw_marker_coordinate_system=False
+        #cv2.setMouseCallback('RoboRally', self.mouse_callback)
+        fps_display_rate = 1  # displays the frame rate every 1 second
+        if self.pipeline:
+            frames = self.pipeline.wait_for_frames()
+            color_frame = frames.get_color_frame()
+            # if not color_frame:
+            #     continue
+            # Convert images to numpy arrays
+            color_image = np.asanyarray(color_frame.get_data())
+        else:
+            # Capture frame-by-frame
+            ret, color_image = self.cv_camera.read()
+        t_image = time.time()
+
+        gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
+        if self.invert_grayscale:
+            cv2.bitwise_not(gray, gray)
+
+        # run aruco marker detection
+        detected_markers = self.detector.detect(gray)
+
+        # extract data for detected markers
+        detected_marker_data = {}
+        for marker in detected_markers:
+            if marker.id >= 0:
+                detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
+                if marker.id in self.corner_marker_ids.values():
+                    marker.calculateExtrinsics(0.075, self.camparam)
+                else:
+                    marker.calculateExtrinsics(0.07, self.camparam)
+                detected_marker_data[marker.id]['Rvec'] = marker.Rvec
+                detected_marker_data[marker.id]['Tvec'] = marker.Tvec
+            if draw_markers:
+                marker.draw(color_image, np.array([255, 255, 255]), 2, True)
+            if draw_marker_coordinate_system:
+                aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
+
+        # store data
+        for marker_id, data in detected_marker_data.items():
+            self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
+
+        # draw grid
+        if self.draw_grid:
+            color_image = self.draw_grid_lines(color_image, detected_marker_data)
+        color_image = self.draw_robot_pos(color_image, detected_marker_data)
+
+        # draw drag
+        if self.drag_line_start is not None and self.drag_line_end is not None:
+            color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
+
+        # compute frame rate
+        self.fps_counter += 1
+        delta_t = time.time() - self.fps_start_time
+        if delta_t > fps_display_rate:
+            self.fps_counter = 0
+            self.fps_start_time = time.time()
+        color_image = cv2.putText(color_image, f"fps = {(self.fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
+                                  (0, 255, 255),
+                                  thickness=2)
+
+        # Show images
+        color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)  # convert to RGB
+        self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
+
+        QtCore.QTimer.singleShot(1, self.process_frame)
+
+    def handleKeyPressEvent(self, ev):
+        key = ev.key()
+        self.event_queue.put(('key', key))
+        if key == QtCore.Qt.Key_G:
+            self.draw_grid = not self.draw_grid
+        elif key == QtCore.Qt.Key_Q:
+            if self.pipeline is not None:
+                # Stop streaming
+                self.pipeline.stop()
+            self.app.quit()
+        elif key == QtCore.Qt.Key_X:
+            if self.pipeline is not None:
+                color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
+                if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
+                    color_sensor.set_option(rs.option.enable_auto_exposure, False)
+                    print("auto exposure OFF")
+                else:
+                    color_sensor.set_option(rs.option.enable_auto_exposure, True)
+                    print("auto exposure ON")
+        elif key == QtCore.Qt.Key_I:
+            self.invert_grayscale = not self.invert_grayscale

     def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
         # update the marker estimate with new data
         if marker_id in self.corner_marker_ids.values():
@@ -445,5 +458,12 @@ class ArucoEstimator:

 if __name__ == "__main__":
-    estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
-    estimator.run_tracking(draw_markers=True, invert_grayscale=True)
+    estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[11, 12, 13, 14])
+    estimator.process_frame()
+
+    import sys
+    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
+        QtGui.QApplication.instance().exec_()
+    #estimator.run_tracking(draw_markers=True, invert_grayscale=True)
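The pattern behind this file's changes, reduced to a standalone script: instead of a blocking cv2.imshow/cv2.waitKey loop, a pyqtgraph ImageItem is refreshed by a QtCore.QTimer.singleShot chain that re-schedules process_frame after every frame, and the Qt event loop owns the program. A minimal sketch using only the pyqtgraph/Qt calls already present in the diff; random noise stands in for the RealSense/OpenCV capture:

import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui

app = QtGui.QApplication([])
win = pg.GraphicsLayoutWidget()
win.setWindowTitle('render loop demo')
win.show()
view = win.addViewBox()
view.setAspectLocked(True)  # square pixels, as in the commit
img = pg.ImageItem(border='w')
view.addItem(img)

def process_frame():
    # grab a frame (here: synthetic noise) and display it; pyqtgraph expects
    # column-major (x, y, rgb) data, hence the same flip/transpose as above
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    img.setImage(np.transpose(np.flipud(frame), axes=(1, 0, 2)))
    QtCore.QTimer.singleShot(1, process_frame)  # re-arm for the next frame

process_frame()
QtGui.QApplication.instance().exec_()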


@@ -0,0 +1,62 @@
import socket
import pygame
from argparse import ArgumentParser


class ManualController:
    def __init__(self, estimator):
        self.estimator = estimator
        self.estimator.external_keyboard_callback = self.handle_keypress
        self.controlling = False

    def handle_keypress(self, key):
        pass


parser = ArgumentParser()
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
ip = args.ip

pygame.init()
pygame.display.set_mode((640, 480))

rc_socket = socket.socket()
try:
    rc_socket.connect((ip, 1234))  # connect to robot
except socket.error:
    print("could not connect to socket")

running = True
while running:
    u1 = 0
    u2 = 0
    vmax = 1.0
    events = pygame.event.get()
    for event in events:
        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_LEFT:
                u1 = -vmax
                u2 = vmax
                print(f"turn left: ({u1},{u2})")
            elif event.key == pygame.K_RIGHT:
                u1 = vmax
                u2 = -vmax
                print(f"turn right: ({u1},{u2})")
            elif event.key == pygame.K_UP:
                u1 = vmax
                u2 = vmax
                print(f"forward: ({u1},{u2})")
            elif event.key == pygame.K_DOWN:
                u1 = -vmax
                u2 = -vmax
                print(f"backward: ({u1},{u2})")
            elif event.key == pygame.K_ESCAPE:
                print("quit")
                running = False
                u1 = 0.0
                u2 = 0.0
            rc_socket.send(f'({u1},{u2})\n'.encode())
        elif event.type == pygame.KEYUP:
            print("key released, resetting: ({},{})".format(u1, u2))
            rc_socket.send(f'({u1},{u2})\n'.encode())
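The wire protocol this controller speaks is visible in the send calls: each command is a plain-text "(u1,u2)\n" wheel-speed pair sent over TCP port 1234. A hypothetical scripted client under the same assumptions (the robot address is a placeholder, and the receiving firmware is not part of this commit):

import socket
import time

ROBOT_IP = '192.168.1.11'  # placeholder; use your robot's address

rc_socket = socket.socket()
rc_socket.connect((ROBOT_IP, 1234))
rc_socket.send('(0.5,0.5)\n'.encode())  # both wheels forward at half speed
time.sleep(1.0)
rc_socket.send('(0.0,0.0)\n'.encode())  # stop
rc_socket.close()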


@@ -0,0 +1,117 @@
from flask import Flask, render_template, request

import socket
import threading
import time

import numpy as np

from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator

from shapely.geometry import LineString

width = 600
height = 400

robot_ids = [11, 12, 13, 14]
robot_ids_control_history = [11, 12, 13, 14]

app = Flask(__name__)


def calc_target_pos(px, py):
    if rc.estimator.all_corners_detected():
        # compute column line
        a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
        b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
        c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
        d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])

        x_frac = (px + 0.5) / width
        y_frac = (py + 0.5) / height

        vab = b - a
        vdc = c - d
        column_line_top = a + x_frac * vab
        column_line_bottom = d + x_frac * vdc

        vad = d - a
        vbc = c - b
        row_line_top = a + y_frac * vad
        row_line_bottom = b + y_frac * vbc

        column_line = LineString([column_line_top, column_line_bottom])
        row_line = LineString([row_line_top, row_line_bottom])

        int_pt = column_line.intersection(row_line)
        point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])

        return point_of_intersection


@app.route('/', methods=('GET', 'POST'))
def presence():
    global robot_ids_control_history
    if request.method == 'GET':
        return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
    elif request.method == 'POST':
        x = int(float(request.form.get('x')))
        y = int(float(request.form.get('y')))
        print(x, y)
        margin = 0.1
        x = min(max(width * margin, x), width * (1 - margin))
        y = min(max(height * margin, y), height * (1 - margin))
        id = int(request.form.get('robot'))
        robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
        target_pos = calc_target_pos(x, y)
        rc.estimator.event_queue.put(('robot', id, target_pos))
        return 'OK'


class RemoteController:
    def __init__(self):
        self.robots = []
        #self.robots = [Robot(11, '192.168.1.11')]  #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
        self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
                       Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]

        self.robot_ids = {}
        for r in self.robots:
            self.robot_ids[r.id] = r

        # socket for movement commands
        self.comm_socket = socket.socket()
        self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        for robot in self.robots:
            robot.connect()

        self.comm_socket.bind(('', 1337))
        self.comm_socket.listen(5)

        self.t = time.time()

        # start thread for marker position detection
        self.estimator = ArucoEstimator(self.robot_ids.keys())
        self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers': False})
        self.estimator_thread.start()

        self.controller = MPCController(self.estimator)

    def run(self):
        print("waiting until all markers are detected...")
        while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
            pass
        print("starting control")

        self.controller.interactive_control(robots=self.robots)


if __name__ == "__main__":
    rc = RemoteController()
    rc_thread = threading.Thread(target=rc.run)
    rc_thread.start()

    app.run('0.0.0.0')
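calc_target_pos maps a canvas click to world coordinates by bilinear interpolation between the four corner markers: the click's x and y fractions each pick a line between opposite edges of the arena, and the shapely intersection of the two lines is the target. A worked check with fabricated unit-square corners (not real marker estimates):

import numpy as np
from shapely.geometry import LineString

width, height = 600, 400
a, b = np.array([0.0, 0.0]), np.array([1.0, 0.0])  # made-up corner estimates
c, d = np.array([1.0, 1.0]), np.array([0.0, 1.0])

px, py = 299.5, 199.5  # click at the canvas center
x_frac = (px + 0.5) / width
y_frac = (py + 0.5) / height

column_line = LineString([a + x_frac * (b - a), d + x_frac * (c - d)])
row_line = LineString([a + y_frac * (d - a), b + y_frac * (c - b)])
int_pt = column_line.intersection(row_line)
print(int_pt.x, int_pt.y)  # 0.5 0.5 -> center of the arena, as expected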


@@ -0,0 +1,85 @@
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Mensa-Abend Telepräsenz</title>
    <style>
        div.meerschweinchen {
            max-width: 600px;
            margin: auto;
            text-align: center;
        }
        div.arena {
            height: 400px;
            width: 600px;
            background-image: url("https://p.0au.de/3d62ea7/foo.png");
            border: none;
            text-align: center;
            position: relative
        }
        div.arena p {
            margin: 0;
            position: absolute;
            top: 50%;
            left: 50%;
            transform: translate(-50%, -50%)
        }
    </style>
    <script type=text/javascript src="{{ url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
</head>
<body>
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
<div style="text-align: center">
    <p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
        <br>
        Please let us know about your study background.
    </p>
</div>
<div class="meerschweinchen">
    <div class=arena style="font-size: large;">
    </div>
    <form method="post">
        <p>Choose your robot:</p>
        <div>
            <input type="radio" id="r11" name="robot_id" value="11">
            <label for="r11">Robot 11</label>
        </div>
        <div>
            <input type="radio" id="r12" name="robot_id" value="12">
            <label for="r12">Robot 12</label>
        </div>
        <div>
            <input type="radio" id="r13" name="robot_id" value="13">
            <label for="r13">Robot 13</label>
        </div>
        <div>
            <input type="radio" id="r14" name="robot_id" value="14">
            <label for="r14">Robot 14</label>
        </div>
    </form>
</div>
</body>
<script>
    function fireThis(e) {
        var parentOffset = $(this).parent().offset();
        var x = e.pageX - parentOffset.left;
        var y = e.pageY - parentOffset.top;
        var robot = document.querySelector('input[name = "robot_id"]:checked').value;
        $.post("/", {'x': x, 'y': y, 'robot': robot});
    }
    $('.arena').on('mousedown', fireThis);

    function callRandom() {
        var array = document.getElementsByName('robot_id');
        array[{{ oldest_control }}].checked = true;
    }
    callRandom();
</script>
</html>
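The jQuery mousedown handler above posts the same three form fields the Flask route parses. The equivalent request from Python makes a convenient smoke test; this assumes the server from this commit is running on Flask's default port 5000, and the requests package is not a dependency of the commit:

import requests

# click at pixel (300, 200) on the 600x400 arena, steering robot 11
resp = requests.post('http://localhost:5000/', data={'x': 300, 'y': 200, 'robot': 11})
print(resp.status_code, resp.text)  # expect: 200 OK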