"""
ubx_recorder_frame.py
UBX Player/Recorder widget for CFG commands entered by user via UBX
Configuration panel.
Records commands to memory array and allows user to load or save
this array to or from a file.
Facilitates copying configuration from one device to another.
Created on 9 Jan 2023
:author: semuadmin
:copyright: 2020 SEMU Consulting
:license: BSD 3-Clause
"""
from threading import Event, Thread
from time import sleep
from tkinter import Button, E, Frame, Label, TclError, W, filedialog
from PIL import Image, ImageTk
from pyubx2 import (
POLL_LAYER_BBR,
POLL_LAYER_FLASH,
SET,
SET_LAYER_BBR,
SET_LAYER_FLASH,
SET_LAYER_RAM,
TXN_NONE,
U1,
UBX_PROTOCOL,
UBXMessage,
UBXReader,
bytes2val,
val2bytes,
)
from pygpsclient.globals import (
HOME,
ICON_DELETE,
ICON_LOAD,
ICON_RECORD,
ICON_SAVE,
ICON_SEND,
ICON_STOP,
ICON_UNDO,
)
from pygpsclient.helpers import set_filename
from pygpsclient.strings import LBLCFGRECORD, SAVETITLE
# Recorder states stored in UBX_Recorder_Frame._rec_status
STOP = 0  # idle: neither sending nor recording
PLAY = 1  # stored commands are being sent to the device
RECORD = 2  # commands are being captured into memory
FLASH = 0.7  # pause passed to sleep() between colour swaps of the RECORDING label
# Byte values compared against / used to build UBX messages in the text loader
CFG = b"\x06"  # message class byte; text-file lines with any other class are skipped
VALSET = b"\x8a"  # message id used when building the CFG-VALSET replacement command
VALGET = b"\x8b"  # message id identifying CFG-VALGET lines in a text file
MSG = b"\x01"  # presumably the CFG-MSG id; not referenced in the visible code
PRT = b"\x00"  # presumably the CFG-PRT id; not referenced in the visible code
[docs]
class UBX_Recorder_Frame(Frame):
"""
UBX Configuration command recorder panel.
"""
[docs]
def __init__(self, app, container, *args, **kwargs):
    """
    Build the recorder panel and put it into its idle state.

    :param Frame app: reference to main tkinter application
    :param Frame container: reference to container frame (config-dialog)
    :param args: optional args to pass to Frame parent class
    :param kwargs: optional kwargs to pass to Frame parent class
    """

    self.__app = app  # main application class
    self.__master = self.__app.appmaster  # root class (Tk)
    self.__container = container  # UBX Configuration dialog

    Frame.__init__(self, self.__container.container, *args, **kwargs)

    # Load one PhotoImage per button icon and keep it as an attribute.
    icon_table = (
        ("_img_load", ICON_LOAD),
        ("_img_save", ICON_SAVE),
        ("_img_play", ICON_SEND),
        ("_img_stop", ICON_STOP),
        ("_img_record", ICON_RECORD),
        ("_img_undo", ICON_UNDO),
        ("_img_delete", ICON_DELETE),
    )
    for attr_name, icon_file in icon_table:
        setattr(self, attr_name, ImageTk.PhotoImage(Image.open(icon_file)))

    # Recorder state
    self._cmds_stored = []
    self._rec_status = STOP
    self._stop_event = Event()
    self._bg = self.cget("bg")  # default background color
    self._configfile = None
    self._configpath = None

    # Build the widgets, lay them out, then show the initial status
    self._body()
    self._do_layout()
    self.reset()
def _body(self):
    """
    Create the panel's labels and its six icon buttons.
    """

    def icon_button(image, callback):
        # Every button shares the same width and font; only icon and
        # callback differ.
        return Button(
            self,
            image=image,
            width=40,
            command=callback,
            font=self.__app.font_md,
        )

    self._lbl_recorder = Label(self, text=LBLCFGRECORD, anchor=W)
    self._btn_load = icon_button(self._img_load, self._on_load)
    self._btn_save = icon_button(self._img_save, self._on_save)
    self._btn_play = icon_button(self._img_play, self._on_play)
    self._btn_record = icon_button(self._img_record, self._on_record)
    self._btn_undo = icon_button(self._img_undo, self._on_undo)
    self._btn_delete = icon_button(self._img_delete, self._on_delete)
    self._lbl_status = Label(self, text="", anchor="center")
    self._lbl_activity = Label(self, text="", anchor="center")
def _do_layout(self):
    """
    Place the widgets on the grid: title row, button row, status row,
    activity row. All rows and columns are given equal weight.
    """

    # Grid options shared by the three labels that span the full width.
    full_width = {"column": 0, "columnspan": 6, "padx": 3, "sticky": (W, E)}

    self._lbl_recorder.grid(row=0, **full_width)

    button_row = (
        self._btn_load,
        self._btn_save,
        self._btn_play,
        self._btn_record,
        self._btn_undo,
        self._btn_delete,
    )
    for col, button in enumerate(button_row):
        button.grid(column=col, row=1, ipadx=3, ipady=3, sticky=W)

    self._lbl_status.grid(row=2, **full_width)
    self._lbl_activity.grid(row=3, **full_width)

    ncols, nrows = self.grid_size()
    for col in range(ncols):
        self.grid_columnconfigure(col, weight=1)
    for row in range(nrows):
        self.grid_rowconfigure(row, weight=1)

    self.option_add("*Font", self.__app.font_sm)
[docs]
def reset(self):
    """
    Reset panel to initial settings.

    Sets the recorder state to STOP and refreshes the status label and
    the play/record button icons via _update_status().
    """
    self._rec_status = STOP
    self._update_status()
def _set_configfile_path(self) -> tuple:
    """
    Ask the user for a destination folder and derive the output file name.

    :return: whatever set_filename() returns for the chosen folder, or
        (None, None) if the folder dialog was cancelled
    :rtype: tuple
    """

    chosen_dir = filedialog.askdirectory(
        parent=self.__container, title=SAVETITLE, initialdir=HOME, mustexist=True
    )
    user_cancelled = chosen_dir in ((), "")
    if not user_cancelled:
        return set_filename(chosen_dir, "config", "ubx")
    return None, None
def _open_configfile(self):
"""
Open configuration file.
"""
return self.__app.file_handler.open_file(
"ubx",
(
("ubx config files", "*.ubx"),
("u-center config files", "*.txt"),
("all files", "*.*"),
),
)
def _on_load(self):
"""
Load commands from file into in-memory recording.
"""
self._configfile = self._open_configfile()
if self._configfile is None: # user cancelled
return
self._cmds_stored = []
self._update_activity("Loading commands...")
if self._configfile[-3:] == "txt":
i = self._on_load_txt(self._configfile)
else:
i = self._on_load_ubx(self._configfile)
if i > 0:
fname = self._configfile.split("/")[-1]
self._update_activity(
f"{i} Command{'s' if i > 1 else ''} loaded from {fname}"
)
self._update_status()
def _on_load_ubx(self, fname: str) -> int:
"""
Load binary ubx configuration file
:param str fname: input filename
:return: no of items read
:rtype: int
"""
try:
with open(fname, "rb") as file:
ubr = UBXReader(file, protfilter=UBX_PROTOCOL, msgmode=SET)
eof = False
i = 0
while not eof:
_, parsed = ubr.read()
if parsed is not None:
self._cmds_stored.append(parsed)
i += 1
else:
eof = True
except Exception: # pylint: disable=broad-exception-caught
self._update_activity(f"ERROR parsing {fname}!")
return 0
return i
def _on_load_txt(self, fname: str) -> int:
"""
Load u-center format text configuration file.
Any messages other than CFG-MSG, CFG-PRT or CFG-VALGET are discarded.
The CFG-VALGET messages are converted into CFG-VALGET.
:param str fname: input file name
:return: no of items read
:rtype: int
"""
try:
with open(fname, "r", encoding="utf-8") as file:
i = 0
for line in file:
parts = line.replace(" ", "").split("-")
data = bytes.fromhex(parts[-1])
cls = data[0:1]
mid = data[1:2]
if cls != CFG:
continue
if mid == VALGET:
version = data[4:5]
layer = bytes2val(data[5:6], U1)
if layer == POLL_LAYER_BBR:
layers = SET_LAYER_BBR
elif layer == POLL_LAYER_FLASH:
layers = SET_LAYER_FLASH
else:
layers = SET_LAYER_RAM
layers = val2bytes(layers, U1)
transaction = val2bytes(TXN_NONE, U1) # not transactional
reserved0 = b"\x00"
cfgdata = data[8:]
payload = version + layers + transaction + reserved0 + cfgdata
parsed = UBXMessage(CFG, VALSET, SET, payload=payload)
else: # legacy CFG command
parsed = UBXMessage(CFG, mid, SET, payload=data[4:])
if parsed is not None:
self._cmds_stored.append(parsed)
i += 1
except Exception: # pylint: disable=broad-exception-caught
self._update_activity(f"ERROR parsing {fname}!")
return 0
return i
def _on_save(self):
    """
    Save commands from in-memory recording to file.

    Does nothing while recording. After a successful write the in-memory
    recording is emptied. If the file cannot be written the error is
    reported in the activity label and the recording is kept.
    """
    if self._rec_status == RECORD:
        return
    if not self._cmds_stored:
        self._update_activity("Nothing to save")
        return

    fname, self._configfile = self._set_configfile_path()
    if self._configfile is None:  # user cancelled
        return

    self._update_activity("Saving commands...")
    try:
        with open(self._configfile, "wb") as file:
            for msg in self._cmds_stored:
                file.write(msg.serialize())
    except OSError:
        # Keep the commands in memory so the user can retry elsewhere.
        self._update_activity(f"ERROR saving {fname}!")
        return

    count = len(self._cmds_stored)
    self._cmds_stored = []
    self._update_activity(f"{count} command{'s' if count > 1 else ''} saved to {fname}")
    self._update_status()
def _on_play(self):
    """
    Push every stored command onto the application's GNSS output queue.

    Ignored while recording; reports "Nothing to send" when memory is
    empty.
    """

    if self._rec_status == RECORD:
        return

    total = len(self._cmds_stored)
    if total == 0:
        self._update_activity("Nothing to send")
        return

    if self._rec_status == STOP:
        self._rec_status = PLAY
        for seq, command in enumerate(self._cmds_stored):
            self._update_activity(f"{seq} Sending {command.identity}")
            self.__app.gnss_outqueue.put(command.serialize())
            sleep(0.01)
        plural = "s" if total > 1 else ""
        self._update_activity(f"{total} command{plural} sent to device")
        self._rec_status = STOP

    self._update_status()
def _on_record(self):
    """
    Toggle recording: start it when idle, stop it when already recording.

    Starting sets the container's recordmode flag and launches the thread
    that flashes the RECORDING label; stopping signals that thread and
    clears the flag.
    """

    current = self._rec_status

    if current == STOP:
        self._rec_status = RECORD
        self.__container.recordmode = True
        # start flashing record label...
        self._stop_event.clear()
        flasher = Thread(
            target=self._flash_record,
            daemon=True,
            args=(self._stop_event,),
        )
        flasher.start()
    elif current == RECORD:
        self._stop_event.set()
        self._rec_status = STOP
        self.__container.recordmode = False
        self._update_activity("Recording stopped")

    self._update_status()
def _on_undo(self):
    """
    Drop the most recently stored command.

    Reports "Nothing to undo" when memory is empty. The command is only
    removed while the recorder is in the STOP state.
    """

    if not len(self._cmds_stored):
        self._update_activity("Nothing to undo")
        return

    if self._rec_status == STOP:
        # list is known to be non-empty at this point
        del self._cmds_stored[-1]
        self._update_activity("Last command undone")

    self._update_status()
def _on_delete(self):
    """
    Empty the in-memory recording and report how many commands were
    removed. Ignored while recording.
    """

    if self._rec_status == RECORD:
        return

    removed = len(self._cmds_stored)
    if removed == 0:
        self._update_activity("Nothing to delete")
        return

    plural = "s" if removed > 1 else ""
    self._update_activity(f"{removed} command{plural} deleted")
    self._cmds_stored = []
    self._update_status()
def _update_status(self):
    """
    Refresh the status label (command count and last command identity)
    and swap the play/record button icons to match the recorder state.
    """

    count = len(self._cmds_stored)
    text = f"Commands in memory: {count}"
    if count > 0:
        text += f". Last command: {self._cmds_stored[-1].identity}"
    self._lbl_status.config(text=text)

    # state -> (play button icon, record button icon)
    icons_by_state = {
        STOP: (self._img_play, self._img_record),
        PLAY: (self._img_stop, self._img_record),
        RECORD: (self._img_play, self._img_stop),
    }
    play_icon, record_icon = icons_by_state.get(self._rec_status, (None, None))
    self._btn_play.config(image=play_icon)
    self._btn_record.config(image=record_icon)
def _update_activity(self, msg: str):
"""
Update activity label.
:param str msg: message
"""
if len(msg) > 55:
msg = f"{msg[0:30]}...{msg[-22:]}"
self._lbl_activity.config(text=msg)
[docs]
def update_record(self, msg: UBXMessage):
    """
    Store a message in the in-memory recording if it is a SET command,
    then refresh the status display. Messages in any other mode are
    ignored.

    :param UBXMessage msg: message to record
    """

    if msg.msgmode != SET:
        return
    self._cmds_stored.append(msg)
    self._update_status()
def _flash_record(self, stop: Event):
"""
THREADED
Flash record indicator for conspicuity.
"""
try:
cols = [("white", "red"), ("red", "white")]
i = 0
while not stop.is_set():
i = not i
self._lbl_activity.config(
text="RECORDING", fg=cols[i][0], bg=cols[i][1]
)
sleep(FLASH)
except TclError: # if dialog closed without stopping recording
pass