uPyLora library by lemariva added

This commit is contained in:
Simon Pirkelmann 2019-11-19 20:48:05 +01:00
parent 4aec0e11b1
commit a7d723589f
6 changed files with 939 additions and 0 deletions

201
micropython/uPyLora/LICENSE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,22 @@
This code is based on Lemariva's [uPyLora repository](https://github.com/lemariva/uPyLora)
I made a few minor changes to make it work with LoRaWAN. In particular, support for the 868.1MHz frequency (used for channel 0 according to the TTN EU868 frequency plan) was added, as well as a function for sending raw binary data (bytearrays) instead of strings.
# uPyLora
ESP32 using MicroPython meets lora.
# Setup
* `LoRaPingPong.py`: sends ping-pong messages between the nodes (bidirectional communication)
* `LoRaReceiver.py` and `LoraSender.py`: unidirectional communication between the nodes (Note: deploy the `LoRaReceiver.py` on one node and the `LoraSender.py` on another node)
# Hardware
* [Wemos® TTGO LORA32 868/915Mhz](https://www.banggood.com/2Pcs-Wemos-TTGO-LORA32-868915Mhz-ESP32-LoRa-OLED-0_96-Inch-Blue-Display-p-1239769.html?p=QW0903761303201409LG) board.
# Revision
* 0.1 first commit
# Licenses
* Apache 2.0
# References
* Basically based on: [Wei1234c GitHub](https://github.com/Wei1234c/SX127x_driver_for_MicroPython_on_ESP8266). The original project was cleaned and made compatible with the [Wemos® TTGO LORA32 868/915Mhz](https://www.banggood.com/2Pcs-Wemos-TTGO-LORA32-868915Mhz-ESP32-LoRa-OLED-0_96-Inch-Blue-Display-p-1239769.html?p=QW0903761303201409LG) board.

View File

@ -0,0 +1,18 @@
import sys
import os
import time
import machine
import ubinascii
def mac2eui(mac):
mac = mac[0:6] + 'fffe' + mac[6:]
return hex(int(mac[0:2], 16) ^ 2)[2:] + mac[2:]
def get_millis():
millisecond = time.ticks_ms()
return millisecond
def get_nodename():
uuid = ubinascii.hexlify(machine.unique_id()).decode()
node_name = "ESP_" + uuid
return node_name

View File

@ -0,0 +1,126 @@
from time import sleep
class Controller:
class Mock:
pass
ON_BOARD_LED_PIN_NO = None
ON_BOARD_LED_HIGH_IS_ON = True
GPIO_PINS = []
PIN_ID_FOR_LORA_RESET = None
PIN_ID_FOR_LORA_SS = None
PIN_ID_SCK = None
PIN_ID_MOSI = None
PIN_ID_MISO = None
PIN_ID_FOR_LORA_DIO0 = None
PIN_ID_FOR_LORA_DIO1 = None
PIN_ID_FOR_LORA_DIO2 = None
PIN_ID_FOR_LORA_DIO3 = None
PIN_ID_FOR_LORA_DIO4 = None
PIN_ID_FOR_LORA_DIO5 = None
def __init__(self,
pin_id_led = ON_BOARD_LED_PIN_NO,
on_board_led_high_is_on = ON_BOARD_LED_HIGH_IS_ON,
pin_id_reset = PIN_ID_FOR_LORA_RESET,
blink_on_start = (2, 0.5, 0.5)):
self.pin_led = self.prepare_pin(pin_id_led)
self.on_board_led_high_is_on = on_board_led_high_is_on
self.pin_reset = self.prepare_pin(pin_id_reset)
self.reset_pin(self.pin_reset)
self.transceivers = {}
self.blink_led(*blink_on_start)
def add_transceiver(self,
transceiver,
pin_id_ss = PIN_ID_FOR_LORA_SS,
pin_id_RxDone = PIN_ID_FOR_LORA_DIO0,
pin_id_RxTimeout = PIN_ID_FOR_LORA_DIO1,
pin_id_ValidHeader = PIN_ID_FOR_LORA_DIO2,
pin_id_CadDone = PIN_ID_FOR_LORA_DIO3,
pin_id_CadDetected = PIN_ID_FOR_LORA_DIO4,
pin_id_PayloadCrcError = PIN_ID_FOR_LORA_DIO5):
transceiver.blink_led = self.blink_led
transceiver.pin_ss = self.prepare_pin(pin_id_ss)
transceiver.pin_RxDone = self.prepare_irq_pin(pin_id_RxDone)
transceiver.pin_RxTimeout = self.prepare_irq_pin(pin_id_RxTimeout)
transceiver.pin_ValidHeader = self.prepare_irq_pin(pin_id_ValidHeader)
transceiver.pin_CadDone = self.prepare_irq_pin(pin_id_CadDone)
transceiver.pin_CadDetected = self.prepare_irq_pin(pin_id_CadDetected)
transceiver.pin_PayloadCrcError = self.prepare_irq_pin(pin_id_PayloadCrcError)
self.spi = self.prepare_spi(self.get_spi())
transceiver.transfer = self.spi.transfer
transceiver.init()
self.transceivers[transceiver.name] = transceiver
return transceiver
def prepare_pin(self, pin_id, in_out = None):
reason = '''
# a pin should provide:
# .pin_id
# .low()
# .high()
# .value() # read input.
# .irq() # (ESP8266/ESP32 only) ref to the irq function of real pin object.
'''
raise NotImplementedError(reason)
def prepare_irq_pin(self, pin_id):
reason = '''
# a irq_pin should provide:
# .set_handler_for_irq_on_rising_edge() # to set trigger and handler.
# .detach_irq()
'''
raise NotImplementedError(reason)
def get_spi(self):
reason = '''
# initialize SPI interface
'''
raise NotImplementedError(reason)
def prepare_spi(self, spi):
reason = '''
# a spi should provide:
# .close()
# .transfer(pin_ss, address, value = 0x00)
'''
raise NotImplementedError(reason)
def led_on(self, on = True):
self.pin_led.high() if self.on_board_led_high_is_on == on else self.pin_led.low()
def blink_led(self, times = 1, on_seconds = 0.1, off_seconds = 0.1):
for i in range(times):
self.led_on(True)
sleep(on_seconds)
self.led_on(False)
sleep(off_seconds)
def reset_pin(self, pin, duration_low = 0.05, duration_high = 0.05):
pin.low()
sleep(duration_low)
pin.high()
sleep(duration_high)
def __exit__(self):
self.spi.close()

View File

@ -0,0 +1,110 @@
from machine import Pin, SPI, reset
from controller import Controller
class ESP32Controller(Controller):
# LoRa config
PIN_ID_FOR_LORA_RESET = 14
PIN_ID_FOR_LORA_SS = 18
PIN_ID_SCK = 5
PIN_ID_MOSI = 27
PIN_ID_MISO = 19
PIN_ID_FOR_LORA_DIO0 = 26
PIN_ID_FOR_LORA_DIO1 = None
PIN_ID_FOR_LORA_DIO2 = None
PIN_ID_FOR_LORA_DIO3 = None
PIN_ID_FOR_LORA_DIO4 = None
PIN_ID_FOR_LORA_DIO5 = None
# ESP config
ON_BOARD_LED_PIN_NO = 2
ON_BOARD_LED_HIGH_IS_ON = True
GPIO_PINS = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 21, 22,
23, 25, 26, 27, 32, 34, 35, 36, 37, 38, 39)
def __init__(self,
pin_id_led = ON_BOARD_LED_PIN_NO,
on_board_led_high_is_on = ON_BOARD_LED_HIGH_IS_ON,
pin_id_reset = PIN_ID_FOR_LORA_RESET,
blink_on_start = (2, 0.5, 0.5)):
super().__init__(pin_id_led,
on_board_led_high_is_on,
pin_id_reset,
blink_on_start)
def prepare_pin(self, pin_id, in_out = Pin.OUT):
if pin_id is not None:
pin = Pin(pin_id, in_out)
new_pin = Controller.Mock()
new_pin.pin_id = pin_id
new_pin.value = pin.value
if in_out == Pin.OUT:
new_pin.low = lambda : pin.value(0)
new_pin.high = lambda : pin.value(1)
else:
new_pin.irq = pin.irq
return new_pin
def prepare_irq_pin(self, pin_id):
pin = self.prepare_pin(pin_id, Pin.IN)
if pin:
pin.set_handler_for_irq_on_rising_edge = lambda handler: pin.irq(handler = handler, trigger = Pin.IRQ_RISING)
pin.detach_irq = lambda : pin.irq(handler = None, trigger = 0)
return pin
def get_spi(self):
spi = None
try:
spi = SPI(baudrate = 10000000, polarity = 0, phase = 0, bits = 8, firstbit = SPI.MSB,
sck = Pin(self.PIN_ID_SCK, Pin.OUT, Pin.PULL_DOWN),
mosi = Pin(self.PIN_ID_MOSI, Pin.OUT, Pin.PULL_UP),
miso = Pin(self.PIN_ID_MISO, Pin.IN, Pin.PULL_UP))
#spi.init()
except Exception as e:
print(e)
if spi:
spi.deinit()
spi = None
reset() # in case SPI is already in use, need to reset.
return spi
def prepare_spi(self, spi):
if spi:
new_spi = Controller.Mock()
def transfer(pin_ss, address, value = 0x00):
response = bytearray(1)
pin_ss.low()
spi.write(bytes([address]))
spi.write_readinto(bytes([value]), response)
pin_ss.high()
return response
new_spi.transfer = transfer
new_spi.close = spi.deinit
return new_spi
def __exit__(self):
self.spi.close()

View File

@ -0,0 +1,462 @@
from time import sleep
from utime import sleep_ms
import gc
PA_OUTPUT_RFO_PIN = 0
PA_OUTPUT_PA_BOOST_PIN = 1
# registers
REG_FIFO = 0x00
REG_OP_MODE = 0x01
REG_FRF_MSB = 0x06
REG_FRF_MID = 0x07
REG_FRF_LSB = 0x08
REG_PA_CONFIG = 0x09
REG_LNA = 0x0c
REG_FIFO_ADDR_PTR = 0x0d
REG_IMAGE_CAL = 0x3b
REG_TEMP = 0x3c
REG_FIFO_TX_BASE_ADDR = 0x0e
FifoTxBaseAddr = 0x00
# FifoTxBaseAddr = 0x80
REG_FIFO_RX_BASE_ADDR = 0x0f
FifoRxBaseAddr = 0x00
REG_FIFO_RX_CURRENT_ADDR = 0x10
REG_IRQ_FLAGS_MASK = 0x11
REG_IRQ_FLAGS = 0x12
REG_RX_NB_BYTES = 0x13
REG_PKT_RSSI_VALUE = 0x1a
REG_PKT_SNR_VALUE = 0x1b
REG_MODEM_CONFIG_1 = 0x1d
REG_MODEM_CONFIG_2 = 0x1e
REG_PREAMBLE_MSB = 0x20
REG_PREAMBLE_LSB = 0x21
REG_PAYLOAD_LENGTH = 0x22
REG_FIFO_RX_BYTE_ADDR = 0x25
REG_MODEM_CONFIG_3 = 0x26
REG_RSSI_WIDEBAND = 0x2c
REG_DETECTION_OPTIMIZE = 0x31
REG_DETECTION_THRESHOLD = 0x37
REG_SYNC_WORD = 0x39
REG_DIO_MAPPING_1 = 0x40
REG_VERSION = 0x42
# modes
MODE_LONG_RANGE_MODE = 0x80 # bit 7: 1 => LoRa mode
MODE_SLEEP = 0x00
MODE_STDBY = 0x01
MODE_TX = 0x03
MODE_FSRX = 0x04
MODE_RX_CONTINUOUS = 0x05
MODE_RX_SINGLE = 0x06
# PA config
PA_BOOST = 0x80
# IRQ masks
IRQ_TX_DONE_MASK = 0x08
IRQ_PAYLOAD_CRC_ERROR_MASK = 0x20
IRQ_RX_DONE_MASK = 0x40
IRQ_RX_TIME_OUT_MASK = 0x80
# Buffer size
MAX_PKT_LENGTH = 255
def mybin(val):
return "0b{:0>{w}}".format(bin(val)[2:],w=8)
class SX127x:
def __init__(self,
name = 'SX127x',
parameters = {'frequency': 868E6, 'tx_power_level': 2, 'signal_bandwidth': 125E3,
'spreading_factor': 8, 'coding_rate': 5, 'preamble_length': 8,
'implicitHeader': False, 'sync_word': 0x12, 'enable_CRC': False},
onReceive = None):
self.name = name
self.parameters = parameters
self._onReceive = onReceive
self._lock = False
def init(self, parameters = None):
if parameters: self.parameters = parameters
init_try = True
re_try = 0
# check version
while(init_try and re_try < 5):
version = self.readRegister(REG_VERSION)
re_try = re_try + 1
if(version != 0):
init_try = False;
if version != 0x12:
raise Exception('Invalid version.')
# put in LoRa and sleep mode
self.sleep()
# config
self.setFrequency(self.parameters['frequency'])
self.setSignalBandwidth(self.parameters['signal_bandwidth'])
# set LNA boost
self.writeRegister(REG_LNA, self.readRegister(REG_LNA) | 0x03)
# set auto AGC
self.writeRegister(REG_MODEM_CONFIG_3, 0x04)
self.setTxPower(self.parameters['tx_power_level'])
self._implicitHeaderMode = None
self.implicitHeaderMode(self.parameters['implicitHeader'])
self.setSpreadingFactor(self.parameters['spreading_factor'])
self.setCodingRate(self.parameters['coding_rate'])
self.setPreambleLength(self.parameters['preamble_length'])
self.setSyncWord(self.parameters['sync_word'])
self.enableCRC(self.parameters['enable_CRC'])
# set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset)
# self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) & 0xF7) # default disable on reset
if 1000 / (self.parameters['signal_bandwidth'] / 2**self.parameters['spreading_factor']) > 16:
self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) | 0x08)
# set base addresses
self.writeRegister(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
self.writeRegister(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)
self.standby()
def beginPacket(self, implicitHeaderMode = False):
self.standby()
self.implicitHeaderMode(implicitHeaderMode)
# reset FIFO address and paload length
self.writeRegister(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
self.writeRegister(REG_PAYLOAD_LENGTH, 0)
def endPacket(self):
# put in TX mode
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)
# wait for TX done, standby automatically on TX_DONE
while (self.readRegister(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
pass
# clear IRQ's
self.writeRegister(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)
self.collect_garbage()
def write(self, buffer):
currentLength = self.readRegister(REG_PAYLOAD_LENGTH)
size = len(buffer)
# check size
size = min(size, (MAX_PKT_LENGTH - FifoTxBaseAddr - currentLength))
# write data
for i in range(size):
self.writeRegister(REG_FIFO, buffer[i])
# update length
self.writeRegister(REG_PAYLOAD_LENGTH, currentLength + size)
return size
def aquire_lock(self, lock = False):
self._lock = False
def println(self, string, implicitHeader = False):
self.aquire_lock(True) # wait until RX_Done, lock and begin writing.
self.beginPacket(implicitHeader)
self.write(string.encode())
self.endPacket()
self.aquire_lock(False) # unlock when done writing
def println_raw(self, data, implicitHeader = False):
""" function for sending raw binary data
data should be an indexable array of bytes (e.g. bytearray)
"""
self.aquire_lock(True) # wait until RX_Done, lock and begin writing.
self.beginPacket(implicitHeader)
self.write(data)
self.endPacket()
self.aquire_lock(False) # unlock when done writing
def getIrqFlags(self):
irqFlags = self.readRegister(REG_IRQ_FLAGS)
self.writeRegister(REG_IRQ_FLAGS, irqFlags)
return irqFlags
def packetRssi(self):
return (self.readRegister(REG_PKT_RSSI_VALUE) - (164 if self._frequency < 868E6 else 157))
def packetSnr(self):
return (self.readRegister(REG_PKT_SNR_VALUE)) * 0.25
def standby(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)
def fsrx(self):
self.writeRegister(REG_OP_MODE, MODE_FSRX)
def sleep(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)
def setTxPower(self, level, outputPin = PA_OUTPUT_PA_BOOST_PIN):
if (outputPin == PA_OUTPUT_RFO_PIN):
# RFO
level = min(max(level, 0), 14)
self.writeRegister(REG_PA_CONFIG, 0x70 | level)
else:
# PA BOOST
level = min(max(level, 2), 17)
self.writeRegister(REG_PA_CONFIG, PA_BOOST | (level - 2))
def setFrequency(self, frequency):
self._frequency = frequency
frfs = {169E6: (42, 64, 0),
433E6: (108, 64, 0),
434E6: (108, 128, 0),
866E6: (216, 128, 0),
868E6: (217, 0, 0),
868.1E6: (217, 6, 102),
915E6: (228, 192, 0)}
self.writeRegister(REG_FRF_MSB, frfs[frequency][0])
self.writeRegister(REG_FRF_MID, frfs[frequency][1])
self.writeRegister(REG_FRF_LSB, frfs[frequency][2])
def setSpreadingFactor(self, sf):
sf = min(max(sf, 6), 12)
self.writeRegister(REG_DETECTION_OPTIMIZE, 0xc5 if sf == 6 else 0xc3)
self.writeRegister(REG_DETECTION_THRESHOLD, 0x0c if sf == 6 else 0x0a)
self.writeRegister(REG_MODEM_CONFIG_2, (self.readRegister(REG_MODEM_CONFIG_2) & 0x0f) | ((sf << 4) & 0xf0))
def setSignalBandwidth(self, sbw):
bins = (7.8E3, 10.4E3, 15.6E3, 20.8E3, 31.25E3, 41.7E3, 62.5E3, 125E3, 250E3)
bw = 9
for i in range(len(bins)):
if sbw <= bins[i]:
bw = i
break
# bw = bins.index(sbw)
self.writeRegister(REG_MODEM_CONFIG_1, (self.readRegister(REG_MODEM_CONFIG_1) & 0x0f) | (bw << 4))
def setCodingRate(self, denominator):
denominator = min(max(denominator, 5), 8)
cr = denominator - 4
self.writeRegister(REG_MODEM_CONFIG_1, (self.readRegister(REG_MODEM_CONFIG_1) & 0xf1) | (cr << 1))
def setPreambleLength(self, length):
self.writeRegister(REG_PREAMBLE_MSB, (length >> 8) & 0xff)
self.writeRegister(REG_PREAMBLE_LSB, (length >> 0) & 0xff)
def enableCRC(self, enable_CRC = False):
modem_config_2 = self.readRegister(REG_MODEM_CONFIG_2)
config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xfb
self.writeRegister(REG_MODEM_CONFIG_2, config)
def setSyncWord(self, sw):
self.writeRegister(REG_SYNC_WORD, sw)
# def enable_Rx_Done_IRQ(self, enable = True):
# if enable:
# self.writeRegister(REG_IRQ_FLAGS_MASK, self.readRegister(REG_IRQ_FLAGS_MASK) & ~IRQ_RX_DONE_MASK)
# else:
# self.writeRegister(REG_IRQ_FLAGS_MASK, self.readRegister(REG_IRQ_FLAGS_MASK) | IRQ_RX_DONE_MASK)
# def dumpRegisters(self):
# for i in range(128):
# print("0x{0:02x}: {1:02x}".format(i, self.readRegister(i)))
def implicitHeaderMode(self, implicitHeaderMode = False):
if self._implicitHeaderMode != implicitHeaderMode: # set value only if different.
self._implicitHeaderMode = implicitHeaderMode
modem_config_1 = self.readRegister(REG_MODEM_CONFIG_1)
config = modem_config_1 | 0x01 if implicitHeaderMode else modem_config_1 & 0xfe
self.writeRegister(REG_MODEM_CONFIG_1, config)
def onReceive(self, callback):
self._onReceive = callback
if self.pin_RxDone:
if callback:
self.writeRegister(REG_DIO_MAPPING_1, 0x00)
self.pin_RxDone.set_handler_for_irq_on_rising_edge(handler = self.handleOnReceive)
else:
self.pin_RxDone.detach_irq()
def receive(self, size = 0):
self.implicitHeaderMode(size > 0)
if size > 0: self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xff)
# The last packet always starts at FIFO_RX_CURRENT_ADDR
# no need to reset FIFO_ADDR_PTR
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS)
def handleOnReceive(self, event_source):
self.aquire_lock(True) # lock until TX_Done
irqFlags = self.getIrqFlags()
if (irqFlags == IRQ_RX_DONE_MASK): # RX_DONE only, irqFlags should be 0x40
# automatically standby when RX_DONE
if self._onReceive:
payload = self.read_payload()
self._onReceive(self, payload)
elif self.readRegister(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)
self.aquire_lock(False) # unlock in any case.
self.collect_garbage()
return True
# self.aquire_lock(True) # lock until TX_Done
#
# irqFlags = self.readRegister(REG_IRQ_FLAGS) # should be 0x50
# self.writeRegister(REG_IRQ_FLAGS, irqFlags)
#
# if (irqFlags & IRQ_RX_DONE_MASK) == 0: # check `RxDone`
# self.aquire_lock(False)
# return # `RxDone` is not set
#
# # check `PayloadCrcError` bit
# crcOk = not bool (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK)
#
# # set FIFO address to current RX address
# self.writeRegister(REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR))
#
# if self._onReceive:
# payload = self.read_payload()
# print(payload)
# self.aquire_lock(False) # unlock when done reading
#
# self._onReceive(self, payload)
#
# self.aquire_lock(False) # unlock in any case.
def receivedPacket(self, size = 0):
irqFlags = self.getIrqFlags()
self.implicitHeaderMode(size > 0)
if size > 0: self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xff)
# if (irqFlags & IRQ_RX_DONE_MASK) and \
# (irqFlags & IRQ_RX_TIME_OUT_MASK == 0) and \
# (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0):
if (irqFlags == IRQ_RX_DONE_MASK): # RX_DONE only, irqFlags should be 0x40
# automatically standby when RX_DONE
return True
elif self.readRegister(REG_OP_MODE) != (MODE_LONG_RANGE_MODE | MODE_RX_SINGLE):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE)
def read_payload(self):
# set FIFO address to current RX address
# fifo_rx_current_addr = self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
self.writeRegister(REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR))
# read packet length
packetLength = self.readRegister(REG_PAYLOAD_LENGTH) if self._implicitHeaderMode else \
self.readRegister(REG_RX_NB_BYTES)
payload = bytearray()
for i in range(packetLength):
payload.append(self.readRegister(REG_FIFO))
self.collect_garbage()
return bytes(payload)
def printTemperature(self):
# only work if LoRa mode is off
# datasheet page 89:
# 1. set device to Standby and wait for oscillator startup
self.standby()
sleep_ms(1000)
# 2. set device to FSRX mode
self.fsrx()
# DEBUG: read TempMonitorOff state
reg_image_cal = self.readRegister(REG_IMAGE_CAL)
print("reg_image_cal before = {}".format(mybin(reg_image_cal)))
# 3. Set TempMonitorOff = 0 (enables the sensor)
self.writeRegister(REG_IMAGE_CAL, reg_image_cal & 0xFE)
reg_image_cal = self.readRegister(REG_IMAGE_CAL)
print("reg_image_cal after = {}".format(mybin(reg_image_cal)))
# 4. Wait for 140 ms
sleep_ms(140)
# 5. Set TempMonitorOff = 1
self.writeRegister(REG_IMAGE_CAL, reg_image_cal | 0x01)
reg_image_cal = self.readRegister(REG_IMAGE_CAL)
print("reg_image_cal after 2 = {}".format(mybin(reg_image_cal)))
# 6. Set device back to Sleep or Standby mode
self.standby()
# 7. Access temperature in RegTemp
temp = self.readRegister(REG_TEMP)
print("temperature = {} = {}".format(mybin(temp), temp))
def readRegister(self, address, byteorder = 'big', signed = False):
response = self.transfer(self.pin_ss, address & 0x7f)
return int.from_bytes(response, byteorder)
def writeRegister(self, address, value):
self.transfer(self.pin_ss, address | 0x80, value)
def collect_garbage(self):
gc.collect()
#print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))