Compare commits

...

9 Commits

8 changed files with 502 additions and 299 deletions

View File

@ -3,12 +3,14 @@ import numpy as np
import cv2
import os
import time
import math
from shapely.geometry import LineString
from queue import Queue
import aruco
class ArucoEstimator:
corner_marker_ids = {
'a': 0,
@ -68,10 +70,6 @@ class ArucoEstimator:
# create detector and get parameters
self.detector = aruco.MarkerDetector()
# self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
# self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
@ -87,18 +85,12 @@ class ArucoEstimator:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
print(self.camparam)
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
self.drag_line_end = None
self.drag_line_start = None
self.previous_click = None
def compute_clicked_position(self, px, py):
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
@ -118,12 +110,50 @@ class ArucoEstimator:
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
target = np.array([target_x, target_y])
else:
print("not all markers have been detected!")
target = np.array([px, -py])
return target
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
target = None
if event == cv2.EVENT_LBUTTONDOWN:
self.drag_line_start = (px, py)
elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position
facing_pos = self.compute_clicked_position(px, py)
v = facing_pos - target_pos
target_angle = math.atan2(v[1], v[0])
else:
# determine angle from previously clicked pos (= self.drag_line_end)
if self.previous_click is not None:
previous_pos = self.compute_clicked_position(self.previous_click[0], self.previous_click[1])
v = target_pos - previous_pos
target_angle = math.atan2(v[1], v[0])
else:
target_angle = 0.0
target = np.array([target_pos[0], target_pos[1], target_angle])
print(target)
self.previous_click = (px, py)
self.event_queue.put(('click', {'x': target[0], 'y': target[1], 'angle': target[2]}))
self.drag_line_start = None
elif event == cv2.EVENT_MOUSEMOVE:
if self.drag_line_start is not None:
self.drag_line_end = (px, py)
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
"""
@ -132,6 +162,9 @@ class ArucoEstimator:
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
fps_display_rate = 1 # displays the frame rate every 1 second
fps_counter = 0
start_time = time.time()
try:
running = True
while running:
@ -157,16 +190,16 @@ class ArucoEstimator:
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id >= 0:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
marker.calculateExtrinsics(0.075, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if marker.id >= 0: # draw markers onto the image
if draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
@ -182,6 +215,20 @@ class ArucoEstimator:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
# compute frame rate
fps_counter += 1
delta_t = time.time() - start_time
if delta_t > fps_display_rate:
fps_counter = 0
start_time = time.time()
color_image = cv2.putText(color_image, f"fps = {(fps_counter / delta_t):.2f}", (10, 25), cv2.FONT_HERSHEY_PLAIN, 2,
(0, 255, 255),
thickness=2)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
@ -192,7 +239,7 @@ class ArucoEstimator:
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('a'):
if key == ord('x'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
@ -200,6 +247,8 @@ class ArucoEstimator:
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
if key == ord('i'):
invert_grayscale = not invert_grayscale
finally:
cv2.destroyAllWindows()
if self.pipeline is not None:
@ -244,7 +293,7 @@ class ArucoEstimator:
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
self.robot_marker_estimates[marker_id] = {'t': float(t_image), 'x': float(x), 'y': float(y), 'angle': float(angle)}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once

View File

@ -0,0 +1,80 @@
import numpy as np
import time
from robot import ControlledRobot
from pid_controller import PIDController
from event_listener import EventListener
class CommanderBase:
def __init__(self):
self.robots = []
self.robots = [ControlledRobot(12, '10.10.11.91')] # , Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
# self.robots = [ControlledRobot(marker_id=13, ip='10.10.11.90'),
# ControlledRobot(marker_id=14, ip='10.10.11.89')]
for r in self.robots:
r.connect()
r.attach_controller(PIDController())
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
self.current_robot_index = 0
self.controlling = False
self.running = False
def run(self):
unconnected_robots = list(filter(lambda r: not r.connected, self.robots))
if len(unconnected_robots) > 0:
print(f"warning: could not connect to the following robots: {unconnected_robots}")
return
all_detected = False
while not all_detected:
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots))
all_detected = len(undetected_robots) == 0
if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}")
time.sleep(0.5)
print("starting control")
self.running = True
while self.running:
while not self.event_listener.event_queue.empty():
event = self.event_listener.event_queue.get()
self.handle_event(event)
def handle_event(self, event):
# handle events from opencv window
print("event: ", event)
if event[0] == 'click':
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
self.robots[self.current_robot_index].move_to_pos(target_pos)
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
for r in self.robots:
r.stop_control()
else:
print("enable control")
for r in self.robots:
r.start_control()
elif key == 9: # TAB
# switch controlled robot
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
robot = self.robots[self.current_robot_index]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
for r in self.robots:
r.stop_control()
self.running = False
if __name__ == '__main__':
rc = CommanderBase()
rc.run()

View File

@ -0,0 +1,59 @@
import threading
import time
class ControllerBase:
def __init__(self, control_rate=0.1):
self.robot = None
self.controlling = False
self.target_pos = None
self.control_thread = None
self.control_rate = control_rate
def control_loop(self):
while self.controlling:
if self.target_pos is not None:
state = self.get_measured_position()
control = self.compute_control(state)
self.apply_control(control)
time.sleep(self.control_rate)
self.apply_control((0.0, 0.0)) # stop robot
def set_target_position(self, target_pos):
self.target_pos = target_pos
def get_measured_position(self):
if self.robot is not None:
return self.robot.get_measurement()
else:
raise Exception("error: controller cannot get measurement!\n"
" there is no robot attached to the controller!")
def compute_control(self, state):
return 0.0, 0.0
def apply_control(self, control):
if self.robot is not None:
self.robot.send_cmd(u1=control[0], u2=control[1])
else:
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
def attach_robot(self, robot):
self.robot = robot
def start(self):
self.controlling = True
# start control thread
self.control_thread = threading.Thread(target=self.control_loop)
self.control_thread.start()
def stop(self):
# pause controller
self.controlling = False
if self.control_thread is not None:
self.control_thread.join()

View File

@ -0,0 +1,49 @@
import socket
import threading
import queue
import json
class EventListener:
def __init__(self, event_server):
self.event_thread = threading.Thread(target=self.receive_events)
self.event_thread.daemon = True # mark thread as daemon -> it terminates automatically when program shuts down
self.event_queue = queue.Queue()
self.receiving = False
# connect to event server
print(f"connecting to event server on {event_server} ...")
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
try:
self.event_socket.connect(event_server)
self.event_socket.sendall(f"events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the event server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for events")
self.event_socket.settimeout(None)
# if so we start the event thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the event server.\n The response was: {response}")
except socket.timeout:
print(f"error: the event server did not respond.")
except ConnectionRefusedError:
print(f"error: could not connect to event server at {event_server}.")
def receive_events(self):
self.receiving = True
while self.receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
events = received.split('\n')
for event_json in events:
if len(event_json) > 0:
event = json.loads(event_json)
self.event_queue.put(event)
else:
self.receiving = False
print("event server seems to have shut down -> stop listening")

View File

@ -2,12 +2,14 @@ import socket
HOST, PORT = "localhost", 42424
robot_id = 11
robot_id = 12
# SOCK_DGRAM is the socket type to use for UDP sockets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
while True:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
#sock.sendall(f"events\n".encode()) # request events
receiving = True
while receiving:
received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received))
receiving = len(received) > 0

View File

@ -1,38 +1,60 @@
import socketserver
import threading
import time
import json
from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
data = self.request[0]
socket = self.request[1]
data = self.request.recv(1024).strip()
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
if 'events' in data.decode():
self.request.sendall('subscribed to events\n'.encode())
# send input events
while True:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
# send robot position
try:
marker_id = int(data)
except ValueError:
marker_id = None
if marker_id is not None:
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
self.client_address)
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
+ '\n').encode())
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
socket.sendto("error: unknown robot marker id\n".encode(),
self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
self.request.sendall("error: unknown robot marker id\n".encode())
else:
self.request.sendall("error: data not understood. "
"expected <robot marker id (int)> or 'events'\n".encode())
return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second
def handle_error(self, request, client_address):
print("an error occurred -> terminating connection")
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
@ -43,4 +65,4 @@ if __name__ == "__main__":
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter
# receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -2,15 +2,17 @@ import numpy as np
import math
import time
from controller import ControllerBase
class PIDController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
class PIDController(ControllerBase):
def __init__(self):
super().__init__()
self.t = None
self.controlling = False
self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
self.P_angle = 0.4
self.I_angle = 0.35
self.D_angle = 0.1
@ -18,198 +20,53 @@ class PIDController:
self.I_pos = 0.3
self.D_pos = 0.1
self.mode = None
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
ts = []
angles = []
target_pos = np.array([0.0, 0.0, 0.0])
e_angle_old = 0.0
e_pos_old = 0.0
i = 0.0
i_angle = 0.0
i_pos = 0.0
t0 = time.time()
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
i_angle = 0
i_pos = 0
e_pos_old = 0
e_angle_old = 0
self.mode = 'combined'
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
self.mode = None # reset control mode
else:
print("enable control")
i = 0
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 97: # a
self.mode = 'angle'
e_angle_old = 0
i = 0
self.t = time.time()
elif key == 99: # c
self.e_angle_old = 0.0
self.e_pos_old = 0.0
self.i = 0.0
self.i_angle = 0.0
self.i_pos = 0.0
def set_target_position(self, target_pos):
super(PIDController, self).set_target_position(target_pos)
self.mode = 'combined'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.t = time.time()
elif key == 112: # p
self.mode = 'position'
e_pos_old = 0
i = 0
self.t = time.time()
elif key == 43: # +
self.P_pos += 0.1
print("P = ", self.P_angle)
elif key == 45: # -
self.P_pos -= 0.1
print("P = ", self.P_angle)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
self.e_angle_old = 0.0
self.e_pos_old = 0.0
if self.controlling:
self.i = 0.0
self.i_angle = 0.0
self.i_pos = 0.0
def compute_control(self, state):
# measure state
x_pred = self.get_measurement(robot.id)
dt = self.t - time.time()
x_pred = state[1:]
if self.t is None:
dt = 0.1
else:
dt = time.time() - self.t
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
if self.mode == 'angle':
# compute angle such that robot faces to target point
target_angle = target_pos[2]
ts.append(time.time() - t0)
angles.append(x_pred[2])
target_angle = self.target_pos[2]
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
# i += self.I * dt * e # right Riemann sum
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - e_angle_old)/dt
self.i += self.I_angle * dt * (e_angle + self.e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - self.e_angle_old)/dt
u1 = p - i - d
u1 = p + self.i + d
u2 = - u1
e_angle_old = e_angle
self.e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = target_pos[0:2] - x_pred[0:2]
v = self.target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
@ -219,10 +76,10 @@ class PIDController:
if e_pos < 0.05:
self.mode = 'angle'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.e_angle_old = 0
self.e_pos_old = 0
self.i_angle = 0
self.i_pos = 0
u1 = 0
u2 = 0
else:
@ -235,33 +92,25 @@ class PIDController:
e_angle += np.pi
p_angle = self.P_angle * e_angle
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
self.i_angle += self.I_angle * dt * (e_angle + self.e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - self.e_angle_old) / dt
p_pos = self.P_pos * e_pos
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
self.i_pos += self.I_pos * dt * (e_pos + self.e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - self.e_pos_old) / dt
if forward:
print("forward")
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
u1 = p_angle + p_pos + self.i_angle + self.i_pos + d_angle + d_pos
u2 = - p_angle + p_pos - self.i_angle + self.i_pos - d_angle + d_pos
else:
print("backward")
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
u1 = p_angle - p_pos + self.i_angle - self.i_pos + d_angle - d_pos
u2 = - p_angle - p_pos - self.i_angle - self.i_pos - d_angle - d_pos
e_pos_old = e_pos
e_angle_old = e_angle
self.e_pos_old = e_pos
self.e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
#print(f"u = ({u1}, {u2})")
robot.send_cmd(u1, u2)
self.t = time.time() # save time when the most recent control was applied
time.sleep(0.1)
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)
return u1, u2

View File

@ -1,12 +1,19 @@
import socket
import threading
import json
class Robot:
def __init__(self, marker_id, ip):
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
self.id = marker_id
self.pos = None
self.euler = None
self.t_last_measurement = None
self.x = None
self.y = None
self.angle = None
self.ip = ip
self.port = 1234
self.socket = socket.socket()
# currently active control
@ -15,18 +22,47 @@ class Robot:
self.connected = False
self.measurement_server = measurement_server
self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
self.measurement_thread = threading.Thread(target=self.receive_measurements)
# mark thread as daemon -> it terminates automatically when program shuts down
self.measurement_thread.daemon = True
self.receiving = False
def connect(self):
# connect to robot
try:
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
self.socket.connect((self.ip, 1234)) # connect to robot
print(f"connecting to robot {self.ip} at {self.ip}:{self.port} ...")
self.socket.connect((self.ip, self.port)) # connect to robot
print("connected!")
self.connected = True
except socket.error:
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
print(f"error: could not connect to robot {self.id} at {self.ip}:{self.port}")
# connect to measurement server
print(f"connecting to measurement server on {self.measurement_server} ...")
try:
self.measurement_socket.connect(self.measurement_server)
self.measurement_socket.sendall(f"{self.id}\n".encode())
self.measurement_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.measurement_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.measurement_socket.settimeout(None)
# if so we start the measurement thread
self.measurement_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
except ConnectionRefusedError:
print(f"error: could not connect to measurement server at {self.measurement_server}.")
def send_cmd(self, u1=0.0, u2=0.0):
if self.socket:
if self.socket and self.connected:
try:
self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError:
@ -35,3 +71,60 @@ class Robot:
except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
else:
print(f"error: robot {self.id} is not connected to {self.ip}")
def receive_measurements(self):
self.receiving = True
while self.receiving:
received = str(self.measurement_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
measurement = json.loads(last_received)
self.t_last_measurement = measurement['t']
self.x = measurement['x']
self.y = measurement['y']
self.angle = measurement['angle']
else:
self.receiving = False
print(f"measurement server stopped sending data for robot {self.id}")
def get_measurement(self):
return self.t_last_measurement, self.x, self.y, self.angle
def __str__(self):
connection_state = '' if self.connected else 'not'
state = self.get_measurement()
last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available'
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected) " + last_measurement
def __repr__(self):
return str(self)
class ControlledRobot(Robot):
def __init__(self, marker_id, ip):
super().__init__(marker_id, ip)
self.controller = None
def start_control(self):
if self.controller is not None:
self.controller.start()
else:
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
def stop_control(self):
if self.controller is not None:
self.controller.stop()
else:
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
def attach_controller(self, controller):
self.controller = controller
self.controller.attach_robot(self)
def move_to_pos(self, target_pos):
if self.controller is not None:
self.controller.set_target_position(target_pos)
else:
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")