From 85e201937073af96665ab9a7953cda1d7a17c922 Mon Sep 17 00:00:00 2001 From: spirkelmann Date: Wed, 26 Jun 2019 10:48:23 +0200 Subject: [PATCH] simplified structure of the program and added motor class for L293D driver --- micropython_firmware/main.py | 218 ++++++++++++++--------------------- 1 file changed, 86 insertions(+), 132 deletions(-) diff --git a/micropython_firmware/main.py b/micropython_firmware/main.py index 9790ee5..b949b4a 100644 --- a/micropython_firmware/main.py +++ b/micropython_firmware/main.py @@ -4,41 +4,68 @@ from machine import I2C, Pin import d1motor -import utime +import time import usocket -import uselect import esp +class Motor: + def __init__(self, enable, dir1, dir2): + self.enable_pin = machine.Pin(enable, machine.Pin.OUT) + self.enable_pwm = machine.PWM(self.enable_pin) + self.enable_pwm.freq(500) + + self.dir1_pin = machine.Pin(dir1, machine.Pin.OUT) + self.dir2_pin = machine.Pin(dir2, machine.Pin.OUT) + + self.direction = 1 # default direction: dir1_pin = HIGH, dir2_pin = LOW + self.reverse() + + def reverse(self): + self.direction = not self.direction + self.dir1_pin.value(self.direction) + self.dir2_pin.value(not self.direction) + + def speed(self, value): + if value > 0: # forward + if not self.direction: # switch direction if necessary + self.reverse() + else: # backward + if self.direction: # switch direction if necessary + self.reverse() + # apply value as pwm signal + self.enable_pwm.duty(abs(value)*10) + + class Robot: def __init__(self): - print("setting up I2C ...") - d1 = Pin(5) - d2 = Pin(4) - i2c = I2C(scl=d1, sda=d2) - i2c_scan = i2c.scan() - if len(i2c_scan) > 0: - i2c_addr = i2c_scan[0] - print("i2c scan = {}".format(i2c_addr)) - print("setting up motors ...") - self.m1 = d1motor.Motor(0, i2c, address=i2c_addr) - self.m2 = d1motor.Motor(1, i2c, address=i2c_addr) - self.m1.speed(0) - self.m2.speed(0) - print("motor setup complete!") + i2c = False + if i2c: + print("setting up I2C ...") + #d1 = Pin(5) + #d2 = Pin(4) + #i2c = I2C(scl=d1, sda=d2) + #i2c_scan = i2c.scan() + i2c_scan = [] + if len(i2c_scan) > 0: + i2c_addr = i2c_scan[0] + print("i2c scan = {}".format(i2c_addr)) + print("setting up motors ...") + self.m1 = d1motor.Motor(0, i2c, address=i2c_addr) + self.m2 = d1motor.Motor(1, i2c, address=i2c_addr) + self.m1.speed(0) + self.m2.speed(0) + print("motor setup complete!") + else: + print("error: no i2c interfaces found!") + sys.exit(1) else: - print("error: no i2c interfaces found!") - sys.exit(1) + self.m1 = Motor(14, 13, 12) + self.m2 = Motor(5, 0, 4) ip = my_ip[0] # setup socket for remote control self.addr = usocket.getaddrinfo(ip, 1234)[0][-1] - self.poller = uselect.poll() - self.poller_timeout = 2 # timeout in ms - - self.control_queue = [] - - def remote_control(self): while True: print("setting up socket communication ...") @@ -52,124 +79,51 @@ class Robot: socket_setup_complete = True except Exception as e: print("could not create socket. error msg: {}\nwaiting 1 sec and retrying...".format(e)) - utime.sleep(1.0) + time.sleep(1.0) print("waiting for connections on {} ...".format(self.addr)) socket.listen(1) res = socket.accept() # this blocks until someone connects to the socket comm_socket = res[0] - self.poller.register(comm_socket, uselect.POLLIN) print("connected!") listening = True - - duration_current = 0 - t_current = utime.ticks_ms() - duration_next = None - stopped = True - timeouts = 0 - while listening: - elapsed = utime.ticks_ms() - remaining = duration_current - (elapsed-t_current) - if remaining >= 0: - timeouts = 0 - print("start of loop\n I have {} ms until next control needs to be applied".format(remaining)) - else: - # current control timed out -> applying next control - if len(self.control_queue) > 0: - print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current)) - u_next = self.control_queue.pop(0) - #print("duration of previous control = {}".format((elapsed - t_current)/1000.0)) - #print("applying new control (duration, u1, u2) = ({}, {}, {})".format(duration_next, u1_next, u2_next)) - # if so, apply it - self.m1.speed(u_next[0]) - self.m2.speed(u_next[1]) - - t_current = utime.ticks_ms() - duration_current = duration_next - - stopped = False - elif not stopped: - print("previous control applied for {} ms too long".format(elapsed - t_current - duration_current)) - #print("duration of previous control = {}".format((elapsed - t_current)/1000.0)) - # no new control available -> shutdown - print("no new control available -> stopping") - - self.m1.speed(0) - self.m2.speed(0) - - t_current = utime.ticks_ms() - duration_current = 0 # as soon as new control will become available we directly want to apply it immediately - - stopped = True - - #elif timeouts < 10: - # print("start of loop\n I have {} ms until next control needs to be applied, timeouts = {}".format(remaining, timeouts)) - # timeouts = timeouts + 1 - - trecv_start = utime.ticks_ms() - - # expected data: '(t, u1_0, u2_0, u1_1, u2_1, ...)'\n" - # where ui = control for motor i - # ui \in [-1.0, 1.0] - #print("poller waiting..") - poll_res = self.poller.poll(self.poller_timeout) # wait 100 milliseconds for socket data - if poll_res: - print("new data available") - try: + try: + while listening: + # expected data: '(t, u1, u2)'\n" + # where ui = control for motor i + # ui \in [-1.0, 1.0] data = comm_socket.readline() data_str = data.decode() - #print("Data received: {}".format(data_str)) - #print("processing data = {}".format(data_str)) + print("Data received: {}".format(data_str)) + print("processing data = {}".format(data_str)) l = data_str.strip('()\n').split(',') - #print("l = {}".format(l)) - duration_next = int(float(l[0])*1000) - #print("duration = {}".format(duration_next)) - self.control_queue = [] - print("putting data into queue") - for i in range((len(l)-1)/2): - u1_next = int(float(l[2*i+1])*100) - print("u1 = {}".format(u1_next)) - u2_next = int(float(l[2*i+2])*100) - print("u2 = {}".format(u2_next)) - self.control_queue.append((u1_next, u2_next)) - except ValueError: - print("ValueError: Data has wrong format.") - print("Data received: {}".format(data_str)) - print("Shutting down ...") - self.control_queue = [] - duration_current = 0 - listening = False - comm_socket.close() - socket.close() - del comm_socket - del socket - print("disconnected!") - except IndexError: - print("IndexError: Data has wrong format.") - print("Data received: {}".format(data_str)) - print("Shutting down ...") - self.control_queue = [] - duration_current = 0 - listening = False - comm_socket.close() - socket.close() - del comm_socket - del socket - print("disconnected!") - except Exception as e: - print("Some other error occured") - print("Exception: {}".format(e)) - print("Shutting down ...") - self.control_queue = [] - duration_current = 0 - listening = False - comm_socket.close() - socket.close() - del comm_socket - del socket - print("disconnected!") - trecv_end = utime.ticks_ms() - print("communication (incl. polling) took {} ms".format(trecv_end - trecv_start)) + print("l = {}".format(l)) + u1 = int(float(l[0])*100) + print("u1 = {}".format(u1)) + u2 = int(float(l[1])*100) + print("u2 = {}".format(u2)) + + self.m1.speed(u1) + self.m2.speed(u2) + except ValueError: + print("ValueError: Data has wrong format.") + print("Data received: {}".format(data_str)) + except IndexError: + print("IndexError: Data has wrong format.") + print("Data received: {}".format(data_str)) + except Exception as e: + print("Some other error occured") + print("Exception: {}".format(e)) + finally: + print("Shutting down ...") + u1 = u2 = 0 + self.m1.speed(u1) + self.m2.speed(u2) + listening = False + comm_socket.close() + socket.close() + del comm_socket + del socket wall_e = Robot() wall_e.remote_control()