From e02cdb5242849c1ca1e7e80cd0c3127df3a53c0e Mon Sep 17 00:00:00 2001 From: spirkelmann Date: Sat, 30 Nov 2019 20:19:57 +0100 Subject: [PATCH] added uLoRaWAN library --- micropython/uLoRaWAN/AES_CMAC.py | 75 +++++++++++++ micropython/uLoRaWAN/DataPayload.py | 104 ++++++++++++++++++ micropython/uLoRaWAN/Direction.py | 25 +++++ micropython/uLoRaWAN/FHDR.py | 66 +++++++++++ micropython/uLoRaWAN/JoinAcceptPayload.py | 100 +++++++++++++++++ micropython/uLoRaWAN/JoinRequestPayload.py | 50 +++++++++ micropython/uLoRaWAN/MHDR.py | 34 ++++++ micropython/uLoRaWAN/MacPayload.py | 78 +++++++++++++ .../uLoRaWAN/MalformedPacketException.py | 2 + micropython/uLoRaWAN/PhyPayload.py | 94 ++++++++++++++++ micropython/uLoRaWAN/__init__.py | 10 ++ 11 files changed, 638 insertions(+) create mode 100644 micropython/uLoRaWAN/AES_CMAC.py create mode 100644 micropython/uLoRaWAN/DataPayload.py create mode 100644 micropython/uLoRaWAN/Direction.py create mode 100644 micropython/uLoRaWAN/FHDR.py create mode 100644 micropython/uLoRaWAN/JoinAcceptPayload.py create mode 100644 micropython/uLoRaWAN/JoinRequestPayload.py create mode 100644 micropython/uLoRaWAN/MHDR.py create mode 100644 micropython/uLoRaWAN/MacPayload.py create mode 100644 micropython/uLoRaWAN/MalformedPacketException.py create mode 100644 micropython/uLoRaWAN/PhyPayload.py create mode 100644 micropython/uLoRaWAN/__init__.py diff --git a/micropython/uLoRaWAN/AES_CMAC.py b/micropython/uLoRaWAN/AES_CMAC.py new file mode 100644 index 0000000..659c4da --- /dev/null +++ b/micropython/uLoRaWAN/AES_CMAC.py @@ -0,0 +1,75 @@ +from struct import pack, unpack +import ucryptolib +MODE_ECB = 1 + + +class AES_CMAC: + def gen_subkey(self, K): + AES_128 = ucryptolib.aes(bytearray(K), MODE_ECB) + + L = AES_128.encrypt(('\x00'*16).encode()) + + LHigh = unpack('>Q',L[:8])[0] + LLow = unpack('>Q',L[8:])[0] + + K1High = ((LHigh << 1) | ( LLow >> 63 )) & 0xFFFFFFFFFFFFFFFF + K1Low = (LLow << 1) & 0xFFFFFFFFFFFFFFFF + + if (LHigh >> 63): + K1Low ^= 0x87 + + K2High = ((K1High << 1) | (K1Low >> 63)) & 0xFFFFFFFFFFFFFFFF + K2Low = ((K1Low << 1)) & 0xFFFFFFFFFFFFFFFF + + if (K1High >> 63): + K2Low ^= 0x87 + + K1 = pack('>QQ', K1High, K1Low) + K2 = pack('>QQ', K2High, K2Low) + + return K1, K2 + + def xor_128(self, N1, N2): + J = b'' + for i in range(len(N1)): + J += bytes([N1[i] ^ N2[i]]) + return J + + def pad(self, N): + const_Bsize = 16 + padLen = 16-len(N) + return N + b'\x80' + b'\x00'*(padLen-1) + + def encode(self, K, M): + const_Bsize = 16 + const_Zero = b'\x00'*16 + + AES_128 = ucryptolib.aes(bytearray(K), MODE_ECB) + + K1, K2 = self.gen_subkey(K) + n = int(len(M)/const_Bsize) + + if n == 0: + n = 1 + flag = False + else: + if (len(M) % const_Bsize) == 0: + flag = True + else: + n += 1 + flag = False + + M_n = M[(n-1)*const_Bsize:] + if flag is True: + M_last = self.xor_128(M_n,K1) + else: + M_last = self.xor_128(self.pad(M_n),K2) + + X = const_Zero + for i in range(n-1): + M_i = M[(i)*const_Bsize:][:16] + Y = self.xor_128(X, M_i) + X = AES_128.encrypt(Y) + Y = self.xor_128(M_last, X) + T = AES_128.encrypt(Y) + return T diff --git a/micropython/uLoRaWAN/DataPayload.py b/micropython/uLoRaWAN/DataPayload.py new file mode 100644 index 0000000..db95df0 --- /dev/null +++ b/micropython/uLoRaWAN/DataPayload.py @@ -0,0 +1,104 @@ +# +# frm_payload: data(0..N) +# +from .AES_CMAC import AES_CMAC +import math +import ucryptolib +MODE_ECB = 1 + + +class DataPayload: + + def read(self, mac_payload, payload): + self.mac_payload = mac_payload + self.payload = payload + + def create(self, mac_payload, key, args): + self.mac_payload = mac_payload + self.set_payload(key, 0x00, args['data']) + + def length(self): + return len(self.payload) + + def to_raw(self): + return self.payload + + def set_payload(self, key, direction, data): + self.payload = self.encrypt_payload(key, direction, data) + + def compute_mic(self, key, direction, mhdr): + mic = [0x49] + mic += [0x00, 0x00, 0x00, 0x00] + mic += [direction] + mic += self.mac_payload.get_fhdr().get_devaddr() + mic += self.mac_payload.get_fhdr().get_fcnt() + mic += [0x00] + mic += [0x00] + mic += [0x00] + mic += [1 + self.mac_payload.length()] + mic += [mhdr.to_raw()] + mic += self.mac_payload.to_raw() + + cmac = AES_CMAC() + computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] + return list(map(int, computed_mic)) + + def decrypt_payload(self, key, direction, mic): + print(self.payload, key, direction, mic) + k = int(math.ceil(len(self.payload) / 16.0)) + + a = [] + for i in range(k): + a += [0x01] + a += [0x00, 0x00, 0x00, 0x00] + a += [direction] + a += self.mac_payload.get_fhdr().get_devaddr() + a += self.mac_payload.get_fhdr().get_fcnt() + a += [0x00] # fcnt 32bit + a += [0x00] # fcnt 32bit + a += [0x00] + a += [i+1] + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + s = cipher.encrypt(bytes(a)) + + padded_payload = [] + for i in range(k): + idx = (i + 1) * 16 + padded_payload += (self.payload[idx - 16:idx] + ([0x00] * 16))[:16] + + payload = [] + + for i in range(len(self.payload)): + print(s[i], padded_payload[i], s[i] ^ padded_payload[i]) + payload += [s[i] ^ padded_payload[i]] + + return list(map(int, payload)) + + def encrypt_payload(self, key, direction, data): + k = int(math.ceil(len(data) / 16.0)) + + a = [] + for i in range(k): + a += [0x01] + a += [0x00, 0x00, 0x00, 0x00] + a += [direction] + a += self.mac_payload.get_fhdr().get_devaddr() + a += self.mac_payload.get_fhdr().get_fcnt() + a += [0x00] # fcnt 32bit + a += [0x00] # fcnt 32bit + a += [0x00] + a += [i+1] + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + s = cipher.encrypt(bytes(a)) + + padded_payload = [] + for i in range(k): + idx = (i + 1) * 16 + padded_payload += (data[idx - 16:idx] + ([0x00] * 16))[:16] + + payload = [] + for i in range(len(data)): + payload += [s[i] ^ padded_payload[i]] + return list(map(int, payload)) diff --git a/micropython/uLoRaWAN/Direction.py b/micropython/uLoRaWAN/Direction.py new file mode 100644 index 0000000..6bfe305 --- /dev/null +++ b/micropython/uLoRaWAN/Direction.py @@ -0,0 +1,25 @@ +from .MHDR import MHDR + + +class Direction: + + UP = 0x00 + DOWN = 0x01 + DIRECTION = { + MHDR.JOIN_REQUEST: UP, + MHDR.JOIN_ACCEPT: DOWN, + MHDR.UNCONF_DATA_UP: UP, + MHDR.UNCONF_DATA_DOWN: DOWN, + MHDR.CONF_DATA_UP: UP, + MHDR.CONF_DATA_DOWN: DOWN, + MHDR.RFU: UP, + MHDR.PROPRIETARY: UP } + + def __init__(self, mhdr): + self.set(mhdr) + + def get(self): + return self.direction + + def set(self, mhdr): + self.direction = self.DIRECTION[mhdr.get_mtype()] diff --git a/micropython/uLoRaWAN/FHDR.py b/micropython/uLoRaWAN/FHDR.py new file mode 100644 index 0000000..43d2440 --- /dev/null +++ b/micropython/uLoRaWAN/FHDR.py @@ -0,0 +1,66 @@ +# +# fhdr: devaddr(4) fctrl(1) fcnt(2) fopts(0..N) +# +from .MalformedPacketException import MalformedPacketException +from .MHDR import MHDR +#from struct import unpack + + +class FHDR: + + def read(self, mac_payload): + if len(mac_payload) < 7: + raise MalformedPacketException("Invalid fhdr") + + self.devaddr = mac_payload[:4] + self.fctrl = mac_payload[4] + self.fcnt = mac_payload[5:7] + self.fopts = mac_payload[7:7 + (self.fctrl & 0xf)] + + def create(self, mtype, args): + self.devaddr = [0x00, 0x00, 0x00, 0x00] + self.fctrl = 0x00 + if 'fcnt' in args: + self.fcnt = args['fcnt'].to_bytes(2, 'little') + else: + self.fcnt = [0x00, 0x00] + self.fopts = [] + if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\ + mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN: + self.devaddr = list(reversed(args['devaddr'])) + + def length(self): + return 4 + 1 + 2 + (self.fctrl & 0xf) + + def to_raw(self): + fhdr = [] + fhdr += self.devaddr + fhdr += [self.fctrl] + fhdr += self.fcnt + if self.fopts: + fhdr += self.fopts + return fhdr + + def get_devaddr(self): + return self.devaddr + + def set_devaddr(self, devaddr): + self.devaddr = devaddr + + def get_fctrl(self): + return self.fctrl + + def set_fctrl(self, fctrl): + self.fctrl = fctrl + + def get_fcnt(self): + return self.fcnt + + def set_fcnt(self, fcnt): + self.fcnt = fcnt + + def get_fopts(self): + return self.fopts + + def set_fopts(self, fopts): + self.fopts = fopts diff --git a/micropython/uLoRaWAN/JoinAcceptPayload.py b/micropython/uLoRaWAN/JoinAcceptPayload.py new file mode 100644 index 0000000..1ab7e56 --- /dev/null +++ b/micropython/uLoRaWAN/JoinAcceptPayload.py @@ -0,0 +1,100 @@ +# +# frm_payload: appnonce(3) netid(3) devaddr(4) dlsettings(1) rxdelay(1) cflist(0..16) +# +from .MalformedPacketException import MalformedPacketException +from .AES_CMAC import AES_CMAC +import ucryptolib +MODE_ECB = 1 + +class JoinAcceptPayload: + + def read(self, payload): + if len(payload) < 12: + raise MalformedPacketException("Invalid join accept") + self.encrypted_payload = payload + + def create(self, args): + pass + + def length(self): + return len(self.encrypted_payload) + + def to_raw(self): + return self.encrypted_payload + + def to_clear_raw(self): + return self.payload + + def get_appnonce(self): + return self.appnonce + + def get_netid(self): + return self.netid + + def get_devaddr(self): + return list(map(int, reversed(self.devaddr))) + + def get_dlsettings(self): + return self.dlsettings + + def get_rxdelay(self): + return self.rxdelay + + def get_cflist(self): + return self.cflist + + def compute_mic(self, key, direction, mhdr): + mic = [] + mic += [mhdr.to_raw()] + mic += self.to_clear_raw() + + cmac = AES_CMAC() + computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] + return list(map(int, computed_mic)) + + def decrypt_payload(self, key, direction, mic): + a = [] + a += self.encrypted_payload + a += mic + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + self.payload = cipher.encrypt(bytes(a))[:-4] + + self.appnonce = self.payload[:3] + self.netid = self.payload[3:6] + self.devaddr = self.payload[6:10] + self.dlsettings = self.payload[10] + self.rxdelay = self.payload[11] + self.cflist = None + if self.payload[12:]: + self.cflist = self.payload[12:] + + return list(map(int, self.payload)) + + def encrypt_payload(self, key, direction, mhdr): + a = [] + a += self.to_clear_raw() + a += self.compute_mic(key, direction, mhdr) + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + return list(map(int, cipher.decrypt(bytes(a)))) + + def derive_nwskey(self, key, devnonce): + a = [0x01] + a += self.get_appnonce() + a += self.get_netid() + a += devnonce + a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + return list(map(int, cipher.encrypt(bytes(a)))) + + def derive_appskey(self, key, devnonce): + a = [0x02] + a += self.get_appnonce() + a += self.get_netid() + a += devnonce + a += [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + + cipher = ucryptolib.aes(bytearray(key), MODE_ECB) + return list(map(int, cipher.encrypt(bytes(a)))) diff --git a/micropython/uLoRaWAN/JoinRequestPayload.py b/micropython/uLoRaWAN/JoinRequestPayload.py new file mode 100644 index 0000000..9b5c5da --- /dev/null +++ b/micropython/uLoRaWAN/JoinRequestPayload.py @@ -0,0 +1,50 @@ +# +# frm_payload: appeui(8) deveui(8) devnonce(2) +# +from .MalformedPacketException import MalformedPacketException +from .AES_CMAC import AES_CMAC + + +class JoinRequestPayload: + + def read(self, payload): + if len(payload) != 18: + raise MalformedPacketException("Invalid join request") + self.deveui = payload[8:16] + self.appeui = payload[:8] + self.devnonce = payload[16:18] + + def create(self, args): + self.deveui = list(reversed(args['deveui'])) + self.appeui = list(reversed(args['appeui'])) + self.devnonce = args['devnonce'] + + def length(self): + return 18 + + def to_raw(self): + payload = [] + payload += self.appeui + payload += self.deveui + payload += self.devnonce + return payload + + def get_appeui(self): + return self.appeui + + def get_deveui(self): + return self.deveui + + def get_devnonce(self): + return self.devnonce + + def compute_mic(self, key, direction, mhdr): + mic = [mhdr.to_raw()] + mic += self.to_raw() + + cmac = AES_CMAC() + computed_mic = cmac.encode(bytes(key), bytes(mic))[:4] + return list(map(int, computed_mic)) + + def decrypt_payload(self, key, direction, mic): + return self.to_raw() diff --git a/micropython/uLoRaWAN/MHDR.py b/micropython/uLoRaWAN/MHDR.py new file mode 100644 index 0000000..fb78de3 --- /dev/null +++ b/micropython/uLoRaWAN/MHDR.py @@ -0,0 +1,34 @@ +from .MalformedPacketException import MalformedPacketException + + +class MHDR: + + LORAWAN_V1 = 0x00 + + MHDR_TYPE = 0xE0 + MHDR_RFU = 0x1C + MHDR_MAJOR = 0x03 + + JOIN_REQUEST = 0x00 + JOIN_ACCEPT = 0x20 + UNCONF_DATA_UP = 0x40 + UNCONF_DATA_DOWN = 0x60 + CONF_DATA_UP = 0x80 + CONF_DATA_DOWN = 0xA0 + RFU = 0xC0 # rejoin for roaming + PROPRIETARY = 0xE0 + + def __init__(self, mhdr): + self.mhdr = mhdr + mversion = mhdr & self.MHDR_MAJOR + if mversion != self.LORAWAN_V1: + raise MalformedPacketException("Invalid major version") + + def to_raw(self): + return self.mhdr + + def get_mversion(self): + return self.mhdr & self.MHDR_MAJOR + + def get_mtype(self): + return self.mhdr & self.MHDR_TYPE diff --git a/micropython/uLoRaWAN/MacPayload.py b/micropython/uLoRaWAN/MacPayload.py new file mode 100644 index 0000000..5a15e6e --- /dev/null +++ b/micropython/uLoRaWAN/MacPayload.py @@ -0,0 +1,78 @@ +# +# mac_payload: fhdr(7..23) fport(1) frm_payload(0..N) +# +from .MalformedPacketException import MalformedPacketException +from .FHDR import FHDR +from .MHDR import MHDR +from .JoinRequestPayload import JoinRequestPayload +from .JoinAcceptPayload import JoinAcceptPayload +from .DataPayload import DataPayload + + +class MacPayload: + + def read(self, mtype, mac_payload): + if len(mac_payload) < 1: + raise MalformedPacketException("Invalid mac payload") + + self.fhdr = FHDR() + self.fhdr.read(mac_payload) + self.fport = mac_payload[self.fhdr.length()] + self.frm_payload = None + if mtype == MHDR.JOIN_REQUEST: + self.frm_payload = JoinRequestPayload() + self.frm_payload.read(mac_payload) + if mtype == MHDR.JOIN_ACCEPT: + self.frm_payload = JoinAcceptPayload() + self.frm_payload.read(mac_payload) + if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\ + mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN: + self.frm_payload = DataPayload() + self.frm_payload.read(self, mac_payload[self.fhdr.length() + 1:]) + + def create(self, mtype, key, args): + self.fhdr = FHDR() + self.fhdr.create(mtype, args) + self.fport = 0x01 + self.frm_payload = None + if mtype == MHDR.JOIN_REQUEST: + self.frm_payload = JoinRequestPayload() + self.frm_payload.create(args) + if mtype == MHDR.JOIN_ACCEPT: + self.frm_payload = JoinAcceptPayload() + self.frm_payload.create(args) + if mtype == MHDR.UNCONF_DATA_UP or mtype == MHDR.UNCONF_DATA_DOWN or\ + mtype == MHDR.CONF_DATA_UP or mtype == MHDR.CONF_DATA_DOWN: + self.frm_payload = DataPayload() + self.frm_payload.create(self, key, args) + + def length(self): + return len(self.to_raw()) + + def to_raw(self): + mac_payload = [] + if self.fhdr.get_devaddr() != [0x00, 0x00, 0x00, 0x00]: + mac_payload += self.fhdr.to_raw() + if self.frm_payload != None: + if self.fhdr.get_devaddr() != [0x00, 0x00, 0x00, 0x00]: + mac_payload += [self.fport] + mac_payload += self.frm_payload.to_raw() + return mac_payload + + def get_fhdr(self): + return self.fhdr + + def set_fhdr(self, fhdr): + self.fhdr = fhdr + + def get_fport(self): + return self.fport + + def set_fport(self, fport): + self.fport = fport + + def get_frm_payload(self): + return self.frm_payload + + def set_frm_payload(self, frm_payload): + self.frm_payload = frm_payload diff --git a/micropython/uLoRaWAN/MalformedPacketException.py b/micropython/uLoRaWAN/MalformedPacketException.py new file mode 100644 index 0000000..a6f6f2c --- /dev/null +++ b/micropython/uLoRaWAN/MalformedPacketException.py @@ -0,0 +1,2 @@ +class MalformedPacketException(Exception): + """Custom Exception for LoRaWAN packet info""" diff --git a/micropython/uLoRaWAN/PhyPayload.py b/micropython/uLoRaWAN/PhyPayload.py new file mode 100644 index 0000000..cd4f5c5 --- /dev/null +++ b/micropython/uLoRaWAN/PhyPayload.py @@ -0,0 +1,94 @@ +# +# lorawan packet: mhdr(1) mac_payload(1..N) mic(4) +# +from .MalformedPacketException import MalformedPacketException +from .MHDR import MHDR +from .Direction import Direction +from .MacPayload import MacPayload + + +class PhyPayload: + + def __init__(self, nwkey, appkey): + self.nwkey = nwkey + self.appkey = appkey + + def read(self, packet): + if len(packet) < 12: + raise MalformedPacketException("Invalid lorawan packet") + + self.mhdr = MHDR(packet[0]) + self.set_direction() + self.mac_payload = MacPayload() + self.mac_payload.read(self.get_mhdr().get_mtype(), packet[1:-4]) + self.mic = packet[-4:] + + + def create(self, mhdr, args): + self.mhdr = MHDR(mhdr) + self.set_direction() + self.mac_payload = MacPayload() + self.mac_payload.create(self.get_mhdr().get_mtype(), self.appkey, args) + self.mic = None + + def length(self): + return len(self.to_raw()) + + def to_raw(self): + phy_payload = [self.get_mhdr().to_raw()] + phy_payload += self.mac_payload.to_raw() + phy_payload += self.get_mic() + return phy_payload + + def get_mhdr(self): + return self.mhdr + + def set_mhdr(self, mhdr): + self.mhdr = mhdr + + def get_direction(self): + return self.direction.get() + + def set_direction(self): + self.direction = Direction(self.get_mhdr()) + + def get_mac_payload(self): + return self.mac_payload + + def set_mac_payload(self, mac_payload): + self.mac_payload = mac_payload + + def get_mic(self): + if self.mic == None: + self.set_mic(self.compute_mic()) + return self.mic + + def set_mic(self, mic): + self.mic = mic + + def compute_mic(self): + if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT: + return self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:] + else: + return self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) + + def valid_mic(self): + if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT: + return self.get_mic() == self.mac_payload.frm_payload.encrypt_payload(self.appkey, self.get_direction(), self.get_mhdr())[-4:] + else: + return self.get_mic() == self.mac_payload.frm_payload.compute_mic(self.nwkey, self.get_direction(), self.get_mhdr()) + + def get_devaddr(self): + if self.get_mhdr().get_mtype() == MHDR.JOIN_ACCEPT: + return self.mac_payload.frm_payload.get_devaddr() + else: + return self.mac_payload.fhdr.get_devaddr() + + def get_payload(self): + return self.mac_payload.frm_payload.decrypt_payload(self.appkey, self.get_direction(), self.mic) + + def derive_nwskey(self, devnonce): + return self.mac_payload.frm_payload.derive_nwskey(self.appkey, devnonce) + + def derive_appskey(self, devnonce): + return self.mac_payload.frm_payload.derive_appskey(self.appkey, devnonce) diff --git a/micropython/uLoRaWAN/__init__.py b/micropython/uLoRaWAN/__init__.py new file mode 100644 index 0000000..99a3b13 --- /dev/null +++ b/micropython/uLoRaWAN/__init__.py @@ -0,0 +1,10 @@ + +""" +This is a port of library LoRaWAN to MicroPython using library ucryptolib. + +""" +from .PhyPayload import PhyPayload + + +def new(nwkey=[], appkey=[]): + return PhyPayload(nwkey, appkey)