import machine import sys from machine import I2C, Pin import d1motor import time import usocket import esp class Motor: def __init__(self, enable, dir1, dir2): self.enable_pin = machine.Pin(enable, machine.Pin.OUT) self.enable_pwm = machine.PWM(self.enable_pin) self.enable_pwm.freq(250) self.dir1_pin = machine.Pin(dir1, machine.Pin.OUT) self.dir2_pin = machine.Pin(dir2, machine.Pin.OUT) self.direction = 1 # default direction: dir1_pin = HIGH, dir2_pin = LOW self.reverse() def reverse(self): self.direction = not self.direction self.dir1_pin.value(self.direction) self.dir2_pin.value(not self.direction) def speed(self, value): if value > 0: # forward if not self.direction: # switch direction if necessary self.reverse() else: # backward if self.direction: # switch direction if necessary self.reverse() # apply value as pwm signal self.enable_pwm.duty(int(abs(value)*10.23)) class Robot: def __init__(self): i2c = False if i2c: print("setting up I2C ...") #d1 = Pin(5) #d2 = Pin(4) #i2c = I2C(scl=d1, sda=d2) #i2c_scan = i2c.scan() i2c_scan = [] if len(i2c_scan) > 0: i2c_addr = i2c_scan[0] print("i2c scan = {}".format(i2c_addr)) print("setting up motors ...") self.m1 = d1motor.Motor(0, i2c, address=i2c_addr) self.m2 = d1motor.Motor(1, i2c, address=i2c_addr) self.m1.speed(0) self.m2.speed(0) print("motor setup complete!") else: print("error: no i2c interfaces found!") sys.exit(1) else: self.m1 = Motor(14, 13, 12) self.m2 = Motor(5, 0, 4) ip = my_ip[0] # setup socket for remote control self.addr = usocket.getaddrinfo(ip, 1234)[0][-1] def remote_control(self): while True: print("setting up socket communication ...") socket_setup_complete = False while not socket_setup_complete: try: print("trying to create socket on address {}".format(self.addr)) socket = usocket.socket() socket.bind(self.addr) print("socket setup complete!") socket_setup_complete = True except Exception as e: print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e)) time.sleep(1.0) print("waiting for connections on {} ...".format(self.addr)) socket.listen(1) res = socket.accept() # this blocks until someone connects to the socket comm_socket = res[0] print("connected!") listening = True try: while listening: # expected data: '(t, u1, u2)'\n" # where ui = control for motor i # ui \in [-1.0, 1.0] data = comm_socket.readline() data_str = data.decode() print("Data received: {}".format(data_str)) print("processing data = {}".format(data_str)) l = data_str.strip('()\n').split(',') print("l = {}".format(l)) u1 = int(float(l[0])*100) print("u1 = {}".format(u1)) u2 = int(float(l[1])*100) print("u2 = {}".format(u2)) self.m1.speed(u1) self.m2.speed(u2) except ValueError: print("ValueError: Data has wrong format.") print("Data received: {}".format(data_str)) except IndexError: print("IndexError: Data has wrong format.") print("Data received: {}".format(data_str)) except Exception as e: print("Some other error occured") print("Exception: {}".format(e)) finally: print("Shutting down ...") u1 = u2 = 0 self.m1.speed(u1) self.m2.speed(u2) listening = False comm_socket.close() socket.close() del comm_socket del socket wall_e = Robot() wall_e.remote_control()