import sys import socket import time import numpy as np import pygame from threading import Thread, Lock import scipy.integrate single_robot = True class Server: def __init__(self): self.ip = '127.0.0.1' self.recv_port = 1337 # port for receiving data self.send_port = 1338 # port for sending data self.recv_addr = socket.getaddrinfo(self.ip, self.recv_port)[0][-1] if not single_robot: self.robots = [] else: self.rob = Robot() self.receiver = Thread(target=self.remote_control) self.receiver.start() def remote_control(self): print("setting up socket communication ...") recv_socket = socket.socket() recv_socket.bind(self.recv_addr) while True: print("waiting for connections on {} ...".format(self.recv_addr)) recv_socket.listen(2) (comm_socket, address) = recv_socket.accept() # this blocks until someone connects to the socket print("starting thread for handling communication with {}".format(address)) Thread(target=self.handle_communication, args=(comm_socket, address)).start() def handle_communication(self, socket, address): print("connected!") listening = True if not single_robot: rob = Robot() self.robots.append(rob) try: while listening: # expected data: '(u1, u2)'\n" # where ui = control for motor i # ui \in [-1.0, 1.0] data = socket.recv(4096) data_str = data.decode() #print("Data received: {}".format(data_str)) #print("processing data = {}".format(data_str)) l = data_str.strip('()\n').split(',') #print("l = {}".format(l)) u1 = float(l[0]) #print("u1 = {}".format(u1)) u2 = float(l[1]) #print("u2 = {}".format(u2)) if single_robot: self.rob.set_control(u1, u2) else: rob.set_control(u1, u2) except ValueError: print("ValueError: Data has wrong format.") print("Data received: {}".format(data_str)) except IndexError: print("IndexError: Data has wrong format.") print("Data received: {}".format(data_str)) except Exception as e: print("Some other error occured") print("Exception: {}".format(e)) finally: socket.close() if not single_robot: self.robots.remove(rob) class Robot: def __init__(self): self.x = np.array([0.0, 0.0, 0.0]) self.u1 = 0.0 self.u2 = 0.0 self.r = 0.3 self.R = 0.5 self.d = 0.2 self.mutex = Lock() self.omega_max = 50.0 self.integrator = scipy.integrate.ode(self.f) self.integrator.set_integrator('dopri5') self.simulator = Thread(target=self.simulate) self.simulator.start() def set_control(self, u1, u2): self.mutex.acquire() try: self.u1 = u1 self.u2 = u2 finally: self.mutex.release() def simulate(self): t0 = time.time() t = time.time()-t0 while True: self.mutex.acquire() try: self.integrator.set_f_params(self.omega_max * np.array([self.u1, self.u2])) finally: self.mutex.release() self.integrator.set_initial_value(self.x, t) dt = time.time() - t0 - t self.x = self.integrator.integrate(self.integrator.t + dt) t = time.time() - t0 def f(self, t, x, u): r = self.r d = self.d R = self.R theta = x[2] omegar = u[0] omegal = u[1] dx = np.zeros((3,1)) dx[0] = (r/2.0 * np.cos(theta) - r * d/(2.0 * R) * np.sin(theta)) * omegar + \ (r/2.0 * np.cos(theta) + r * d/(2.0 * R) * np.sin(theta)) * omegal dx[1] = (r/2.0 * np.sin(theta) + r * d/(2.0 * R) * np.cos(theta)) * omegar + \ (r/2.0 * np.sin(theta) - r * d/(2.0 * R) * np.cos(theta)) * omegal dx[2] = r/(2.0 * R) * (omegar - omegal) return dx def main(args): # wait for connections of socket s = Server() background_colour = (255, 255, 255) (width, height) = (640, 480) screen = pygame.display.set_mode((width, height)) pygame.display.set_caption('Roboflut') screen.fill(background_colour) pygame.display.flip() running = True while running: r = s.rob #for r in s.robots: x = int(r.x[0] + width/2.0) y = int(r.x[1] + height / 2.0) phi = r.x[2] phases = ( phi + np.pi) % (2 * np.pi ) - np.pi if x < 0 or x > width or y < 0 or y > height: r.x[0] = 0.0 r.x[1] = 0.0 c = int((phases + 2.0 * np.pi) / (4.0 * np.pi) * 255) #color = (c, c, c) color = (0,0,0) screen.set_at((x, y), color) pygame.display.flip() if __name__ == '__main__': main(sys.argv)