79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
|
from uPyLora import config_lora
|
||
|
from uPyLora.sx127x import SX127x
|
||
|
from uPyLora.controller_esp32 import ESP32Controller
|
||
|
|
||
|
class LoRaTransceiver:
|
||
|
def __init__(self, spreadingfactor=9, bandwidth=125E3, frequency=868E6, syncword=0x12, display=None):
|
||
|
# set up radio module
|
||
|
# SX127x LoRa module configured for LoRaWAN Channel 0
|
||
|
self.controller = ESP32Controller()
|
||
|
self.lora = self.controller.add_transceiver(SX127x(name='LoRa'), pin_id_ss=ESP32Controller.PIN_ID_FOR_LORA_SS, pin_id_RxDone=ESP32Controller.PIN_ID_FOR_LORA_DIO0)
|
||
|
|
||
|
self.setSpreadingFactor(spreadingfactor)
|
||
|
self.setSignalBandwidth(bandwidth)
|
||
|
self.setFrequency(frequency)
|
||
|
self.setSyncWord(syncword)
|
||
|
|
||
|
self.display = display
|
||
|
|
||
|
def setSpreadingFactor(self, spreadingfactor):
|
||
|
""" set spreading factor
|
||
|
possible values are 7-12
|
||
|
"""
|
||
|
self.lora.setSpreadingFactor(spreadingfactor)
|
||
|
|
||
|
def setFrequency(self, frequency):
|
||
|
""" set frequency (= channel used)
|
||
|
for now only 868 Mhz and 868.1 MHz are supported
|
||
|
"""
|
||
|
self.lora.setFrequency(frequency)
|
||
|
|
||
|
def setSignalBandwidth(self, bandwidth):
|
||
|
""" set the bandwidth
|
||
|
possible values should be 125 or 250 kHz according to EU 868 frequency plan
|
||
|
"""
|
||
|
self.lora.setSignalBandwidth(bandwidth)
|
||
|
|
||
|
def setSyncWord(self, syncword):
|
||
|
""" set the sync word
|
||
|
should be 0x12 for plain LoRa and 0x34 for LoRaWAN
|
||
|
"""
|
||
|
self.lora.setSyncWord(syncword)
|
||
|
|
||
|
def send(self, data):
|
||
|
""" send raw data
|
||
|
"""
|
||
|
print("Sending {}".format(data))
|
||
|
self.lora.println_raw(data)
|
||
|
|
||
|
if self.display is not None:
|
||
|
self.display.show_text_wrap("Message {} sent.".format(data))
|
||
|
|
||
|
def send_string(self, message):
|
||
|
""" send a message
|
||
|
message should be a string
|
||
|
"""
|
||
|
print("Sending {}".format(message))
|
||
|
self.lora.println(message)
|
||
|
|
||
|
if self.display is not None:
|
||
|
self.display.show_text_wrap("Message {} sent.".format(message))
|
||
|
|
||
|
def recv(self):
|
||
|
""" start listening for messages and output them on the console """
|
||
|
if self.display is not None:
|
||
|
self.display.show_text_wrap("Receiving ...")
|
||
|
|
||
|
while True:
|
||
|
# check for new message
|
||
|
if self.lora.receivedPacket():
|
||
|
self.lora.blink_led()
|
||
|
|
||
|
try:
|
||
|
payload = self.lora.read_payload()
|
||
|
if self.display is not None:
|
||
|
self.display.show_text_wrap("Received: {0} RSSI: {1}".format(payload.decode(), lora.packetRssi()))
|
||
|
|
||
|
except Exception as e:
|
||
|
print(e)
|