"""Register-level driver for the SX127x radio in LoRa mode.

The SX127x class only implements register logic.  It relies on three members
that are not defined in this file and are presumably supplied by a subclass
or mixin (verify against the platform glue code):

* ``self.transfer(pin, address, value=...)`` - performs one bus transfer and
  returns the response as a bytes-like object,
* ``self.pin_ss`` - passed as first argument to ``transfer``,
* ``self.pin_RxDone`` - object offering ``set_handler_for_irq_on_rising_edge``
  and ``detach_irq`` (only used by ``onReceive``).
"""
from time import sleep
import gc

try:
    from utime import sleep_ms
except ImportError:
    # Interpreters without ``utime`` (e.g. CPython): emulate sleep_ms.
    def sleep_ms(ms):
        """Block for ``ms`` milliseconds."""
        sleep(ms / 1000)


PA_OUTPUT_RFO_PIN = 0
PA_OUTPUT_PA_BOOST_PIN = 1

# registers
REG_FIFO = 0x00
REG_OP_MODE = 0x01
REG_FRF_MSB = 0x06
REG_FRF_MID = 0x07
REG_FRF_LSB = 0x08
REG_PA_CONFIG = 0x09
REG_LNA = 0x0c
REG_FIFO_ADDR_PTR = 0x0d
REG_IMAGE_CAL = 0x3b
REG_TEMP = 0x3c

REG_FIFO_TX_BASE_ADDR = 0x0e
FifoTxBaseAddr = 0x00
# FifoTxBaseAddr = 0x80

REG_FIFO_RX_BASE_ADDR = 0x0f
FifoRxBaseAddr = 0x00
REG_FIFO_RX_CURRENT_ADDR = 0x10
REG_IRQ_FLAGS_MASK = 0x11
REG_IRQ_FLAGS = 0x12
REG_RX_NB_BYTES = 0x13
REG_PKT_RSSI_VALUE = 0x1a
REG_PKT_SNR_VALUE = 0x1b
REG_MODEM_CONFIG_1 = 0x1d
REG_MODEM_CONFIG_2 = 0x1e
REG_PREAMBLE_MSB = 0x20
REG_PREAMBLE_LSB = 0x21
REG_PAYLOAD_LENGTH = 0x22
REG_FIFO_RX_BYTE_ADDR = 0x25
REG_MODEM_CONFIG_3 = 0x26
REG_RSSI_WIDEBAND = 0x2c
REG_DETECTION_OPTIMIZE = 0x31
REG_DETECTION_THRESHOLD = 0x37
REG_SYNC_WORD = 0x39
REG_DIO_MAPPING_1 = 0x40
REG_VERSION = 0x42

# modes
MODE_LONG_RANGE_MODE = 0x80  # bit 7: 1 => LoRa mode
MODE_SLEEP = 0x00
MODE_STDBY = 0x01
MODE_TX = 0x03
MODE_FSRX = 0x04
MODE_RX_CONTINUOUS = 0x05
MODE_RX_SINGLE = 0x06

# PA config
PA_BOOST = 0x80

# IRQ masks
IRQ_TX_DONE_MASK = 0x08
IRQ_PAYLOAD_CRC_ERROR_MASK = 0x20
IRQ_RX_DONE_MASK = 0x40
IRQ_RX_TIME_OUT_MASK = 0x80

# Buffer size
MAX_PKT_LENGTH = 255

# Crystal frequency used to convert a carrier frequency into the 24-bit Frf
# register value (Frf = f * 2**19 / FXOSC).  Every entry of the lookup table
# in setFrequency() equals this formula evaluated with 32 MHz.
FXOSC = 32000000

# Packet RSSI offsets (datasheet): HF port above the mid-band threshold,
# LF port below it.
RF_MID_BAND_THRESHOLD = 525E6
RSSI_OFFSET_HF_PORT = 157
RSSI_OFFSET_LF_PORT = 164

# Settings used when the caller does not supply a parameters dict.
DEFAULT_PARAMETERS = {
    'frequency': 868E6,
    'tx_power_level': 2,
    'signal_bandwidth': 125E3,
    'spreading_factor': 8,
    'coding_rate': 5,
    'preamble_length': 8,
    'implicitHeader': False,
    'sync_word': 0x12,
    'enable_CRC': False,
}


def mybin(val):
    """Return ``val`` as a binary literal string zero-padded to 8 digits."""
    return "0b{:0>{w}}".format(bin(val)[2:], w=8)


class SX127x:
    """LoRa-mode register driver; see the module docstring for requirements."""

    def __init__(self, name='SX127x', parameters=None, onReceive=None):
        """Store configuration; no register access happens here.

        name       -- free-form label stored on the instance.
        parameters -- dict with the keys of DEFAULT_PARAMETERS.  When omitted
                      (None) each instance gets its own copy of the defaults,
                      so editing one instance's settings cannot leak into
                      another.  A dict passed by the caller is stored as-is.
        onReceive  -- optional callback invoked as ``callback(self, payload)``
                      by handleOnReceive().
        """
        self.name = name
        if parameters is None:
            parameters = dict(DEFAULT_PARAMETERS)
        self.parameters = parameters
        self._onReceive = onReceive
        self._lock = False
        # Defined up front so implicitHeaderMode() is usable before init().
        self._implicitHeaderMode = None

    def init(self, parameters=None):
        """Verify the chip version and program every setting in parameters.

        parameters -- optional replacement for ``self.parameters``.
        Raises Exception('Invalid version.') when REG_VERSION does not read
        0x12 (the register is polled up to five times while it reads 0).
        """
        if parameters:
            self.parameters = parameters

        # check version
        version = 0
        for _ in range(5):
            version = self.readRegister(REG_VERSION)
            if version != 0:
                break
        if version != 0x12:
            raise Exception('Invalid version.')

        # put in LoRa and sleep mode
        self.sleep()

        # config
        self.setFrequency(self.parameters['frequency'])
        self.setSignalBandwidth(self.parameters['signal_bandwidth'])

        # set LNA boost
        self.writeRegister(REG_LNA, self.readRegister(REG_LNA) | 0x03)

        # set auto AGC
        self.writeRegister(REG_MODEM_CONFIG_3, 0x04)

        self.setTxPower(self.parameters['tx_power_level'])
        # Forget the cached header mode so the next call always writes it.
        self._implicitHeaderMode = None
        self.implicitHeaderMode(self.parameters['implicitHeader'])
        self.setSpreadingFactor(self.parameters['spreading_factor'])
        self.setCodingRate(self.parameters['coding_rate'])
        self.setPreambleLength(self.parameters['preamble_length'])
        self.setSyncWord(self.parameters['sync_word'])
        self.enableCRC(self.parameters['enable_CRC'])

        # set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset)
        # self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) & 0xF7)  # default disable on reset
        symbol_rate = self.parameters['signal_bandwidth'] / 2**self.parameters['spreading_factor']
        if 1000 / symbol_rate > 16:
            self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) | 0x08)

        # set base addresses
        self.writeRegister(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
        self.writeRegister(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)

        self.standby()

    def beginPacket(self, implicitHeaderMode=False):
        """Enter standby and reset the FIFO pointer and payload length."""
        self.standby()
        self.implicitHeaderMode(implicitHeaderMode)

        # reset FIFO address and payload length
        self.writeRegister(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
        self.writeRegister(REG_PAYLOAD_LENGTH, 0)

    def endPacket(self):
        """Switch to TX and busy-wait until the TX-done IRQ flag is set.

        NOTE: the wait has no timeout; it only ends once the flag is read.
        """
        # put in TX mode
        self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)

        # wait for TX done, standby automatically on TX_DONE
        while (self.readRegister(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
            pass

        # clear IRQ's
        self.writeRegister(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)

        self.collect_garbage()

    def write(self, buffer):
        """Append ``buffer`` to the FIFO and return the number of bytes written.

        The data is truncated so the total payload never exceeds
        MAX_PKT_LENGTH - FifoTxBaseAddr bytes.
        """
        currentLength = self.readRegister(REG_PAYLOAD_LENGTH)

        # check size
        room = MAX_PKT_LENGTH - FifoTxBaseAddr - currentLength
        size = min(len(buffer), room)

        # write data
        for i in range(size):
            self.writeRegister(REG_FIFO, buffer[i])

        # update length
        self.writeRegister(REG_PAYLOAD_LENGTH, currentLength + size)
        return size

    def aquire_lock(self, lock=False):
        """Record the requested lock state in ``self._lock``.

        This is a plain flag: nothing in this file waits on it.  (Previously
        the argument was ignored and the flag was always cleared.)
        """
        self._lock = bool(lock)

    def println(self, string, implicitHeader=False):
        """Send ``string`` (encoded with str.encode()) as one packet."""
        self.println_raw(string.encode(), implicitHeader)

    def println_raw(self, data, implicitHeader=False):
        """Send raw binary data as one packet.

        data should be an indexable array of bytes (e.g. bytearray).
        """
        self.aquire_lock(True)  # wait until RX_Done, lock and begin writing.
        try:
            self.beginPacket(implicitHeader)
            self.write(data)
            self.endPacket()
        finally:
            self.aquire_lock(False)  # unlock when done writing, even on error

    def getIrqFlags(self):
        """Return REG_IRQ_FLAGS and write the same value back to the register."""
        irqFlags = self.readRegister(REG_IRQ_FLAGS)
        self.writeRegister(REG_IRQ_FLAGS, irqFlags)
        return irqFlags

    def packetRssi(self):
        """Return the last packet's RSSI register value minus the port offset.

        The offset is 157 for carriers at or above RF_MID_BAND_THRESHOLD
        (HF port) and 164 below it (LF port), per the datasheet.  The earlier
        868 MHz cut-off applied the LF offset to HF carriers such as 866 MHz.
        Requires setFrequency() to have been called.
        """
        if self._frequency < RF_MID_BAND_THRESHOLD:
            offset = RSSI_OFFSET_LF_PORT
        else:
            offset = RSSI_OFFSET_HF_PORT
        return self.readRegister(REG_PKT_RSSI_VALUE) - offset

    def packetSnr(self):
        """Return the last packet's SNR register value times 0.25.

        The register holds a signed two's-complement byte (datasheet), so
        values above 127 are mapped to their negative equivalent first.
        """
        raw = self.readRegister(REG_PKT_SNR_VALUE)
        if raw > 127:
            raw -= 256
        return raw * 0.25

    def standby(self):
        """Write LoRa + standby to REG_OP_MODE."""
        self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)

    def fsrx(self):
        """Write MODE_FSRX to REG_OP_MODE (note: without the LoRa bit)."""
        self.writeRegister(REG_OP_MODE, MODE_FSRX)

    def sleep(self):
        """Write LoRa + sleep to REG_OP_MODE."""
        self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)

    def setTxPower(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN):
        """Program REG_PA_CONFIG; level is clamped to 0..14 (RFO) or 2..17 (PA boost)."""
        if outputPin == PA_OUTPUT_RFO_PIN:
            # RFO
            level = min(max(level, 0), 14)
            self.writeRegister(REG_PA_CONFIG, 0x70 | level)
        else:
            # PA BOOST
            level = min(max(level, 2), 17)
            self.writeRegister(REG_PA_CONFIG, PA_BOOST | (level - 2))

    def setFrequency(self, frequency):
        """Program the three Frf registers for ``frequency`` (in Hz).

        Known frequencies use precomputed register bytes, which avoids float
        rounding issues.  Any other frequency is converted with
        Frf = frequency * 2**19 / FXOSC instead of raising KeyError.
        """
        self._frequency = frequency

        frfs = {169E6: (42, 64, 0),
                433E6: (108, 64, 0),
                434E6: (108, 128, 0),
                866E6: (216, 128, 0),
                868E6: (217, 0, 0),
                868.1E6: (217, 6, 102),
                915E6: (228, 192, 0)}

        try:
            msb, mid, lsb = frfs[frequency]
        except KeyError:
            frf = (int(frequency) << 19) // FXOSC
            msb = (frf >> 16) & 0xff
            mid = (frf >> 8) & 0xff
            lsb = frf & 0xff

        self.writeRegister(REG_FRF_MSB, msb)
        self.writeRegister(REG_FRF_MID, mid)
        self.writeRegister(REG_FRF_LSB, lsb)

    def setSpreadingFactor(self, sf):
        """Program the spreading factor, clamped to 6..12.

        SF 6 uses different detection-optimize/threshold values than 7..12.
        """
        sf = min(max(sf, 6), 12)
        self.writeRegister(REG_DETECTION_OPTIMIZE, 0xc5 if sf == 6 else 0xc3)
        self.writeRegister(REG_DETECTION_THRESHOLD, 0x0c if sf == 6 else 0x0a)
        current = self.readRegister(REG_MODEM_CONFIG_2)
        self.writeRegister(REG_MODEM_CONFIG_2, (current & 0x0f) | ((sf << 4) & 0xf0))

    def setSignalBandwidth(self, sbw):
        """Program the bandwidth code: index of the first bin >= sbw, else 9."""
        bins = (7.8E3, 10.4E3, 15.6E3, 20.8E3, 31.25E3, 41.7E3, 62.5E3, 125E3, 250E3)

        bw = 9  # used when sbw is above every listed bin
        for index, limit in enumerate(bins):
            if sbw <= limit:
                bw = index
                break

        # bw = bins.index(sbw)

        current = self.readRegister(REG_MODEM_CONFIG_1)
        self.writeRegister(REG_MODEM_CONFIG_1, (current & 0x0f) | (bw << 4))

    def setCodingRate(self, denominator):
        """Program the coding rate; denominator is clamped to 5..8."""
        denominator = min(max(denominator, 5), 8)
        cr = denominator - 4
        current = self.readRegister(REG_MODEM_CONFIG_1)
        self.writeRegister(REG_MODEM_CONFIG_1, (current & 0xf1) | (cr << 1))

    def setPreambleLength(self, length):
        """Write the 16-bit preamble length to its MSB/LSB registers."""
        self.writeRegister(REG_PREAMBLE_MSB, (length >> 8) & 0xff)
        self.writeRegister(REG_PREAMBLE_LSB, (length >> 0) & 0xff)

    def enableCRC(self, enable_CRC=False):
        """Set or clear bit 2 (0x04) of REG_MODEM_CONFIG_2."""
        modem_config_2 = self.readRegister(REG_MODEM_CONFIG_2)
        if enable_CRC:
            config = modem_config_2 | 0x04
        else:
            config = modem_config_2 & 0xfb
        self.writeRegister(REG_MODEM_CONFIG_2, config)

    def setSyncWord(self, sw):
        """Write ``sw`` to REG_SYNC_WORD."""
        self.writeRegister(REG_SYNC_WORD, sw)

    # def enable_Rx_Done_IRQ(self, enable = True):
    #     if enable:
    #         self.writeRegister(REG_IRQ_FLAGS_MASK, self.readRegister(REG_IRQ_FLAGS_MASK) & ~IRQ_RX_DONE_MASK)
    #     else:
    #         self.writeRegister(REG_IRQ_FLAGS_MASK, self.readRegister(REG_IRQ_FLAGS_MASK) | IRQ_RX_DONE_MASK)

    # def dumpRegisters(self):
    #     for i in range(128):
    #         print("0x{0:02x}: {1:02x}".format(i, self.readRegister(i)))

    def implicitHeaderMode(self, implicitHeaderMode=False):
        """Set or clear bit 0 of REG_MODEM_CONFIG_1; skipped if already cached."""
        if self._implicitHeaderMode == implicitHeaderMode:
            return  # set value only if different.

        self._implicitHeaderMode = implicitHeaderMode
        modem_config_1 = self.readRegister(REG_MODEM_CONFIG_1)
        if implicitHeaderMode:
            config = modem_config_1 | 0x01
        else:
            config = modem_config_1 & 0xfe
        self.writeRegister(REG_MODEM_CONFIG_1, config)

    def onReceive(self, callback):
        """Register ``callback`` and attach/detach the RxDone pin interrupt.

        With a callback, REG_DIO_MAPPING_1 is set to 0x00 and
        handleOnReceive is installed as rising-edge handler; without one the
        interrupt is detached.  Nothing is (de)attached if pin_RxDone is falsy.
        """
        self._onReceive = callback

        if self.pin_RxDone:
            if callback:
                self.writeRegister(REG_DIO_MAPPING_1, 0x00)
                self.pin_RxDone.set_handler_for_irq_on_rising_edge(handler=self.handleOnReceive)
            else:
                self.pin_RxDone.detach_irq()

    def receive(self, size=0):
        """Enter continuous RX; size > 0 selects implicit header mode with that payload length."""
        self.implicitHeaderMode(size > 0)
        if size > 0:
            self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xff)

        # The last packet always starts at FIFO_RX_CURRENT_ADDR
        # no need to reset FIFO_ADDR_PTR
        self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS)

    def handleOnReceive(self, event_source):
        """Interrupt handler: deliver a received payload to the callback.

        When the IRQ flags equal exactly IRQ_RX_DONE_MASK and a callback is
        registered, the payload is read and passed to it.  Otherwise, unless
        already in single-RX mode, the FIFO pointer is reset and single-RX
        mode is entered.  Always returns True; the lock flag is cleared even
        if the callback raises.
        """
        self.aquire_lock(True)  # lock until TX_Done
        try:
            irqFlags = self.getIrqFlags()

            if irqFlags == IRQ_RX_DONE_MASK:  # RX_DONE only, irqFlags should be 0x40
                # automatically standby when RX_DONE
                if self._onReceive:
                    payload = self.read_payload()
                    self._onReceive(self, payload)

            elif self.readRegister(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
                # no packet received.
                # reset FIFO address / # enter single RX mode
                self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
                self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)
        finally:
            self.aquire_lock(False)  # unlock in any case.

        self.collect_garbage()
        return True

        # Disabled code retained from the original file:
        #
        # self.aquire_lock(True)              # lock until TX_Done
        #
        # irqFlags = self.readRegister(REG_IRQ_FLAGS) # should be 0x50
        # self.writeRegister(REG_IRQ_FLAGS, irqFlags)
        #
        # if (irqFlags & IRQ_RX_DONE_MASK) == 0: # check `RxDone`
        #     self.aquire_lock(False)
        #     return # `RxDone` is not set
        #
        # # check `PayloadCrcError` bit
        # crcOk = not bool (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK)
        #
        # # set FIFO address to current RX address
        # self.writeRegister(REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR))
        #
        # if self._onReceive:
        #     payload = self.read_payload()
        #     print(payload)
        #     self.aquire_lock(False)     # unlock when done reading
        #
        #     self._onReceive(self, payload)
        #
        # self.aquire_lock(False)             # unlock in any case.

    def receivedPacket(self, size=0):
        """Poll for a packet; return True when one is ready, otherwise None.

        size > 0 selects implicit header mode with that payload length.  When
        no packet is ready and the chip is not in single-RX mode, the FIFO
        pointer is reset and single-RX mode is entered.
        """
        irqFlags = self.getIrqFlags()

        self.implicitHeaderMode(size > 0)
        if size > 0:
            self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xff)

        # if (irqFlags & IRQ_RX_DONE_MASK) and \
        #    (irqFlags & IRQ_RX_TIME_OUT_MASK == 0) and \
        #    (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0):

        if irqFlags == IRQ_RX_DONE_MASK:  # RX_DONE only, irqFlags should be 0x40
            # automatically standby when RX_DONE
            return True

        elif self.readRegister(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
            # no packet received.
            # reset FIFO address / # enter single RX mode
            self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
            self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)

    def read_payload_raw(self):
        """Return the payload of the most recent message as a bytearray.

        The length comes from REG_PAYLOAD_LENGTH in implicit header mode and
        from REG_RX_NB_BYTES otherwise.
        """
        # set FIFO address to current RX address
        # fifo_rx_current_addr = self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
        self.writeRegister(REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR))

        # read packet length
        if self._implicitHeaderMode:
            packetLength = self.readRegister(REG_PAYLOAD_LENGTH)
        else:
            packetLength = self.readRegister(REG_RX_NB_BYTES)

        payload = bytearray()
        for _ in range(packetLength):
            payload.append(self.readRegister(REG_FIFO))

        self.collect_garbage()
        return payload

    def read_payload(self):
        """Return the payload of the most recent message as bytes."""
        return bytes(self.read_payload_raw())

    def printTemperature(self):
        """Run the temperature-read sequence and print the intermediate values.

        Blocks for roughly 1.14 s because of the two sleep_ms() calls.
        """
        # only work if LoRa mode is off

        # datasheet page 89:
        # 1. set device to Standby and wait for oscillator startup
        self.standby()
        sleep_ms(1000)

        # 2. set device to FSRX mode
        self.fsrx()

        # DEBUG: read TempMonitorOff state
        reg_image_cal = self.readRegister(REG_IMAGE_CAL)
        print("reg_image_cal before = {}".format(mybin(reg_image_cal)))

        # 3. Set TempMonitorOff = 0 (enables the sensor)
        self.writeRegister(REG_IMAGE_CAL, reg_image_cal & 0xFE)
        reg_image_cal = self.readRegister(REG_IMAGE_CAL)
        print("reg_image_cal after = {}".format(mybin(reg_image_cal)))

        # 4. Wait for 140 ms
        sleep_ms(140)

        # 5. Set TempMonitorOff = 1
        self.writeRegister(REG_IMAGE_CAL, reg_image_cal | 0x01)
        reg_image_cal = self.readRegister(REG_IMAGE_CAL)
        print("reg_image_cal after 2 = {}".format(mybin(reg_image_cal)))

        # 6. Set device back to Sleep or Standby mode
        self.standby()

        # 7. Access temperature in RegTemp
        temp = self.readRegister(REG_TEMP)
        print("temperature = {} = {}".format(mybin(temp), temp))

    def readRegister(self, address, byteorder='big', signed=False):
        """Read a register and return its value as an int.

        address   -- register address; bit 7 is cleared before the transfer.
        byteorder -- byte order passed to int.from_bytes().
        signed    -- when True, interpret the response as two's complement.
                     Done by hand rather than via int.from_bytes(signed=...)
                     so it does not depend on that keyword being supported.
        """
        response = self.transfer(self.pin_ss, address & 0x7f)
        value = int.from_bytes(response, byteorder)
        if signed:
            bits = 8 * len(response)
            if bits and value >= (1 << (bits - 1)):
                value -= 1 << bits
        return value

    def writeRegister(self, address, value):
        """Write ``value`` to a register; bit 7 of the address is set for the transfer."""
        self.transfer(self.pin_ss, address | 0x80, value)

    def collect_garbage(self):
        """Run a garbage collection pass."""
        gc.collect()
        # print('[Memory - free: {}   allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))