"""
Main NMEA GNSS/GPS Message Protocol Class.
Created on 04 Mar 2021
:author: semuadmin
:copyright: SEMU Consulting © 2021
:license: BSD 3-Clause
"""
# pylint: disable=invalid-name, too-many-instance-attributes, too-many-positional-arguments
import struct
from datetime import datetime, timezone
from logging import getLogger
import pynmeagps.exceptions as nme
import pynmeagps.nmeatypes_core as nmt
import pynmeagps.nmeatypes_get as nmg
import pynmeagps.nmeatypes_get_prop as nmgp
import pynmeagps.nmeatypes_poll as nmp
import pynmeagps.nmeatypes_set as nms
from pynmeagps.nmeahelpers import (
date2str,
date2utc,
ddd2dmm,
dmm2ddd,
generate_checksum,
time2str,
time2utc,
)
[docs]
class NMEAMessage:
"""NMEA GNSS/GPS Message Class."""
[docs]
def __init__(
    self,
    talker: str,
    msgID: str,
    msgmode: int,
    hpnmeamode: bool = False,
    validate: int = nmt.VALCKSUM,
    userdefined: dict = None,
    **kwargs,
):
    """Create an NMEAMessage.

    When a 'payload' keyword arg is supplied it is treated as the complete
    message content (a list of string values) and all other keyword args are
    ignored. Without it, each attribute named in the keyword args is set to
    the given value and every remaining attribute in the payload definition
    receives a nominal value for its type.

    :param str talker: message talker e.g. "GP" or "P"
    :param str msgID: message ID e.g. "GGA"
    :param int msgmode: mode (0=GET, 1=SET, 2=POLL)
    :param bool hpnmeamode: high precision lat/lon mode (7dp rather than 5dp) (False)
    :param int validate: VALNONE (0), VALCKSUM (1), VALMSGID (2),
        (can be OR'd) (1)
    :param dict userdefined: user-defined payload definition dictionary (None)
    :param kwargs: keyword arg(s) representing all or some payload attributes
    :raises: NMEAMessageError
    """

    # The overridden __setattr__ reads self._immutable before permitting any
    # assignment, so the flag itself has to be planted via the base class.
    super().__setattr__("_immutable", False)

    self._logger = getLogger(__name__)
    self._validate = validate
    self._userdefined = userdefined if userdefined is not None else {}

    if msgmode not in (0, 1, 2):
        raise nme.NMEAMessageError(
            f"Invalid msgmode {msgmode} - must be 0, 1 or 2."
        )

    # an unknown talker is only fatal when msgID validation is switched on
    if talker not in nmt.NMEA_TALKERS and self._validate & nmt.VALMSGID:
        raise nme.NMEAMessageError(f"Unknown talker {talker}.")

    # work out which family of payload definitions this msgID belongs to
    if msgID in nmt.NMEA_MSGIDS:
        self._defsource = nmt.DEF_STND  # standard
    elif msgID in nmt.NMEA_MSGIDS_PROP or msgID in nmt.NMEA_PREFIX_PROP:
        self._defsource = nmt.DEF_PROP  # proprietary
    elif msgID in self._userdefined:
        self._defsource = nmt.DEF_USER  # user-defined
    else:
        self._defsource = nmt.DEF_UNKN  # unrecognised
        if self._validate & nmt.VALMSGID:
            modename = ("GET", "SET", "POLL")[msgmode]
            raise nme.NMEAMessageError(
                f"Unknown msgID {talker}{msgID}, msgmode {modename}."
            )

    self._mode = msgmode
    # high precision mode renders NMEA lat/lon to 7dp rather than 5dp
    self._hpnmeamode = hpnmeamode
    self._talker = talker
    self._msgID = msgID

    self._do_attributes(**kwargs)

    # from here on every attribute assignment is rejected
    self._immutable = True
def _do_attributes(self, **kwargs):
    """
    Populate NMEAMessage from named attribute keywords.

    Where a named attribute is absent, set to a nominal value (zeros or blanks).
    If no payload definition is found for the message, the fields of any
    supplied 'payload' are stored as nominal ``field_nn`` attributes instead.
    A checksum is generated when none was passed in the keyword args.

    :param kwargs: optional content key/value pairs
    :raises: NMEATypeError
    """

    pindex = 0  # payload index
    gindex = []  # (nested) grouped attribute indices
    # Name of the attribute currently being processed, used in the error
    # message below. Bound up front so the handler cannot itself fail with
    # UnboundLocalError when the error occurs before the first attribute
    # is reached (e.g. inside _get_dict) or the definition is empty.
    key = "n/a"
    try:
        self._payload = kwargs.get("payload", [])
        self._checksum = kwargs.get("checksum", None)
        pdict = self._get_dict(**kwargs)  # get payload definition dict
        if pdict is None:  # definition not yet implemented
            if "payload" in kwargs:
                self._set_attribute_nominal(kwargs["payload"])
            return

        for key in pdict:  # process each attribute in dict
            (pindex, gindex) = self._set_attribute(
                pindex, pdict, key, gindex, **kwargs
            )

        # generate checksum for newly-created message
        if self._checksum is None:
            self._checksum = generate_checksum(
                self._talker, self._msgID, self._payload
            )

    except (
        AttributeError,
        OverflowError,
        struct.error,
        TypeError,
        ValueError,
    ) as err:
        raise nme.NMEATypeError(
            f"Incorrect type for attribute {key} in msgID {self._msgID}."
        ) from err
def _set_attribute(
    self, pindex: int, pdict: dict, key: str, gindex: list, **kwargs
) -> tuple:
    """
    Dispatch one payload definition entry to the single-attribute or the
    repeating-group handler (the group handler calls back into this method).

    :param int pindex: payload index
    :param dict pdict: dict representing payload definition
    :param str key: attribute keyword
    :param list gindex: repeating group index array
    :param kwargs: optional payload key/value pairs
    :return: (pindex, gindex[])
    :rtype: tuple
    """

    definition = pdict[key]
    if not isinstance(definition, tuple):
        # plain attribute: the definition is its type code
        pindex = self._set_attribute_single(
            definition, pindex, key, gindex, **kwargs
        )
        return (pindex, gindex)

    # a tuple denotes a repeating group of attributes
    (pindex, gindex) = self._set_attribute_group(
        definition, pindex, gindex, **kwargs
    )
    return (pindex, gindex)
def _set_attribute_group(
    self, att: tuple, pindex: int, gindex: list, **kwargs
) -> tuple:
    """
    Handle a (possibly nested) repeating group of attributes.

    :param tuple att: attribute group - tuple of (num repeats, attribute dict)
    :param int pindex: payload index
    :param list gindex: repeating group index array
    :param kwargs: optional payload key/value pairs
    :return: (pindex, gindex[])
    :rtype: tuple
    """

    gindex.append(0)  # open a new (nested) group level
    repeats, group = att  # repeat specifier, group definition dict

    # the repeat count is either given literally, deduced from the payload
    # length, or held in a previously-set named attribute
    if isinstance(repeats, int):
        count = repeats
    elif repeats == "None":
        trailing = 0  # attributes following the group; may need tweaking
        count = self._calc_num_repeats(group, self._payload, pindex, trailing)
    else:
        count = getattr(self, repeats)

    # walk every member of every repeat, advancing the payload index and
    # recording the 1-based repeat number at the current nesting level
    for occurrence in range(count):
        gindex[-1] = occurrence + 1
        for member in group:
            (pindex, gindex) = self._set_attribute(
                pindex, group, member, gindex, **kwargs
            )

    gindex.pop()  # close this (nested) group level
    return (pindex, gindex)
def _set_attribute_single(
    self, att: str, pindex: int, key: str, gindex: list, **kwargs
) -> int:
    """
    Assign one attribute on the message.

    With a 'payload' keyword arg the value is read from the payload at
    ``pindex`` and converted to its typed form. Otherwise the value comes
    from the keyword args (or a nominal default), is converted to its NMEA
    string form and appended to the payload.

    :param str att: attribute type e.g. 'NU'
    :param int pindex: payload index
    :param str key: attribute keyword
    :param list gindex: repeating group index array
    :param kwargs: optional payload key/value pairs
    :return: pindex
    :rtype: int
    """
    # pylint: disable=no-member, access-member-before-definition, attribute-defined-outside-init

    # members of (nested) repeating groups carry one "_nn" suffix per level
    keyr = key + "".join(f"_{level:02d}" for level in gindex if level > 0)
    from_payload = "payload" in kwargs

    try:
        if from_payload:
            # every value was supplied: parse the raw string at this position
            val = self.str2val(self._payload[pindex], att)
        else:
            # only some values were supplied: the rest become nominal
            if att == nmt.LND and hasattr(self, "lon"):
                # E/W indicator follows the sign of an already-set lon
                west = isinstance(self.lon, (int, float)) and self.lon < 0
                val = "W" if west else "E"
            elif att == nmt.LAD and hasattr(self, "lat"):
                # N/S indicator follows the sign of an already-set lat
                south = isinstance(self.lat, (int, float)) and self.lat < 0
                val = "S" if south else "N"
            else:
                fallback = self.nomval(att)
                val = kwargs.get(keyr, fallback)
            self._payload.append(self.val2str(val, att, self._hpnmeamode))
    except IndexError:
        # probably just an older device missing NMEA <=4.10 dict attributes;
        # leave the attribute unset and the payload index where it was
        return pindex

    setattr(self, keyr, val)  # add attribute to NMEAMessage object

    if from_payload:
        # force the sign of lat/lon to agree with the NS and EW indicators
        if att == nmt.LND and hasattr(self, "lon"):
            if isinstance(self.lon, (int, float)):
                magnitude = abs(self.lon)
                self.lon = -magnitude if val == "W" else magnitude
        elif att == nmt.LAD and hasattr(self, "lat"):
            if isinstance(self.lat, (int, float)):
                magnitude = abs(self.lat)
                self.lat = -magnitude if val == "S" else magnitude

    # move on to next attribute in payload definition
    return pindex + 1
def _set_attribute_nominal(self, payload: list):
"""
Set nominal attributes for unrecognised NMEA sentence types.
:param list payload: payload as list
"""
for i, fld in enumerate(payload):
setattr(self, f"field_{i+1:02d}", fld)
def _get_dict(self, **kwargs) -> dict:
    """
    Get payload dictionary.

    For proprietary prefixes the lookup key is the msgID extended with the
    message's own msgId, taken from the first payload field or from the
    'msgId' keyword arg. The definition is then selected by message mode
    and definition source.

    :param kwargs: optional payload key/value pairs ('payload' or 'msgId'
        is required for proprietary prefix messages)
    :return: dictionary representing payload definition, or None if the
        definition is unknown and msgID validation is not enabled
    :rtype: dict
    :raises: NMEAMessageError if the definition is unknown and msgID
        validation is enabled, or if a proprietary prefix message is given
        neither payload nor msgId
    """

    try:
        key = self.msgID
        if key in nmt.NMEA_PREFIX_PROP:  # proprietary, first element is msgId
            if "payload" in kwargs:
                # An empty payload contributes nothing to the key; the
                # resulting lookup miss is then handled like any other
                # unknown message rather than failing with IndexError.
                first = self._payload[0] if self._payload else ""
                # Exception for the PASHR pitch and roll sentence, which has
                # no msgId: it is recognised by a digit in the second
                # character of its first field. Slicing (not indexing) keeps
                # a blank or one-character field from raising IndexError.
                if not (key == "ASHR" and first[1:2].isdigit()):
                    key += first
            elif "msgId" in kwargs:
                key += kwargs["msgId"]
            else:
                raise nme.NMEAMessageError(
                    f"P{key} message definitions must "
                    "include payload or msgId keyword arguments."
                )
        key = key.upper()
        if self._mode == nmt.POLL:
            return nmp.NMEA_PAYLOADS_POLL[key]
        if self._mode == nmt.SET:
            return nms.NMEA_PAYLOADS_SET[key]
        if self._defsource == nmt.DEF_PROP:  # proprietary
            return nmgp.NMEA_PAYLOADS_GET_PROP[key]
        if self._defsource == nmt.DEF_USER:  # user defined
            return self._userdefined[key]
        return nmg.NMEA_PAYLOADS_GET[key]  # standard
    except KeyError as err:
        erm = f"Unknown msgID {key} msgmode {('GET', 'SET', 'POLL')[self._mode]}."
        if self._validate & nmt.VALMSGID:
            raise nme.NMEAMessageError(erm) from err
        return None  # message not yet implemented
def _calc_num_repeats(
self, attd: dict, payload: list, pindex: int, pindexend: int = 0
) -> int:
"""
Deduce number of items in repeating group.
:param dict attd: grouped attribute dictionary
:param list payload : content as list
:param int pindex: number of payload attributes before group
:param int pindexend: number of payload attributes after group
:return: number of repeats
:rtype: int
"""
lenpayload = len(payload) - pindex - pindexend
lengroup = len(attd)
return int(lenpayload / lengroup)
def __str__(self) -> str:
    """
    Build the human readable form: the message identity followed by each
    public attribute as ``name=value``.

    :return: human readable representation
    :rtype: str
    """

    attributes = self.__dict__
    final = len(attributes) - 1
    pieces = [f"<NMEA({self.identity}", ", "]
    if self._defsource == nmt.DEF_UNKN:
        pieces.append("NOMINAL, ")
    for position, (name, value) in enumerate(attributes.items()):
        if name[0] == "_":  # private attributes are not shown
            continue
        pieces.append(name + "=" + str(value))
        # separator after every shown attribute except the very last entry
        if position < final:
            pieces.append(", ")
    pieces.append(")>")
    return "".join(pieces)
def __repr__(self) -> str:
"""
Machine readable representation.
eval(repr(obj)) = obj
:return: machine readable representation
:rtype: str
"""
return (
f"NMEAMessage('{self._talker}','{self._msgID}', "
f"{self._mode}, payload={self._payload})"
)
def __setattr__(self, name, value):
    """
    Allow attribute assignment only while the object is still being
    initialised; reject it once the immutable flag has been set.

    :param str name: attribute name
    :param object value: attribute value
    :raises: NMEAMessageError
    """

    if not self._immutable:
        super().__setattr__(name, value)
        return
    raise nme.NMEAMessageError(
        f"Object is immutable. Updates to {name} not permitted after initialisation."
    )
[docs]
def serialize(self) -> bytes:
"""
Serialize message.
:return: serialized output
:rtype: bytes
"""
output = "$" + self._talker + self._msgID + ","
for i, s in enumerate(self._payload):
output += ("," if i else "") + s
output += "*" + self._checksum + "\r\n"
return output.encode("utf-8") # convert str to bytes
@property
def identity(self) -> str:
"""
Identity getter.
:return: message identity e.g. GNGSA
:rtype: str
"""
# pylint: disable=no-member
if (
self._talker == "P"
and self._msgID in nmt.NMEA_PREFIX_PROP
and hasattr(self, "msgId")
):
return self._talker + self._msgID + self.msgId
return self._talker + self._msgID
@property
def talker(self) -> str:
"""
Talker getter.
:return: talker e.g. GN
:rtype: str
"""
return self._talker
@property
def msgID(self) -> str:
"""
Message id getter.
:return: message id e.g. GSA
:rtype: str
"""
return self._msgID
@property
def msgmode(self) -> int:
"""
Message mode getter.
:return: message mode
:rtype: int
"""
return self._mode
@property
def payload(self) -> list:
"""
Payload getter.
:return: raw payload as list of strings
:rtype: list
"""
return self._payload
@property
def checksum(self) -> str:
"""
Checksum getter.
:return: checksum as hex string
:rtype: str
"""
return self._checksum
[docs]
@staticmethod
def str2val(vals: str, att: str) -> object:
    """
    Convert NMEA string to typed value
    (this is the format that will be available to end users).

    Character, string, hex and N/S, E/W indicator values are returned
    unchanged, as are empty decimal and integer fields.

    :param str vals: attribute value in NMEA protocol format
    :param str att: attribute type e.g. 'DE'
    :return: attribute value
    :rtype: object
    :raises: NMEATypeError
    """

    val = vals
    if att in (nmt.CH, nmt.ST, nmt.LAD, nmt.LND, nmt.HX):
        pass  # kept as the raw string
    elif att == nmt.DE:  # decimal
        if vals != "":
            val = float(vals)
    elif att in (nmt.DT, nmt.DTL, nmt.DM):  # date
        # DTL is included so that every date type accepted by val2str and
        # nomval can also be parsed back from a payload
        val = date2utc(vals, att)
    elif att == nmt.IN:  # integer
        if vals != "":
            val = int(vals)
    elif att in (nmt.LA, nmt.LN):  # lat/lon (d)ddmm.mmmmm(mm)
        val = dmm2ddd(vals)
    elif att == nmt.TM:  # time hhmmss.ss
        val = time2utc(vals)
    else:
        raise nme.NMEATypeError(f"Unknown attribute type {att}.")
    return val
[docs]
@staticmethod
def val2str(val, att: str, hpmode: bool = False) -> str:
    """
    Render a typed value as the string used in an NMEA sentence
    (this is the format used internally by the NMEA protocol).

    :param object val: typed attribute value
    :param str att: attribute type e.g. 'IN'
    :param bool hpmode: high precision lat/lon mode (7dp rather than 5dp)
    :return: attribute value in NMEA protocol format
    :rtype: str
    :raises: NMEATypeError
    """

    # character, string, indicator, hex, decimal and integer types are
    # all rendered with plain str()
    if att in (nmt.CH, nmt.ST, nmt.LAD, nmt.LND, nmt.HX, nmt.DE, nmt.IN):
        return str(val)
    if att in (nmt.LA, nmt.LN):
        return ddd2dmm(val, att, hpmode)
    if att == nmt.TM:
        return time2str(val)
    if att in (nmt.DT, nmt.DTL, nmt.DM):
        return date2str(val, att)
    raise nme.NMEATypeError(f"Unknown attribute type {att}.")
[docs]
@staticmethod
def nomval(att: str) -> object:
    """
    Supply the placeholder value used when an attribute of the given type
    has not been provided.

    :param str att: attribute type e.g. 'DE'
    :return: nominal value for type
    :rtype: object
    :raises: NMEATypeError
    """

    if att in (nmt.CH, nmt.ST, nmt.LA, nmt.LN):
        return ""
    if att == nmt.LAD:  # pragma: no cover
        return "N"
    if att == nmt.LND:  # pragma: no cover
        return "E"
    if att == nmt.HX:
        return "0"
    if att == nmt.DE:
        return 0.0
    if att == nmt.IN:
        return 0
    if att == nmt.TM:
        # current UTC time of day
        return datetime.now(timezone.utc).time()
    if att in (nmt.DT, nmt.DTL, nmt.DM):
        # current UTC date
        return datetime.now(timezone.utc).date()
    raise nme.NMEATypeError(f"Unknown attribute type {att}.")