import socket import threading import json import numpy as np import time import math class Robot: def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)): self.id = marker_id self.t_last_measurement = None self.x = None self.y = None self.angle = None self.ip = ip self.port = 1234 self.socket = socket.socket() # currently active control self.u1 = 0.0 self.u2 = 0.0 self.connected = False self.measurement_server = measurement_server self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket self.measurement_thread = threading.Thread(target=self.receive_measurements) # mark thread as daemon -> it terminates automatically when program shuts down self.measurement_thread.daemon = True self.receiving = False def connect(self): # connect to robot try: print(f"connecting to robot {self.ip} at {self.ip}:{self.port} ...") self.socket.connect((self.ip, self.port)) # connect to robot print("connected!") self.connected = True except socket.error: print(f"error: could not connect to robot {self.id} at {self.ip}:{self.port}") # connect to measurement server print(f"connecting to measurement server on {self.measurement_server} ...") try: self.measurement_socket.connect(self.measurement_server) self.measurement_socket.sendall(f"{self.id}\n".encode()) self.measurement_socket.settimeout(0.1) # check if we receive data from the measurement server response = self.measurement_socket.recv(1024) if 'error' not in str(response): print("... connected! -> start listening for measurements") self.measurement_socket.settimeout(None) # if so we start the measurement thread self.measurement_thread.start() else: print(f"error: cannot communicate with the measurement server.\n The response was: {response}") except socket.timeout: print(f"error: the measurement server did not respond with data.") except ConnectionRefusedError: print(f"error: could not connect to measurement server at {self.measurement_server}.") def send_cmd(self, u1=0.0, u2=0.0): if self.socket and self.connected: try: self.socket.send(f'({u1},{u2})\n'.encode()) except BrokenPipeError: print(f"error: connection to robot {self.id} with ip {self.ip} lost") pass except ConnectionResetError: print(f"error: connection to robot {self.id} with ip {self.ip} lost") pass else: print(f"error: robot {self.id} is not connected to {self.ip}") def receive_measurements(self): self.receiving = True while self.receiving: received = str(self.measurement_socket.recv(1024), "utf-8") if len(received) > 0: last_received = received.split('\n')[-2] measurement = json.loads(last_received) self.t_last_measurement = measurement['t'] self.x = measurement['x'] self.y = measurement['y'] self.angle = measurement['angle'] else: self.receiving = False print(f"measurement server stopped sending data for robot {self.id}") def get_measurement(self): return self.t_last_measurement, self.x, self.y, self.angle def __str__(self): connection_state = '' if self.connected else ' not' state = self.get_measurement() last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available' return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected ) " + last_measurement def __repr__(self): return str(self) class ControlledRobot(Robot): def __init__(self, marker_id, ip): super().__init__(marker_id, ip) self.controller = None def start_control(self): if self.controller is not None: print("starting control") self.controller.start() else: raise Exception("Error: Cannot start control: there is not controller attached to the robot!") def stop_control(self): if self.controller is not None: print("stopping control") self.controller.stop() else: raise Exception("Error: Cannot stop control: there is not controller attached to the robot!") def attach_controller(self, controller): self.controller = controller self.controller.attach_robot(self) def move_to_pos(self, target_pos, blocking=False): if self.controller is not None: if not blocking: self.controller.set_target_position(target_pos) else: # only return after the robot has reached the target self.stop_control() self.controller.set_target_position(target_pos) self.start_control() close_to_target = False while not close_to_target: current_pos = np.array([self.x, self.y, self.angle]) v = target_pos - current_pos angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference e_pos = np.linalg.norm(v[0:2]) #print(f"target_pos = {target_pos}, current_pos = {current_pos}, e_pos = {e_pos}, e_ang = {e_angle}") print(f"e_pos = {e_pos}, e_ang = {e_angle}") close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1 time.sleep(0.1) print("target reached!") self.stop_control() else: raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")