"""
uni_handler.py
Unicore GNSS Protocol handler - handles all incoming UNI messages
Parses individual UNI (Unicore UM98n GNSS) messages (using pyunignss library)
and adds selected attribute values to the app.gnss_status
data dictionary. This dictionary is then used to periodically
update the various user-selectable widget panels.
Created on 27 Jan 2026
:author: semuadmin (Steve Smith)
:copyright: 2020 semuadmin
:license: BSD 3-Clause
"""
import logging
from time import time
from pynmeagps import wnotow2utc
from pyunigps import DEVICE, UNIMessage
from pygpsclient.helpers import fix2desc
from pygpsclient.strings import NA
SATSINFO_GNSSID = {
0: 0, # GPS
1: 6, # GLONASS
2: 1, # SBAS
3: 2, # GAL
4: 3, # BDS
5: 5, # QZSS
6: 7, # IRNSS
}
SATELLITE_GNSSID = {
0: 0, # GPS
1: 6, # GLONASS
2: 1, # SBAS
5: 2, # GALILEO
6: 3, # BEIDOU
7: 5, # QZSS
9: 7, # NAVIC
}
[docs]
class UNIHandler:
"""
UNIHandler class
"""
[docs]
def __init__(self, app):
"""
Constructor.
:param Frame app: reference to main tkinter application
"""
self.__app = app # Reference to main application class
self.__master = self.__app.appmaster # Reference to root class (Tk)
self.logger = logging.getLogger(__name__)
self._raw_data = None
self._parsed_data = None
# pylint: disable=unused-argument
[docs]
def process_data(self, raw_data: bytes, parsed_data: UNIMessage):
"""
Process relevant UNI message types
:param bytes raw_data: raw data
:param UNIMessage parsed_data: parsed data
"""
if raw_data is None:
return
if self.__app.gnss_status.version_data["hwversion"] == NA:
self.__app.gnss_status.version_data["hwversion"] = "Unicore"
self.__app.device_label = self.__app.gnss_status.version_data["hwversion"]
# self.logger.debug(f"data received {parsed_data.identity}")
self._process_utc(parsed_data)
if parsed_data.identity in ("BESTNAV", "BESTNAVH"):
self._process_BESTNAV(parsed_data)
elif parsed_data.identity in ("PVTSLN",):
self._process_PVTSLN(parsed_data)
elif parsed_data.identity in (
"ADRNAV",
"ADRNAVH",
"PPPNAV",
"SPPNAV",
"SPPNAVH",
):
self._process_ADRNAV(parsed_data)
elif parsed_data.identity in ("SATSINFO",):
self._process_SATSINFO(parsed_data)
elif parsed_data.identity in ("SATELLITE",):
self._process_SATELLITE(parsed_data)
elif parsed_data.identity in ("STADOP", "ADRDOP", "PPPDOP"):
self._process_STADOP(parsed_data)
elif parsed_data.identity == "VERSION":
self._process_VERSION(parsed_data)
def _process_utc(self, data: UNIMessage):
"""
Process wno, tow and leapsecond from UNI message header.
:param UNIMessage data: parsed message
"""
self.__app.gnss_status.utc = wnotow2utc(data.wno, data.tow, data.leapsecond)
def _process_pos(self, lat: float, lon: float, hmsl: float, undulation: float):
"""
Process lat/lon/hmsl/hae.
:param float lat: lat
:param float lon: lon
:param float hmsl: hmsl in meters
:param float undulation: separation in meters
"""
self.__app.gnss_status.lat = lat
self.__app.gnss_status.lon = lon
self.__app.gnss_status.alt = hmsl
self.__app.gnss_status.hae = hmsl + undulation
def _process_fix(self, postype: int):
"""
Process fix type.
:param int postype: attribute representing fix type
"""
self.__app.gnss_status.fix = fix2desc("BESTNAV", postype)
self.__app.gnss_status.diff_corr = (
sum(c in self.__app.gnss_status.fix for c in ("RTK", "PPP", "SBAS")) > 0
)
def _process_BESTNAV(self, data: UNIMessage):
"""
Process BESTNAV sentence - Navigation position velocity time solution.
:param UNIMessage data: BESTNAV parsed message
"""
self._process_pos(data.lat, data.lon, data.hmsl, data.undulation)
self._process_fix(data.postype)
self.__app.gnss_status.sip = data.numsolnsvs
self.__app.gnss_status.diff_age = data.diffage
self.__app.gnss_status.speed = data.horspd
self.__app.gnss_status.track = data.trkgnd
self.__app.gnss_status.diff_station = data.stationid
def _process_PVTSLN(self, data: UNIMessage):
"""
Process PVTSLN sentence - Navigation position velocity time solution.
:param UNIMessage data: PVTSLN parsed message
"""
self._process_pos(
data.psrposlat, data.psrposlon, data.psrposhmsl, data.undulation
)
self._process_fix(data.psrpostype)
self.__app.gnss_status.sip = data.psrpossolnsvs
self.__app.gnss_status.speed = data.psrvelground
self.__app.gnss_status.track = data.headingdegree
self.__app.gnss_status.pdop = data.pdop
self.__app.gnss_status.hdop = data.hdop
self.__app.gnss_status.diff_age = data.bestposdiffage
def _process_ADRNAV(self, data: UNIMessage):
"""
Process ADRNAV, PPPNAV, SPPNAV sentences.
:param UNIMessage data: ADRNAV/PPPNAV/SPPNAV parsed message
"""
self._process_pos(data.lat, data.lon, data.hmsl, data.undulation)
self._process_fix(data.postype)
def _process_SATSINFO(self, data: UNIMessage):
"""
Process SATSINFO sentences - Space Vehicle Information for all
GNSS constellations.
:param UNIMessage data: SATSINFO parsed message
"""
self.__app.gnss_status.gsv_data = {}
num_siv = int(data.numsat)
now = time()
for i in range(num_siv):
idx = f"_{i+1:02d}"
gnssId = SATSINFO_GNSSID[getattr(data, "sysstatus" + idx + "_01")]
svid = getattr(data, "prn" + idx)
elev = getattr(data, "elev" + idx)
azim = getattr(data, "azi" + idx)
cno = getattr(data, "cno" + idx + "_01")
self.__app.gnss_status.gsv_data[(gnssId, svid)] = (
gnssId,
svid,
elev,
azim,
cno,
now,
)
self.__app.gnss_status.siv = len(self.__app.gnss_status.gsv_data)
def _process_SATELLITE(self, data: UNIMessage):
"""
Process SATELLITE sentences - Space Vehicle Information for a
specific GNSS constellation (7 in total).
:param UNIMessage data: SATSINFO parsed message
"""
gnssId = SATELLITE_GNSSID[getattr(data, "gnss")]
for gnss, prn in list(self.__app.gnss_status.gsv_data.keys()):
if gnss == gnssId:
self.__app.gnss_status.gsv_data.pop((gnss, prn))
num_siv = int(data.numsat)
now = time()
for i in range(num_siv):
idx = f"_{i+1:02d}"
svid = getattr(data, "prn" + idx)
elev = getattr(data, "elv" + idx)
azim = getattr(data, "azi" + idx)
cno = 0 # cno not available from this message
self.__app.gnss_status.gsv_data[(gnssId, svid)] = (
gnssId,
svid,
elev,
azim,
cno,
now,
)
self.__app.gnss_status.siv = len(self.__app.gnss_status.gsv_data)
def _process_STADOP(self, data: UNIMessage):
"""
Process STADOP/ADRDOP/PPPDOP sentences - DOP Information.
:param UNIMessage data: STADOP/ADRDOP/PPPDOP parsed message
"""
self.__app.gnss_status.pdop = data.pdop
self.__app.gnss_status.hdop = data.hdop
self.__app.gnss_status.vdop = data.vdop
def _process_VERSION(self, data: UNIMessage):
"""
Process VERSION sentences - hardware & software information.
:param UNIMessage data: VERSION parsed message
"""
self.__app.gnss_status.version_data["hwversion"] = (
f"Unicore {DEVICE.get(data.device, data.device)}"
)
self.__app.gnss_status.version_data["swversion"] = data.swversion
self.__app.gnss_status.version_data["fwversion"] = data.comptime
self.__app.gnss_status.version_data["romversion"] = NA
self.__app.gnss_status.version_data["gnss"] = NA
self.__app.device_label = self.__app.gnss_status.version_data["hwversion"]