Source code for pride_ppp.factories.output

"""
PRIDE-PPP output parsing and validation.

Helpers for reading ``.kin`` / ``.res`` files into DataFrames and
validating pdp3 output.
"""

import logging
import os
from datetime import datetime
from pathlib import Path

import pandas as pd
from pydantic import ValidationError

from pride_ppp.specifications.output import PridePPP

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# RES file helpers
# ---------------------------------------------------------------------------


[docs] def get_wrms_from_res(res_path): """Compute per-epoch phase residual WRMS from a pdp3 ``.res`` file. The WRMS is computed as :math:`\\sqrt{\\sum w_i r_i^2 / \\sum w_i}` where :math:`r_i` are the double-differenced phase residuals (cycles) and :math:`w_i` are the corresponding weights, then converted to millimetres. Parameters ---------- res_path : str or Path Path to the pdp3 residual output file (``res_{YYYY}{DOY}_{site}.res``). Returns ------- pd.DataFrame Two-column DataFrame: * ``date`` \u2014 UTC epoch timestamp * ``wrms`` \u2014 phase residual WRMS (mm) for that epoch """ with open(res_path) as res_file: timestamps = [] data = [] wrms = 0 sumOfSquares = 0 sumOfWeights = 0 line = res_file.readline() # first line is header while True: if line == "": break line_data = line.split() if line_data[0] == "TIM": sumOfSquares = 0 sumOfWeights = 0 seconds_str = line_data[6] if "." in seconds_str: SS, fractional = seconds_str.split(".") SS = int(SS) fractional = fractional.ljust(6, "0")[:6] else: SS = int(seconds_str) fractional = "000000" isodate = ( f"{line_data[1]}-{line_data[2].zfill(2)}-{line_data[3].zfill(2)}" f"T{line_data[4].zfill(2)}:{line_data[5].zfill(2)}:{str(SS).zfill(2)}" f".{fractional}+00:00" ) timestamp = datetime.fromisoformat(isodate) timestamps.append(timestamp) line = res_file.readline() line_data = line.split() while not line.startswith("TIM"): phase_residual = float(line_data[1]) phase_weight = float(line_data[3].replace("D", "E")) sumOfSquares += phase_residual**2 * phase_weight sumOfWeights += phase_weight line = res_file.readline() if line == "": break line_data = line.split() wrms = (sumOfSquares / sumOfWeights) ** 0.5 * 1000 # in mm data.append(wrms) else: line = res_file.readline() wrms_df = pd.DataFrame({"date": timestamps, "wrms": data}) return wrms_df
[docs] def plot_kin_results_wrms(kin_df, title=None, save_as=None): """Plot kinematic results with WRMS in a 6-panel figure. Subplots (top → bottom): Latitude, Longitude, Height, Nsat (red = ≤ 4), PDOP on log scale (red = ≥ 5), WRMS (mm). Expects DataFrame columns: ``Latitude``, ``Longitude``, ``Height``, ``Nsat``, ``PDOP``, ``wrms``. Parameters ---------- kin_df : pd.DataFrame Output from ``read_kin_data`` merged with WRMS residuals. title : str, optional RINEX filename or label used in the figure title. save_as : str, optional If provided, save the figure to this path. """ import matplotlib.pyplot as plt size = 3 bad_nsat = kin_df[kin_df["Nsat"] <= 4] bad_pdop = kin_df[kin_df["PDOP"] >= 5] fig, axs = plt.subplots(6, 1, figsize=(10, 10), sharex=True) axs[0].scatter(kin_df.index, kin_df["Latitude"], s=size) axs[0].set_ylabel("Latitude") axs[1].scatter(kin_df.index, kin_df["Longitude"], s=size) axs[1].set_ylabel("Longitude") axs[2].scatter(kin_df.index, kin_df["Height"], s=size) axs[2].set_ylabel("Height") axs[3].scatter(kin_df.index, kin_df["Nsat"], s=size) axs[3].scatter(bad_nsat.index, bad_nsat["Nsat"], s=size * 2, color="red") axs[3].set_ylabel("Nsat") axs[4].scatter(kin_df.index, kin_df["PDOP"], s=size) axs[4].scatter(bad_pdop.index, bad_pdop["PDOP"], s=size * 2, color="red") axs[4].set_ylabel("log PDOP") axs[4].set_yscale("log") axs[4].set_ylim(1, 100) axs[5].scatter(kin_df.index, kin_df["wrms"], s=size) axs[5].set_ylabel("wrms mm") axs[0].ticklabel_format(axis="y", useOffset=False, style="plain") axs[1].ticklabel_format(axis="y", useOffset=False, style="plain") for ax in axs: ax.grid(True, c="lightgrey", zorder=0, lw=1, ls=":") plt.xticks(rotation=70) fig.suptitle(f"PRIDE-PPPAR results for {os.path.basename(title)}") fig.tight_layout() if save_as is not None: plt.savefig(save_as)
# --------------------------------------------------------------------------- # KIN file reading # ---------------------------------------------------------------------------
[docs] def kin_to_kin_position_df(source: str | Path) -> pd.DataFrame | None: """Parse a pdp3 ``.kin`` file into a DataFrame with optional WRMS residuals. Reads the kinematic position records after the ``END OF HEADER`` marker and converts them to a DataFrame indexed by UTC timestamp. Attempts to merge WRMS residuals from a co-located ``.res`` file (same directory, stem derived from the ``.kin`` filename); the ``wrms`` column will be ``None`` if the ``.res`` file is missing or cannot be parsed. Parameters ---------- source : str | Path Path to the pdp3 ``.kin`` output file. Returns ------- pd.DataFrame | None DataFrame with columns including ``time``, ``latitude``, ``longitude``, ``height``, ``pdop``, and ``wrms``, indexed by record number. Returns ``None`` if the file has no header or contains no valid data records. """ logger.info(f"Parsing KIN file: {source}") with open(source) as file: lines = file.readlines() end_header_index = next( (i for i, line in enumerate(lines) if line.strip() == "END OF HEADER"), None ) data = [] if end_header_index is None: logger.error(f"GNSS: No header found in FILE {str(source)}") return None for idx, line in enumerate(lines[end_header_index + 2 :]): split_line = line.strip().split() selected_columns = split_line[:9] + [split_line[-1]] try: ppp: PridePPP | ValidationError = PridePPP.from_kin_file(selected_columns) data.append(ppp) except Exception: pass if not data: logger.error(f"GNSS: No data found in FILE {source}") return None dataframe = pd.DataFrame([dict(pride_ppp) for pride_ppp in data]) dataframe["time"] = dataframe["time"].dt.tz_localize("UTC") dataframe.set_index("time", inplace=True) logger.info(f"Parsed {len(dataframe)} records from KIN file {source}") # Add residuals data try: source_path = Path(source) res_pattern = source_path.stem.replace("kin_", "res_").replace(".kin", "") res_file = source_path.parent / f"{res_pattern}.res" if res_file.exists(): logger.info(f"Adding residuals from {res_file}") wrms_df = get_wrms_from_res(res_file) if not wrms_df.empty: wrms_df.set_index("date", inplace=True) dataframe_sorted = dataframe.sort_index() wrms_sorted = wrms_df.sort_index() dataframe = pd.merge_asof( dataframe_sorted, wrms_sorted, left_index=True, right_index=True, direction="nearest", tolerance=pd.Timedelta(seconds=0.01), ) logger.info( f"Added WRMS residuals for {dataframe['wrms'].notna().sum()} of {len(dataframe)} records" ) else: logger.warning(f"No WRMS data found in {res_file}") dataframe["wrms"] = None else: logger.warning(f"No corresponding RES file found for {source}") dataframe["wrms"] = None except Exception as e: logger.error(f"Error adding residuals: {e}") dataframe["wrms"] = None dataframe = dataframe.drop(columns=["modified_julian_date", "second_of_day"], errors="ignore") dataframe.reset_index(inplace=True) logger.info(f"GNSS Parser: {dataframe.shape[0]} shots from FILE {str(source)}") return dataframe
[docs] def read_kin_data(kin_path): r"""Read a ``.kin`` file into a DataFrame using fixed-width column specs. The column widths match the pdp3 output format (PRIDE-PPPAR 3). The resulting DataFrame is indexed by UTC timestamps derived from MJD + seconds-of-day. Parameters ---------- kin_path : str Path to the ``.kin`` file. Returns ------- pd.DataFrame Fixed-width columns: Mjd, Sod, \*, X, Y, Z, Latitude, Longitude, Height, Nsat, G (GPS sats), R (GLONASS), E (Galileo), C2 (BDS-2), C3 (BDS-3), J (QZSS), PDOP. Indexed by UTC datetime derived from MJD + seconds-of-day. """ with open(kin_path) as kin_file: for i, line in enumerate(kin_file): if "END OF HEADER" in line: end_of_header = i + 1 break cols = [ "Mjd", "Sod", "*", "X", "Y", "Z", "Latitude", "Longitude", "Height", "Nsat", "G", "R", "E", "C2", "C3", "J", "PDOP", ] colspecs = [ (0, 6), (6, 16), (16, 18), (18, 32), (32, 46), (46, 60), (60, 77), (77, 94), (94, 108), (108, 114), (114, 117), (117, 120), (120, 123), (123, 126), (126, 129), (129, 132), (132, 140), ] kin_df = pd.read_fwf( kin_path, header=end_of_header, colspecs=colspecs, names=cols, on_bad_lines="skip", ) kin_df.set_index( pd.to_datetime(kin_df["Mjd"] + 2400000.5, unit="D", origin="julian") + pd.to_timedelta(kin_df["Sod"], unit="sec"), inplace=True, ) return kin_df
[docs] def validate_kin_file(source: str | Path) -> bool: """Validate a kin file. This is done by checking if it can be parsed into a DataFrame and contains data. Parameters ---------- source : str | Path The path to the kin file. Returns ------- bool True if the kin file is valid, False otherwise. """ if not isinstance(source, (str, Path)): logger.error(f"Invalid source type: {type(source)}") return False source = Path(source) if not source.exists(): return False df = kin_to_kin_position_df(source) if df is None or df.empty: logger.error(f"Kin file {source} is invalid or contains no data") return False return True