import machine import sys from machine import I2C, Pin import d1motor import utime import usocket import uselect import esp class Robot: def __init__(self): print("setting up I2C ...") d1 = Pin(5) d2 = Pin(4) i2c = I2C(scl=d1, sda=d2) i2c_scan = i2c.scan() if len(i2c_scan) > 0: i2c_addr = i2c_scan[0] print("i2c scan = {}".format(i2c_addr)) print("setting up motors ...") self.m1 = d1motor.Motor(0, i2c, address=i2c_addr) self.m2 = d1motor.Motor(1, i2c, address=i2c_addr) self.m1.speed(0) self.m2.speed(0) print("motor setup complete!") else: print("error: no i2c interfaces found!") sys.exit(1) ip = my_ip[0] # setup socket for remote control self.addr = usocket.getaddrinfo(ip, 1234)[0][-1] self.poller = uselect.poll() self.poller_timeout = 2 # timeout in ms def remote_control(self): while True: print("setting up socket communication ...") socket_setup_complete = False while not socket_setup_complete: try: print("trying to create socket on address {}".format(self.addr)) socket = usocket.socket() socket.bind(self.addr) print("socket setup complete!") socket_setup_complete = True except Exception as e: print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e)) utime.sleep(1.0) print("waiting for connections on {} ...".format(self.addr)) socket.listen(1) res = socket.accept() # this blocks until someone connects to the socket comm_socket = res[0] self.poller.register(comm_socket, uselect.POLLIN) print("connected!") listening = True duration_current = 0 t_current = utime.ticks_ms() duration_next = None u1_next = None u2_next = None stopped = True timeouts = 0 while listening: elapsed = utime.ticks_ms() remaining = duration_current - (elapsed-t_current) if remaining >= 0: timeouts = 0 print("start of loop\n I have {} ms until next control needs to be applied".format(remaining)) elif timeouts < 10: print("start of loop\n I have {} ms until next control needs to be applied, timeouts = {}".format(remaining, timeouts)) timeouts = timeouts + 1 trecv_start = utime.ticks_ms() # expected data: '(t, u1, u2)'\n" # where ui = control for motor i # ui \in [-1.0, 1.0] #print("poller waiting..") poll_res = self.poller.poll(self.poller_timeout) # wait 100 milliseconds for socket data if poll_res: print("new data available") try: data = comm_socket.readline() data_str = data.decode() #print("Data received: {}".format(data_str)) #print("processing data = {}".format(data_str)) l = data_str.strip('()\n').split(',') #print("l = {}".format(l)) duration_next = int(float(l[0])*1000) #print("duration = {}".format(duration_next)) u1_next = int(float(l[1])*100) #print("u1 = {}".format(u1_next)) u2_next = int(float(l[2])*100) #print("u2 = {}".format(u2_next)) except ValueError: print("ValueError: Data has wrong format.") print("Data received: {}".format(data_str)) print("Shutting down ...") u1_next = u2_next = 0 duration_current = 0 listening = False comm_socket.close() socket.close() del comm_socket del socket print("disconnected!") except IndexError: print("IndexError: Data has wrong format.") print("Data received: {}".format(data_str)) print("Shutting down ...") u1_next = u2_next = 0 duration_current = 0 listening = False comm_socket.close() socket.close() del comm_socket del socket print("disconnected!") except Exception as e: print("Some other error occured") print("Exception: {}".format(e)) print("Shutting down ...") u1_next = u2_next = 0 duration_current = 0 listening = False comm_socket.close() socket.close() del comm_socket del socket print("disconnected!") trecv_end = utime.ticks_ms() print("communication (incl. polling) took {} ms".format(trecv_end - trecv_start)) elapsed = utime.ticks_ms() if elapsed - t_current >= duration_current: # check if duration of current control has timed out if u1_next is not None: # check if new control is available print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current)) #print("duration of previous control = {}".format((elapsed - t_current)/1000.0)) #print("applying new control (duration, u1, u2) = ({}, {}, {})".format(duration_next, u1_next, u2_next)) # if so, apply it self.m1.speed(u1_next) self.m2.speed(u2_next) t_current = utime.ticks_ms() duration_current = duration_next # reset next control u1_next = None u2_next = None duration_next = None stopped = False elif not stopped: #print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current)) #print("duration of previous control = {}".format((elapsed - t_current)/1000.0)) # no new control available -> shutdown #print("no new control available -> stopping") #self.m1.speed(0) #self.m2.speed(0) t_current = utime.ticks_ms() duration_current = 0 # as soon as new control will become available we directly want to apply it immediately stopped = True wall_e = Robot() wall_e.remote_control()