forked from Telos4/LoRa-Workshop
95 lines
2.9 KiB
Python
95 lines
2.9 KiB
Python
#
|
|
# lorawan packet: mhdr(1) mac_payload(1..N) mic(4)
|
|
#
|
|
from .MalformedPacketException import MalformedPacketException
|
|
from .MHDR import MHDR
|
|
from .Direction import Direction
|
|
from .MacPayload import MacPayload
|
|
|
|
|
|
class PhyPayload:
|
|
|
|
def __init__(self, nwkey, appkey):
|
|
self.nwkey = nwkey
|
|
self.appkey = appkey
|
|
|
|
def read(self, packet):
|
|
if len(packet) < 12:
|
|
raise MalformedPacketException("Invalid lorawan packet")
|
|
|
|
self.mhdr = MHDR(packet[0])
|
|
self.set_direction()
|
|
self.mac_payload = MacPayload()
|
|
self.mac_payload.read(self.get_mhdr().get_mtype(), packet[1:-4])
|
|
self.mic = packet[-4:]
|
|
|
|
|
|
def create(self, mhdr, args):
|
|
self.mhdr = MHDR(mhdr)
|
|
self.set_direction()
|
|
self.mac_payload = MacPayload()
|
|
self.mac_payload.create(self.get_mhdr().get_mtype(), self.appkey, args)
|
|
self.mic = None
|
|
|
|
def length(self):
|
|
return len(self.to_raw())
|
|
|
|
def to_raw(self):
|
|
phy_payload = [self.get_mhdr().to_raw()]
|
|
phy_payload += self.mac_payload.to_raw()
|
|
phy_payload += self.get_mic()
|
|
return phy_payload
|
|
|
|
def get_mhdr(self):
|
|
return self.mhdr
|
|
|
|
def set_mhdr(self, mhdr):
|
|
self.mhdr = mhdr
|
|
|
|
def get_direction(self):
|
|
return self.direction.get()
|
|
|
|
def set_direction(self):
|
|
self.direction = Direction(self.get_mhdr())
|
|
|
|
def get_mac_payload(self):
|
|
return self.mac_payload
|
|
|
|
def set_mac_payload(self, mac_payload):
|
|
self.mac_payload = mac_payload
|
|
|
|
def get_mic(self):
|
|
if self.mic == None:
|
|
self.set_mic(self.compute_mic())
|
|
return self.mic
|
|
|
|
def set_mic(self, mic):
|
|
self.mic = mic
|
|
|
|
def compute_mic(self):
|
|
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
|
|
return self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
|
|
else:
|
|
return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
|
|
|
|
def valid_mic(self):
|
|
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
|
|
return self.get_mic() == self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
|
|
else:
|
|
return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
|
|
|
|
def get_devaddr(self):
|
|
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
|
|
return self.mac_payload.frm_payload.get_devaddr()
|
|
else:
|
|
return self.mac_payload.fhdr.get_devaddr()
|
|
|
|
def get_payload(self):
|
|
return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction(), self.mic)
|
|
|
|
def derive_nwskey(self, devnonce):
|
|
return self.mac_payload.frm_payload.derive_nwskey(self.appkey, devnonce)
|
|
|
|
def derive_appskey(self, devnonce):
|
|
return self.mac_payload.frm_payload.derive_appskey(self.appkey, devnonce)
|