switched to using TCP sockets for measurement and event communication

This commit is contained in:
Simon Pirkelmann 2020-11-11 21:33:48 +01:00
parent 8fc5ad9868
commit eff5474ac2
4 changed files with 142 additions and 27 deletions

View File

@ -0,0 +1,47 @@
import socket
import threading
import queue
import json
class EventListener:
def __init__(self, event_server):
self.event_thread = threading.Thread(target=self.receive_events)
self.event_queue = queue.Queue()
# connect to event server
print(f"connecting to event server on {event_server} ...")
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
try:
self.event_socket.connect(event_server)
self.event_socket.sendall(f"events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the event server
response = self.event_socket.recv(1024)
if not 'error' in str(response):
print("... connected! -> start listening for events")
self.event_socket.settimeout(None)
# if so we start the event thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the event server.\n The response was: {response}")
except socket.timeout:
print(f"error: the event server did not respond.")
except ConnectionRefusedError:
print(f"error: could not connect to event server at {event_server}.")
def receive_events(self):
receiving = True
while receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
events = received.split('\n')
for event_json in events:
if len(event_json) > 0:
event = json.loads(event_json)
self.event_queue.put(event)
else:
receiving = False
print("event server seems to have shut down -> stop listening")

View File

@ -2,12 +2,14 @@ import socket
HOST, PORT = "localhost", 42424 HOST, PORT = "localhost", 42424
robot_id = 11 robot_id = 12
# SOCK_DGRAM is the socket type to use for UDP sockets sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect((HOST, PORT))
sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT)) #sock.sendall(f"events\n".encode()) # request events
while True: receiving = True
while receiving:
received = str(sock.recv(1024), "utf-8") received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received)) print("Received: {}".format(received))
receiving = len(received) > 0

View File

@ -1,41 +1,56 @@
import socketserver import socketserver
import threading import threading
import time import time
import json
from aruco_estimator import ArucoEstimator from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler): class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None: def handle(self) -> None:
data = self.request[0] self.data = self.request.recv(1024).strip()
socket = self.request[1]
cur_thread = threading.current_thread() cur_thread = threading.current_thread()
print(f"current thread {cur_thread}") print(f"current thread {cur_thread}")
if 'events' in self.data.decode():
self.request.sendall('subscribed to events\n'.encode())
# send input events
while True:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
self.request.sendall((json.dumps(event) + '\n').encode())
else:
# send robot position
try: try:
marker_id = int(data) marker_id = int(self.data)
except ValueError:
marker_id = None
if marker_id is not None:
if marker_id in self.server.estimator.robot_marker_estimates: if marker_id in self.server.estimator.robot_marker_estimates:
while True: while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(), self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id]) + '\n').encode())
self.client_address)
time.sleep(1.0 / self.server.max_measurements_per_second) time.sleep(1.0 / self.server.max_measurements_per_second)
else: else:
socket.sendto("error: unknown robot marker id\n".encode(), self.request.sendall("error: unknown robot marker id\n".encode())
self.client_address) else:
except ValueError: self.request.sendall("error: data not understood. expected <robot marker id (int)> or 'events'\n".encode())
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
return return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer): class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30): def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass) super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second self.max_measurements_per_second = max_measurements_per_second
def handle_error(self, request, client_address):
print("an error occurred -> terminating connection")
if __name__ == "__main__": if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[11, 12, 13, 14])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True}) estimator_thread = threading.Thread(target=aruco_estimator.run_tracking, kwargs={'invert_grayscale': True})
estimator_thread.start() estimator_thread.start()
@ -43,4 +58,4 @@ if __name__ == "__main__":
max_measurements_per_second=30) as measurement_server: max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever() measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter # receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -1,10 +1,16 @@
import socket import socket
import threading
import json
class Robot: class Robot:
def __init__(self, marker_id, ip): def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
self.id = marker_id self.id = marker_id
self.pos = None
self.euler = None self.t_last_measurement = None
self.x = None
self.y = None
self.angle = None
self.ip = ip self.ip = ip
self.socket = socket.socket() self.socket = socket.socket()
@ -15,6 +21,11 @@ class Robot:
self.connected = False self.connected = False
self.measurement_server = measurement_server
self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
self.measurement_thread = threading.Thread(target=self.receive_measurements)
def connect(self): def connect(self):
# connect to robot # connect to robot
try: try:
@ -23,7 +34,30 @@ class Robot:
print("connected!") print("connected!")
self.connected = True self.connected = True
except socket.error: except socket.error:
print("could not connect to robot {} with ip {}".format(self.id, self.ip)) print("error: could not connect to robot {} with ip {}".format(self.id, self.ip))
# connect to measurement server
print(f"connecting to measurement server on {self.measurement_server} ...")
try:
self.measurement_socket.connect(self.measurement_server)
self.measurement_socket.sendall(f"{self.id}\n".encode())
self.measurement_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.measurement_socket.recv(1024)
if not 'error' in str(response):
print("... connected! -> start listening for events")
self.measurement_socket.settimeout(None)
# if so we start the measurement thread
self.measurement_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
except ConnectionRefusedError:
print(f"error: could not connect to measurement server at {self.measurement_server}.")
def send_cmd(self, u1=0.0, u2=0.0): def send_cmd(self, u1=0.0, u2=0.0):
if self.socket: if self.socket:
@ -35,3 +69,20 @@ class Robot:
except ConnectionResetError: except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost") print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass pass
def receive_measurements(self):
receiving = True
while receiving:
received = str(self.measurement_socket.recv(1024), "utf-8")
if len(received) > 0:
measurement = json.loads(received)
self.t_last_measurement = measurement['t']
self.x = measurement['x']
self.y = measurement['y']
self.angle = measurement['angle']
else:
receiving = False
print(f"measurement server stopped sending data for robot {self.id}")
def get_measurement(self):
return (self.t_last_measurement, self.x, self.y, self.angle)