added uLoRaWAN library

pull/1/head
Simon Pirkelmann 2019-11-30 20:19:57 +01:00
parent cc9e9652cb
commit e02cdb5242
11 changed files with 638 additions and 0 deletions

View File

@ -0,0 +1,75 @@
from struct import pack, unpack
import ucryptolib
MODE_ECB = 1
class AES_CMAC:
def gen_subkey(self, K):
AES_128 = ucryptolib.aes(bytearray(K), MODE_ECB)
L = AES_128.encrypt(('\x00'*16).encode())
LHigh = unpack('>Q',L[:8])[0]
LLow = unpack('>Q',L[8:])[0]
K1High = ((LHigh << 1) | ( LLow >> 63 )) & 0xFFFFFFFFFFFFFFFF
K1Low = (LLow << 1) & 0xFFFFFFFFFFFFFFFF
if (LHigh >> 63):
K1Low ^= 0x87
K2High = ((K1High << 1) | (K1Low >> 63)) & 0xFFFFFFFFFFFFFFFF
K2Low = ((K1Low << 1)) & 0xFFFFFFFFFFFFFFFF
if (K1High >> 63):
K2Low ^= 0x87
K1 = pack('>QQ', K1High, K1Low)
K2 = pack('>QQ', K2High, K2Low)
return K1, K2
def xor_128(self, N1, N2):
J = b''
for i in range(len(N1)):
J += bytes([N1[i] ^ N2[i]])
return J
def pad(self, N):
const_Bsize = 16
padLen = 16-len(N)
return N + b'\x80' + b'\x00'*(padLen-1)
def encode(self, K, M):
const_Bsize = 16
const_Zero = b'\x00'*16
AES_128 = ucryptolib.aes(bytearray(K), MODE_ECB)
K1, K2 = self.gen_subkey(K)
n = int(len(M)/const_Bsize)
if n == 0:
n = 1
flag = False
else:
if (len(M) % const_Bsize) == 0:
flag = True
else:
n += 1
flag = False
M_n = M[(n-1)*const_Bsize:]
if flag is True:
M_last = self.xor_128(M_n,K1)
else:
M_last = self.xor_128(self.pad(M_n),K2)
X = const_Zero
for i in range(n-1):
M_i = M[(i)*const_Bsize:][:16]
Y = self.xor_128(X, M_i)
X = AES_128.encrypt(Y)
Y = self.xor_128(M_last, X)
T = AES_128.encrypt(Y)
return T

View File

@ -0,0 +1,104 @@
#
# frm_payload: data(0..N)
#
from .AES_CMAC import AES_CMAC
import math
import ucryptolib
MODE_ECB = 1
class DataPayload:
def read(self, mac_payload, payload):
self.mac_payload = mac_payload
self.payload = payload
def create(self, mac_payload, key, args):
self.mac_payload = mac_payload
self.set_payload(key, 0x00, args['data'])
def length(self):
return len(self.payload)
def to_raw(self):
return self.payload
def set_payload(self, key, direction, data):
self.payload = self.encrypt_payload(key, direction, data)
def compute_mic(self, key, direction, mhdr):
mic = [0x49]
mic += [0x00, 0x00, 0x00, 0x00]
mic += [direction]
mic += self.mac_payload.get_fhdr().get_devaddr()
mic += self.mac_payload.get_fhdr().get_fcnt()
mic += [0x00]
mic += [0x00]
mic += [0x00]
mic += [1 + self.mac_payload.length()]
mic += [mhdr.to_raw()]
mic += self.mac_payload.to_raw()
cmac = AES_CMAC()
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))
def decrypt_payload(self, key, direction, mic):
print(self.payload, key, direction, mic)
k = int(math.ceil(len(self.payload) / 16.0))
a = []
for i in range(k):
a += [0x01]
a += [0x00, 0x00, 0x00, 0x00]
a += [direction]
a += self.mac_payload.get_fhdr().get_devaddr()
a += self.mac_payload.get_fhdr().get_fcnt()
a += [0x00] # fcnt 32bit
a += [0x00] # fcnt 32bit
a += [0x00]
a += [i+1]
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
s = cipher.encrypt(bytes(a))
padded_payload = []
for i in range(k):
idx = (i + 1) * 16
padded_payload += (self.payload[idx - 16:idx] + ([0x00] * 16))[:16]
payload = []
for i in range(len(self.payload)):
print(s[i], padded_payload[i], s[i] ^ padded_payload[i])
payload += [s[i] ^ padded_payload[i]]
return list(map(int, payload))
def encrypt_payload(self, key, direction, data):
k = int(math.ceil(len(data) / 16.0))
a = []
for i in range(k):
a += [0x01]
a += [0x00, 0x00, 0x00, 0x00]
a += [direction]
a += self.mac_payload.get_fhdr().get_devaddr()
a += self.mac_payload.get_fhdr().get_fcnt()
a += [0x00] # fcnt 32bit
a += [0x00] # fcnt 32bit
a += [0x00]
a += [i+1]
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
s = cipher.encrypt(bytes(a))
padded_payload = []
for i in range(k):
idx = (i + 1) * 16
padded_payload += (data[idx - 16:idx] + ([0x00] * 16))[:16]
payload = []
for i in range(len(data)):
payload += [s[i] ^ padded_payload[i]]
return list(map(int, payload))

View File

@ -0,0 +1,25 @@
from .MHDR import MHDR
class Direction:
UP = 0x00
DOWN = 0x01
DIRECTION = {
MHDR.JOIN_REQUEST: UP,
MHDR.JOIN_ACCEPT: DOWN,
MHDR.UNCONF_DATA_UP: UP,
MHDR.UNCONF_DATA_DOWN: DOWN,
MHDR.CONF_DATA_UP: UP,
MHDR.CONF_DATA_DOWN: DOWN,
MHDR.RFU: UP,
MHDR.PROPRIETARY: UP }
def __init__(self, mhdr):
self.set(mhdr)
def get(self):
return self.direction
def set(self, mhdr):
self.direction = self.DIRECTION[mhdr.get_mtype()]

View File

@ -0,0 +1,66 @@
#
# fhdr: devaddr(4) fctrl(1) fcnt(2) fopts(0..N)
#
from .MalformedPacketException import MalformedPacketException
from .MHDR import MHDR
#from struct import unpack
class FHDR:
def read(self, mac_payload):
if len(mac_payload) < 7:
raise MalformedPacketException("Invalid fhdr")
self.devaddr = mac_payload[:4]
self.fctrl = mac_payload[4]
self.fcnt = mac_payload[5:7]
self.fopts = mac_payload[7:7 + (self.fctrl & 0xf)]
def create(self, mtype, args):
self.devaddr = [0x00, 0x00, 0x00, 0x00]
self.fctrl = 0x00
if 'fcnt' in args:
self.fcnt = args['fcnt'].to_bytes(2, 'little')
else:
self.fcnt = [0x00, 0x00]
self.fopts = []
if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\
mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN:
self.devaddr = list(reversed(args['devaddr']))
def length(self):
return 4 + 1 + 2 + (self.fctrl & 0xf)
def to_raw(self):
fhdr = []
fhdr += self.devaddr
fhdr += [self.fctrl]
fhdr += self.fcnt
if self.fopts:
fhdr += self.fopts
return fhdr
def get_devaddr(self):
return self.devaddr
def set_devaddr(self, devaddr):
self.devaddr = devaddr
def get_fctrl(self):
return self.fctrl
def set_fctrl(self, fctrl):
self.fctrl = fctrl
def get_fcnt(self):
return self.fcnt
def set_fcnt(self, fcnt):
self.fcnt = fcnt
def get_fopts(self):
return self.fopts
def set_fopts(self, fopts):
self.fopts = fopts

View File

@ -0,0 +1,100 @@
#
# frm_payload: appnonce(3) netid(3) devaddr(4) dlsettings(1) rxdelay(1) cflist(0..16)
#
from .MalformedPacketException import MalformedPacketException
from .AES_CMAC import AES_CMAC
import ucryptolib
MODE_ECB = 1
class JoinAcceptPayload:
def read(self, payload):
if len(payload) < 12:
raise MalformedPacketException("Invalid join accept")
self.encrypted_payload = payload
def create(self, args):
pass
def length(self):
return len(self.encrypted_payload)
def to_raw(self):
return self.encrypted_payload
def to_clear_raw(self):
return self.payload
def get_appnonce(self):
return self.appnonce
def get_netid(self):
return self.netid
def get_devaddr(self):
return list(map(int, reversed(self.devaddr)))
def get_dlsettings(self):
return self.dlsettings
def get_rxdelay(self):
return self.rxdelay
def get_cflist(self):
return self.cflist
def compute_mic(self, key, direction, mhdr):
mic = []
mic += [mhdr.to_raw()]
mic += self.to_clear_raw()
cmac = AES_CMAC()
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))
def decrypt_payload(self, key, direction, mic):
a = []
a += self.encrypted_payload
a += mic
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
self.payload = cipher.encrypt(bytes(a))[:-4]
self.appnonce = self.payload[:3]
self.netid = self.payload[3:6]
self.devaddr = self.payload[6:10]
self.dlsettings = self.payload[10]
self.rxdelay = self.payload[11]
self.cflist = None
if self.payload[12:]:
self.cflist = self.payload[12:]
return list(map(int, self.payload))
def encrypt_payload(self, key, direction, mhdr):
a = []
a += self.to_clear_raw()
a += self.compute_mic(key, direction, mhdr)
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
return list(map(int, cipher.decrypt(bytes(a))))
def derive_nwskey(self, key, devnonce):
a = [0x01]
a += self.get_appnonce()
a += self.get_netid()
a += devnonce
a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
return list(map(int, cipher.encrypt(bytes(a))))
def derive_appskey(self, key, devnonce):
a = [0x02]
a += self.get_appnonce()
a += self.get_netid()
a += devnonce
a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
cipher = ucryptolib.aes(bytearray(key), MODE_ECB)
return list(map(int, cipher.encrypt(bytes(a))))

View File

@ -0,0 +1,50 @@
#
# frm_payload: appeui(8) deveui(8) devnonce(2)
#
from .MalformedPacketException import MalformedPacketException
from .AES_CMAC import AES_CMAC
class JoinRequestPayload:
def read(self, payload):
if len(payload) != 18:
raise MalformedPacketException("Invalid join request")
self.deveui = payload[8:16]
self.appeui = payload[:8]
self.devnonce = payload[16:18]
def create(self, args):
self.deveui = list(reversed(args['deveui']))
self.appeui = list(reversed(args['appeui']))
self.devnonce = args['devnonce']
def length(self):
return 18
def to_raw(self):
payload = []
payload += self.appeui
payload += self.deveui
payload += self.devnonce
return payload
def get_appeui(self):
return self.appeui
def get_deveui(self):
return self.deveui
def get_devnonce(self):
return self.devnonce
def compute_mic(self, key, direction, mhdr):
mic = [mhdr.to_raw()]
mic += self.to_raw()
cmac = AES_CMAC()
computed_mic = cmac.encode(bytes(key), bytes(mic))[:4]
return list(map(int, computed_mic))
def decrypt_payload(self, key, direction, mic):
return self.to_raw()

View File

@ -0,0 +1,34 @@
from .MalformedPacketException import MalformedPacketException
class MHDR:
LORAWAN_V1 = 0x00
MHDR_TYPE = 0xE0
MHDR_RFU = 0x1C
MHDR_MAJOR = 0x03
JOIN_REQUEST = 0x00
JOIN_ACCEPT = 0x20
UNCONF_DATA_UP = 0x40
UNCONF_DATA_DOWN = 0x60
CONF_DATA_UP = 0x80
CONF_DATA_DOWN = 0xA0
RFU = 0xC0 # rejoin for roaming
PROPRIETARY = 0xE0
def __init__(self, mhdr):
self.mhdr = mhdr
mversion = mhdr & self.MHDR_MAJOR
if mversion != self.LORAWAN_V1:
raise MalformedPacketException("Invalid major version")
def to_raw(self):
return self.mhdr
def get_mversion(self):
return self.mhdr & self.MHDR_MAJOR
def get_mtype(self):
return self.mhdr & self.MHDR_TYPE

View File

@ -0,0 +1,78 @@
#
# mac_payload: fhdr(7..23) fport(1) frm_payload(0..N)
#
from .MalformedPacketException import MalformedPacketException
from .FHDR import FHDR
from .MHDR import MHDR
from .JoinRequestPayload import JoinRequestPayload
from .JoinAcceptPayload import JoinAcceptPayload
from .DataPayload import DataPayload
class MacPayload:
def read(self, mtype, mac_payload):
if len(mac_payload) < 1:
raise MalformedPacketException("Invalid mac payload")
self.fhdr = FHDR()
self.fhdr.read(mac_payload)
self.fport = mac_payload[self.fhdr.length()]
self.frm_payload = None
if mtype == MHDR.JOIN_REQUEST:
self.frm_payload = JoinRequestPayload()
self.frm_payload.read(mac_payload)
if mtype == MHDR.JOIN_ACCEPT:
self.frm_payload = JoinAcceptPayload()
self.frm_payload.read(mac_payload)
if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\
mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN:
self.frm_payload = DataPayload()
self.frm_payload.read(self, mac_payload[self.fhdr.length() + 1:])
def create(self, mtype, key, args):
self.fhdr = FHDR()
self.fhdr.create(mtype, args)
self.fport = 0x01
self.frm_payload = None
if mtype == MHDR.JOIN_REQUEST:
self.frm_payload = JoinRequestPayload()
self.frm_payload.create(args)
if mtype == MHDR.JOIN_ACCEPT:
self.frm_payload = JoinAcceptPayload()
self.frm_payload.create(args)
if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\
mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN:
self.frm_payload = DataPayload()
self.frm_payload.create(self, key, args)
def length(self):
return len(self.to_raw())
def to_raw(self):
mac_payload = []
if self.fhdr.get_devaddr() != [0x00, 0x00, 0x00, 0x00]:
mac_payload += self.fhdr.to_raw()
if self.frm_payload != None:
if self.fhdr.get_devaddr() != [0x00, 0x00, 0x00, 0x00]:
mac_payload += [self.fport]
mac_payload += self.frm_payload.to_raw()
return mac_payload
def get_fhdr(self):
return self.fhdr
def set_fhdr(self, fhdr):
self.fhdr = fhdr
def get_fport(self):
return self.fport
def set_fport(self, fport):
self.fport = fport
def get_frm_payload(self):
return self.frm_payload
def set_frm_payload(self, frm_payload):
self.frm_payload = frm_payload

View File

@ -0,0 +1,2 @@
class MalformedPacketException(Exception):
"""Custom Exception for LoRaWAN packet info"""

View File

@ -0,0 +1,94 @@
#
# lorawan packet: mhdr(1) mac_payload(1..N) mic(4)
#
from .MalformedPacketException import MalformedPacketException
from .MHDR import MHDR
from .Direction import Direction
from .MacPayload import MacPayload
class PhyPayload:
def __init__(self, nwkey, appkey):
self.nwkey = nwkey
self.appkey = appkey
def read(self, packet):
if len(packet) < 12:
raise MalformedPacketException("Invalid lorawan packet")
self.mhdr = MHDR(packet[0])
self.set_direction()
self.mac_payload = MacPayload()
self.mac_payload.read(self.get_mhdr().get_mtype(), packet[1:-4])
self.mic = packet[-4:]
def create(self, mhdr, args):
self.mhdr = MHDR(mhdr)
self.set_direction()
self.mac_payload = MacPayload()
self.mac_payload.create(self.get_mhdr().get_mtype(), self.appkey, args)
self.mic = None
def length(self):
return len(self.to_raw())
def to_raw(self):
phy_payload = [self.get_mhdr().to_raw()]
phy_payload += self.mac_payload.to_raw()
phy_payload += self.get_mic()
return phy_payload
def get_mhdr(self):
return self.mhdr
def set_mhdr(self, mhdr):
self.mhdr = mhdr
def get_direction(self):
return self.direction.get()
def set_direction(self):
self.direction = Direction(self.get_mhdr())
def get_mac_payload(self):
return self.mac_payload
def set_mac_payload(self, mac_payload):
self.mac_payload = mac_payload
def get_mic(self):
if self.mic == None:
self.set_mic(self.compute_mic())
return self.mic
def set_mic(self, mic):
self.mic = mic
def compute_mic(self):
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
return self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
else:
return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
def valid_mic(self):
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
return self.get_mic() == self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:]
else:
return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr())
def get_devaddr(self):
if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT:
return self.mac_payload.frm_payload.get_devaddr()
else:
return self.mac_payload.fhdr.get_devaddr()
def get_payload(self):
return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction(), self.mic)
def derive_nwskey(self, devnonce):
return self.mac_payload.frm_payload.derive_nwskey(self.appkey, devnonce)
def derive_appskey(self, devnonce):
return self.mac_payload.frm_payload.derive_appskey(self.appkey, devnonce)

View File

@ -0,0 +1,10 @@
"""
This is a port of library LoRaWAN to MicroPython using library ucryptolib.
"""
from .PhyPayload import PhyPayload
def new(nwkey=[], appkey=[]):
return PhyPayload(nwkey, appkey)