Source code for pride_ppp.factories.processor

"""PrideProcessor — concurrent-safe RINEX → kinematic position pipeline.

Owns all internal state (ProductRegistry, WorkSpace, GNSSClient, DependencySpec).
Each ``process()`` call writes its pdp3 ``config_file`` to a date-partitioned
directory (``pride_dir/{year}/{doy}/``) and runs pdp3 inside a fresh temporary
directory, so concurrent pdp3 runs do not share a working directory.
"""

from __future__ import annotations

import datetime
import enum
import logging
import re
import shutil
import subprocess
import tempfile
from collections.abc import Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from itertools import groupby
from pathlib import Path
from typing import Literal

import pandas as pd
from gnss_product_management import (
    GNSSClient,
    WorkSpace,
)
from gnss_product_management.environments import ProductRegistry
from gnss_product_management.specifications.dependencies.dependencies import (
    DependencyResolution,
    DependencySpec,
)
from gnss_product_management.utilities.paths import as_path
from gpm_specs.configs import (
    CENTERS_RESOURCE_DIR,
    FORMAT_SPEC_YAML,
    LOCAL_SPEC_DIR,
    META_SPEC_YAML,
    PRODUCT_SPEC_YAML,
)

from ..defaults import (
    PRIDE_CENTERS_DIR,
    PRIDE_DIR_SPEC,
    PRIDE_INSTALL_SPEC,
    PRIDE_PPPAR_FINAL_SPEC,
    PRIDE_PPPAR_SPEC,
    PRIDE_PRODUCT_SPEC,
)
from ..specifications.cli import PrideCLIConfig
from ..specifications.config import PRIDEPPPFileConfig, SatelliteProducts
from .output import get_wrms_from_res, kin_to_kin_position_df
from .rinex import rinex_get_time_range

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Processing mode
# ---------------------------------------------------------------------------


class ProcessingMode(enum.Enum):
    """IGS product timeliness mode for dependency resolution.

    Selects which dependency-spec YAML governs which products are accepted.

    * ``DEFAULT`` — cascades through FIN → RAP → ULT. Uses the best
      available product at run time. Suitable for near-real-time processing
      or when the observation date is within the last two weeks.
    * ``FINAL`` — accepts only IGS final (FIN) products (available ≥13 days
      after observation). Suitable for post-processing campaigns where
      reproducibility and highest accuracy are required.

    IGS product latency reference (approximate):

    - FIN (final)       : orbit ≈ 13 days, clock ≈ 13 days
    - RAP (rapid)       : orbit ≈ 17 hours, clock ≈ 17 hours
    - ULT (ultra-rapid) : orbit ≈ 3 hours, clock predicted half

    Note:
        Member *values* are lowercase strings. Lookup by value therefore
        needs ``ProcessingMode("final")``; to convert an upper-case name
        such as ``"FINAL"`` use name lookup, ``ProcessingMode["FINAL"]``.
    """

    # Each member is a key of the module-level ``_MODE_TO_SPEC`` table, which
    # maps it to the dependency-spec path used for product resolution.
    DEFAULT = "default"
    FINAL = "final"
_MODE_TO_SPEC: dict[ProcessingMode, Path] = { ProcessingMode.DEFAULT: PRIDE_PPPAR_SPEC, ProcessingMode.FINAL: PRIDE_PPPAR_FINAL_SPEC, } # Regex for inferring 4-char site ID from RINEX filenames. # Matches the leading 4 alphabetic characters of the filename stem. _SITE_RE = re.compile(r"^([A-Za-z0-9]{4})") # Spec name → SatelliteProducts field mapping _SPEC_TO_PRODUCT_FIELD: dict[str, str] = { "ORBIT": "satellite_orbit", "CLOCK": "satellite_clock", "BIA": "code_phase_bias", "ATTOBX": "quaternions", "ERP": "erp", } # --------------------------------------------------------------------------- # Result # ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ProcessingResult:
    """Frozen record describing the outcome of one RINEX → kinematic run.

    Attributes
    ----------
    rinex_path : Path
        The RINEX observation file that was processed.
    site : str
        4-character station identifier (e.g. ``"NCC1"``).
    date : datetime.date
        Observation date of the run.
    kin_path : Path or None
        Location of the ``.kin`` output, or ``None`` when pdp3 produced none.
    res_path : Path or None
        Location of the ``.res`` residuals output, or ``None``.
    config_path : Path
        The ``config_file`` used for the run.
    resolution : DependencyResolution
        Product resolution outcome (fulfilled and missing dependencies).
    returncode : int
        Exit code of the pdp3 process (defaults to ``0``).
    stderr : str
        Captured stderr of the pdp3 process (defaults to ``""``).
    """

    rinex_path: Path
    site: str
    date: datetime.date
    kin_path: Path | None
    res_path: Path | None
    config_path: Path
    resolution: DependencyResolution
    returncode: int = 0
    stderr: str = ""

    @property
    def success(self) -> bool:
        """``True`` when ``kin_path`` is set and that file exists on disk."""
        if self.kin_path is None:
            return False
        return self.kin_path.exists()

    def positions(self) -> pd.DataFrame | None:
        """Load the ``.kin`` output as a DataFrame of kinematic positions.

        Returns
        -------
        pd.DataFrame or None
            Whatever ``kin_to_kin_position_df`` yields for ``kin_path``, or
            ``None`` when the run has no ``.kin`` file on disk.

            Per the parser's documented contract the frame is indexed by UTC
            epoch with columns ``Latitude`` / ``Longitude`` (degrees),
            ``Height`` (metres), ``Nsat``, ``PDOP`` and ``wrms`` (phase
            residual WRMS per epoch, mm, merged from the ``.res`` file).
            Coordinates are in the reference frame of the resolved
            orbit/clock products (IGS20/ITRF2020 for Repro3+ products;
            IGS14/ITRF2014 for earlier series). Epochs with ``Nsat ≤ 4`` or
            ``PDOP ≥ 5`` should be treated with caution.
        """
        if self.kin_path is None or not self.success:
            return None
        return kin_to_kin_position_df(self.kin_path)

    def residuals(self) -> pd.DataFrame | None:
        """Load the ``.res`` output as a WRMS DataFrame.

        Returns
        -------
        pd.DataFrame or None
            Result of ``get_wrms_from_res`` for ``res_path``, or ``None``
            when ``res_path`` is unset or the file does not exist.
        """
        res_file = self.res_path
        if res_file is not None and res_file.exists():
            return get_wrms_from_res(res_file)
        return None
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _infer_site(rinex: Path) -> str:
    """Extract a 4-character site identifier from a RINEX filename.

    Matches the first four alphanumeric characters of the filename stem.
    Works for both RINEX 3 long names
    (``SITE00USA_R_20251500000_01D_30S_MO.rnx`` → ``SITE``) and legacy 8.3
    names (``NCC12500.25o`` → ``NCC1``).

    Falls back to ``"SIT1"`` when the stem does not start with four
    alphanumeric characters. This fallback is silent — pass ``site=``
    explicitly to :meth:`PrideProcessor.process` for non-standard filenames
    so that output files are named correctly.

    Args:
        rinex: Path to a RINEX observation file.

    Returns:
        Uppercase 4-char site ID.
    """
    m = _SITE_RE.match(rinex.stem)
    return m.group(1).upper() if m else "SIT1"


def _resolution_to_satellite_products(
    resolution: DependencyResolution,
) -> tuple[SatelliteProducts, Path | None]:
    """Map a dependency resolution to PRIDE-PPPAR satellite product names.

    Walks ``resolution.fulfilled`` and records the local *filename* of each
    entry whose spec appears in ``_SPEC_TO_PRODUCT_FIELD`` (orbit, clock,
    bias, quaternions, ERP). Only the **first** fulfilled entry per field is
    kept; later entries for the same field, entries with unknown specs and
    entries without a ``local_path`` are skipped.

    Args:
        resolution: A completed ``DependencyResolution`` from the resolver.

    Returns:
        A 2-tuple ``(SatelliteProducts, product_directory)``.
        *product_directory* is the parent directory of the first mapped
        product file (the code assumes all products share it), or ``None``
        if nothing was mapped — in which case the ``SatelliteProducts``
        carries ``product_directory="Default"``.
    """
    product_fields: dict[str, str] = {}
    product_dir: Path | None = None

    for rd in resolution.fulfilled:
        # Map the spec name (e.g. "ORBIT") to the SatelliteProducts field name
        field_name = _SPEC_TO_PRODUCT_FIELD.get(rd.spec)
        if field_name is None or field_name in product_fields:
            continue

        local_path = rd.local_path
        if local_path is None:
            continue

        path = as_path(local_path)
        product_fields[field_name] = path.name

        # Use the first product's parent as the product directory
        if product_dir is None:
            product_dir = path.parent

    products = SatelliteProducts(
        satellite_orbit=product_fields.get("satellite_orbit"),
        satellite_clock=product_fields.get("satellite_clock"),
        code_phase_bias=product_fields.get("code_phase_bias"),
        quaternions=product_fields.get("quaternions"),
        erp=product_fields.get("erp"),
        product_directory=str(product_dir) if product_dir else "Default",
    )
    return products, product_dir


def _resolution_to_table_dir(resolution: DependencyResolution) -> Path | None:
    """Locate the directory containing the resolved ``ATTATX`` file.

    Scans fulfilled dependencies for the entry whose spec is exactly
    ``"ATTATX"`` and returns the parent directory of its local file. The
    caller writes this into the pdp3 config as ``table_directory``.

    Args:
        resolution: A completed ``DependencyResolution``.

    Returns:
        Parent directory of the ``ATTATX`` file, or ``None`` if no such
        entry with a local path was fulfilled.
    """
    for rd in resolution.fulfilled:
        # Exact comparison: ``rd.spec in "ATTATX"`` would be a *substring*
        # test and wrongly accept "", "ATX", "ATT", ... as the ANTEX spec.
        if rd.spec != "ATTATX":
            continue
        local_path = rd.local_path
        if local_path is not None:
            return as_path(local_path).parent
    return None


def _write_config(
    satellite_products: SatelliteProducts,
    table_dir: Path | None,
    dest: Path,
) -> Path:
    """Write a PRIDE-PPPAR ``config_file`` to disk.

    Loads the default config template, injects the resolved satellite
    product names and table directory, then serialises to *dest*. Parent
    directories are created automatically.

    Args:
        satellite_products: Resolved product names for orbits, clocks, etc.
        table_dir: Directory containing the reference tables; ``None`` is
            written as the literal ``"Default"``.
        dest: Target path for the config file
            (e.g. ``pride_dir/2025/015/config_file``).

    Returns:
        The path that was written (same as *dest*).
    """
    config = PRIDEPPPFileConfig.load_default()
    config.satellite_products = satellite_products
    config.observation.table_directory = str(table_dir) if table_dir else "Default"
    dest.parent.mkdir(parents=True, exist_ok=True)
    config.write_config_file(dest)
    return dest


# ---------------------------------------------------------------------------
# PrideProcessor
# ---------------------------------------------------------------------------
class PrideProcessor:
    """RINEX observation file in → kinematic PPP-AR positions out.

    Owns its own :class:`ProductRegistry`, :class:`WorkSpace`, and
    :class:`GNSSClient`. No global state; a single instance can be reused
    across many :meth:`process` / :meth:`process_batch` calls.

    The full pipeline per RINEX file:

    1. Infer site ID and observation date from the RINEX filename/header.
    2. Resolve IGS products for that date via
       :meth:`GNSSClient.resolve_dependencies` (resolution is delegated
       entirely to the client).
    3. Return a cached :class:`ProcessingResult` if a valid ``.kin`` output
       already exists in ``output_dir`` (unless *override* is ``True``).
    4. Write a ``pdp3`` ``config_file`` to ``pride_dir/{year}/{doy}/``.
    5. Execute ``pdp3`` in a temporary directory and move its ``.kin`` /
       ``.res`` outputs to ``output_dir``.

    For a full station-year, use :meth:`process_batch`: it resolves products
    once per unique date, writes configs for all dates, skips cached results,
    then dispatches ``pdp3`` subprocesses in parallel.

    Example::

        processor = PrideProcessor(
            pride_dir=Path("/data/pride"),
            output_dir=Path("/data/output"),
            mode=ProcessingMode.FINAL,
        )
        for result in processor.process_batch(rinex_files, max_workers=4):
            if result.success:
                print(result.date, len(result.positions()), "epochs")
    """

    def __init__(
        self,
        pride_dir: Path,
        output_dir: Path,
        *,
        pride_install_dir: Path | None = None,
        cli_config: PrideCLIConfig | None = None,
        mode: ProcessingMode | Literal["FINAL", "DEFAULT"] = ProcessingMode.DEFAULT,
    ) -> None:
        """Initialise the processor and all its owned subsystems.

        Construction is intentionally *eager*: the dependency spec is loaded,
        the ProductRegistry is built from the spec YAMLs, and the WorkSpace
        is registered, so every subsequent ``process()`` /
        ``process_batch()`` call can proceed without further setup.

        Args:
            pride_dir: Root directory for PRIDE products and working state.
                Config files are written under ``pride_dir/{year}/{doy}/``.
            output_dir: Final destination for ``.kin`` / ``.res`` output
                files produced by pdp3.
            pride_install_dir: Optional path to a PRIDE-PPPAR installation.
                When set, it is registered on the WorkSpace under the
                ``"pride_install"`` alias.
            cli_config: Override the default pdp3 CLI flags. When ``None`` a
                fresh default ``PrideCLIConfig()`` is created per instance.
            mode: Product timeliness mode, selecting the dependency-spec YAML:

                * ``ProcessingMode.DEFAULT`` — FIN → RAP → ULT cascade.
                * ``ProcessingMode.FINAL`` — only FINAL products.

                Also accepts the member names as strings (``"DEFAULT"`` /
                ``"FINAL"``, case-insensitive) for convenience.

        Raises:
            ValueError: If *mode* is a string that names no processing mode.
        """
        if isinstance(mode, str):
            # Enum *values* are lowercase ("default"/"final"), so the
            # documented upper-case literals must be looked up by member
            # *name*; ``ProcessingMode("FINAL")`` would always raise.
            try:
                mode = ProcessingMode[mode.upper()]
            except KeyError:
                valid = ", ".join(m.name for m in ProcessingMode)
                raise ValueError(
                    f"Unknown processing mode {mode!r}; expected one of: {valid}"
                ) from None

        self._pride_dir = Path(pride_dir)
        self._output_dir = Path(output_dir)
        self._pride_install_dir = Path(pride_install_dir) if pride_install_dir else None
        # Built here rather than as a signature default so instances never
        # share one config object created at function-definition time.
        self._cli_config = cli_config if cli_config is not None else PrideCLIConfig()
        self._mode = mode

        # Load the DependencySpec that matches the requested processing mode.
        spec_path = _MODE_TO_SPEC[self._mode]
        self._dep_spec = DependencySpec.from_yaml(spec_path)
        logger.info("Processing mode: %s (spec: %s)", mode.value, spec_path.name)

        # Build the product registry and the workspace (sink mappings), then
        # wire them into a GNSSClient.
        registry = self._build_registry()
        workspace = self._build_workspace()
        self._client = GNSSClient(
            product_registry=registry,
            workspace=workspace,
            max_connections=10,
        )

    # ------------------------------------------------------------------ #
    # Private construction helpers
    # ------------------------------------------------------------------ #

    def _build_registry(self) -> ProductRegistry:
        """Construct and finalise the :class:`ProductRegistry`.

        Loads, in order:

        1. **Parameter spec** — ``META_SPEC_YAML``.
        2. **Format spec** — ``FORMAT_SPEC_YAML``.
        3. **Product specs** — the core ``PRODUCT_SPEC_YAML`` *and* the
           PRIDE-specific ``PRIDE_PRODUCT_SPEC`` (registered with
           ``id="pride"``).
        4. **Resource specs** — every ``*.yaml`` in ``CENTERS_RESOURCE_DIR``
           and, if that directory exists, in ``PRIDE_CENTERS_DIR``.

        Returns:
            The registry after ``build()`` has been called on it.
        """
        registry = ProductRegistry()
        registry.add_parameter_spec(META_SPEC_YAML)
        registry.add_format_spec(FORMAT_SPEC_YAML)
        registry.add_product_spec(PRODUCT_SPEC_YAML)
        registry.add_product_spec(PRIDE_PRODUCT_SPEC, id="pride")

        for path in Path(CENTERS_RESOURCE_DIR).glob("*.yaml"):
            registry.add_resource_spec(path)
        if PRIDE_CENTERS_DIR.is_dir():
            for path in PRIDE_CENTERS_DIR.glob("*.yaml"):
                registry.add_resource_spec(path)

        registry.build()
        return registry

    def _build_workspace(self) -> WorkSpace:
        """Create the ``WorkSpace`` and register local directory specs.

        Adds every ``*.yaml`` in ``LOCAL_SPEC_DIR`` plus the PRIDE directory
        and install specs, then registers these aliases:

        * ``"pride"`` → ``self._pride_dir``
        * ``"pride_install"`` → ``self._pride_install_dir`` (only when set)

        Returns:
            A configured :class:`WorkSpace`.
        """
        ws = WorkSpace()
        for path in Path(LOCAL_SPEC_DIR).glob("*.yaml"):
            ws.add_resource_spec(path)
        ws.add_resource_spec(PRIDE_DIR_SPEC)
        ws.add_resource_spec(PRIDE_INSTALL_SPEC)

        ws.register_spec(
            base_dir=self._pride_dir,
            spec_ids=["pride_config"],
            alias="pride",
        )
        if self._pride_install_dir:
            ws.register_spec(
                base_dir=self._pride_install_dir,
                spec_ids=["pride_install_config"],
                alias="pride_install",
            )
        return ws

    # ------------------------------------------------------------------ #
    # Resolution
    # ------------------------------------------------------------------ #

    def _resolve(
        self,
        date: datetime.datetime,
        local_sink_id: str = "pride",
    ) -> DependencyResolution:
        """Resolve all dependencies for a single UTC date.

        Delegates to :meth:`GNSSClient.resolve_dependencies` with this
        processor's dependency spec and returns the first element of the
        result, discarding the second.

        Args:
            date: Target date (midnight UTC) for product resolution.
            local_sink_id: WorkSpace alias passed as ``sink_id``. Defaults to
                ``"pride"``, which is registered to ``self._pride_dir``.

        Returns:
            The :class:`DependencyResolution` reported by the client.
        """
        resolution, _ = self._client.resolve_dependencies(
            self._dep_spec,
            date,
            sink_id=local_sink_id,
        )
        return resolution

    @staticmethod
    def _log_missing_required(resolution: DependencyResolution) -> None:
        """Log an error listing required products the resolver could not fulfil."""
        if not resolution.all_required_fulfilled:
            missing = [r.spec for r in resolution.missing if r.required]
            logger.error("Missing required products: %s", missing)

    # ------------------------------------------------------------------ #
    # Date / directory helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _as_date(value: datetime.date) -> datetime.date:
        """Reduce a ``datetime`` (or subclass) to a plain date; pass dates through."""
        # ``datetime.datetime`` subclasses ``datetime.date``, so the check
        # must be made against ``datetime.datetime`` to tell them apart.
        return value.date() if isinstance(value, datetime.datetime) else value

    @staticmethod
    def _midnight_utc(day: datetime.date) -> datetime.datetime:
        """Return timezone-aware midnight UTC of *day* (the resolver target)."""
        return datetime.datetime(day.year, day.month, day.day, tzinfo=datetime.timezone.utc)

    def _working_dir(self, date: datetime.date) -> Path:
        """Return ``pride_dir/{year}/{doy}/``, creating it if needed."""
        doy = date.timetuple().tm_yday
        d = self._pride_dir / str(date.year) / f"{doy:03d}"
        d.mkdir(parents=True, exist_ok=True)
        return d

    # ------------------------------------------------------------------ #
    # Subprocess execution
    # ------------------------------------------------------------------ #

    def _build_pdp_command(self, rinex: Path, site: str, config_path: Path) -> list[str]:
        """Assemble the full ``pdp3`` command-line invocation.

        Builds a new ``PrideCLIConfig`` from the processor's CLI config with
        ``pride_configfile_path`` overridden by *config_path*, then generates
        the argument list via ``PrideCLIConfig.generate_pdp_command``.

        Args:
            rinex: Path to the observation file passed to pdp3.
            site: 4-char site identifier (e.g. ``"NCC1"``).
            config_path: The ``config_file`` written by ``_write_config``.

        Returns:
            A list of strings suitable for ``subprocess.run()``.
        """
        cli = PrideCLIConfig(
            **{
                **self._cli_config.model_dump(),
                "pride_configfile_path": config_path,
            }
        )
        return cli.generate_pdp_command(site=site, local_file_path=str(rinex))

    @staticmethod
    def _collect_output(scratch: Path, kind: str, site: str, output_dir: Path) -> Path | None:
        """Move the first ``{kind}_*_{site}`` file under *scratch* to *output_dir*.

        The moved file gets a ``.{kind}`` extension appended. Returns the
        destination path, or ``None`` when no file matched.
        """
        # Sorted so the choice is deterministic if several files match.
        matches = sorted(scratch.rglob(f"{kind}_*_{site.lower()}"))
        if not matches:
            return None
        src = matches[0]
        dst = output_dir / f"{src.name}.{kind}"
        shutil.move(str(src), str(dst))
        logger.info("Generated %s file %s", kind, dst)
        return dst

    @staticmethod
    def _run_pdp3(
        command: list[str],
        site: str,
        output_dir: Path,
    ) -> tuple[Path | None, Path | None, int, str]:
        """Execute pdp3 in a temporary directory and move outputs to *output_dir*.

        pdp3 is run with ``cwd`` set to a fresh ``tempfile.TemporaryDirectory``
        so parallel runs never share a working directory. Anything pdp3
        leaves there other than the ``kin`` / ``res`` outputs is deleted when
        the temporary directory is cleaned up.

        After execution the method searches the temporary directory
        recursively for ``kin_*_{site}`` and ``res_*_{site}`` files (site
        lowercased), appends ``.kin`` / ``.res`` extensions, and moves them
        to *output_dir* (created if missing). pdp3's stdout is replayed to the
        logger at INFO level and its stderr at WARNING level.

        Args:
            command: Full pdp3 argument list from ``_build_pdp_command``.
            site: 4-char site ID used to locate output files by pattern.
            output_dir: Final destination for ``.kin`` / ``.res`` files.

        Returns:
            ``(kin_path, res_path, returncode, stderr)`` where paths are
            ``None`` when the corresponding output was not produced.

        Raises:
            FileNotFoundError: If the ``pdp3`` binary is not on ``PATH``.
        """
        if not shutil.which("pdp3"):
            raise FileNotFoundError("pdp3 binary not found in PATH")

        with tempfile.TemporaryDirectory() as tmpdir:
            # Run pdp3 as a blocking subprocess, capturing all output
            result = subprocess.run(
                command,
                cwd=tmpdir,
                capture_output=True,
                text=True,
            )

            # Replay stdout/stderr through the logger for observability
            if result.stdout:
                for line in result.stdout.strip().splitlines():
                    logger.info(line)
            if result.stderr:
                for line in result.stderr.strip().splitlines():
                    logger.warning(line)

            # pdp3 writes outputs as e.g. "kin_2025254_ncc1" (no extension).
            # They must be moved out before the temporary directory is removed.
            scratch = Path(tmpdir)
            output_dir.mkdir(parents=True, exist_ok=True)
            kin_out = PrideProcessor._collect_output(scratch, "kin", site, output_dir)
            res_out = PrideProcessor._collect_output(scratch, "res", site, output_dir)

        return kin_out, res_out, result.returncode, result.stderr

    def _build_kin_res_paths(
        self, date: datetime.date, site: str, output_dir: Path
    ) -> tuple[Path, Path]:
        """Construct the expected ``.kin`` and ``.res`` output paths.

        The names are ``kin_{YYYY}{DOY}_{site}.kin`` and
        ``res_{YYYY}{DOY}_{site}.res`` with *site* lowercased and the
        day-of-year zero-padded to three digits — the same names
        ``_run_pdp3`` produces when it moves pdp3's outputs.

        Args:
            date: Observation date (a ``date`` or ``datetime``).
            site: 4-char site ID (any case).
            output_dir: Directory where outputs are stored.

        Returns:
            ``(kin_path, res_path)`` — both may point to non-existent files.
        """
        stamp = f"{date.year}{date.timetuple().tm_yday:03d}_{site.lower()}"
        return output_dir / f"kin_{stamp}.kin", output_dir / f"res_{stamp}.res"

    def _validate_kinfile(self, kin_path: Path, override: bool = False) -> bool:
        """Check whether a ``.kin`` output file already contains valid data.

        When *override* is ``True`` the check is skipped and ``False`` is
        returned (i.e. "no valid cached result") so the caller always
        re-runs pdp3. Otherwise ``True`` is returned only if *kin_path*
        exists **and** ``kin_to_kin_position_df`` parses it into a non-empty
        DataFrame.

        Args:
            kin_path: Expected location of the ``.kin`` file.
            override: Force re-processing regardless of existing output.

        Returns:
            ``True`` if a valid, parseable output already exists.
        """
        if override or not kin_path.exists():
            return False
        kin_df: pd.DataFrame | None = kin_to_kin_position_df(kin_path)
        # Compare against None explicitly: ``bool(DataFrame)`` raises
        # ValueError because a DataFrame's truth value is ambiguous.
        return kin_df is not None and not kin_df.empty

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def process(
        self,
        rinex: Path,
        *,
        site: str | None = None,
        date: datetime.date | None = None,
        override: bool = False,
    ) -> ProcessingResult:
        """Process one RINEX file end-to-end.

        This is the primary single-file entry point. The full pipeline is:

        1. **Infer metadata** — site ID and observation date are taken from
           the RINEX filename / ``rinex_get_time_range`` if not provided.
        2. **Resolve products** — via :meth:`GNSSClient.resolve_dependencies`
           with the ``"pride"`` sink.
        3. **Check cache** — if a valid ``.kin`` output already exists in
           ``output_dir`` and *override* is ``False``, the run is skipped and
           a cached ``ProcessingResult`` (with ``config_path`` set to the
           placeholder ``Path("(cached)")``) is returned immediately.
        4. **Write config** — a ``config_file`` is written to
           ``pride_dir/{year}/{doy}/``.
        5. **Run pdp3** — the binary is executed in a temporary directory
           and its outputs are moved to ``output_dir``.

        Missing required products are logged as an error but do not abort
        the run.

        Args:
            rinex: Path to the RINEX observation file.
            site: 4-char site ID. Inferred from the filename if omitted.
            date: Override observation date (a ``date`` or ``datetime``).
                When ``None`` the start time from ``rinex_get_time_range``
                is used.
            override: When ``True``, re-run pdp3 even if a valid ``.kin``
                file already exists.

        Returns:
            A ``ProcessingResult`` summarising the run outcome.

        Raises:
            FileNotFoundError: If *rinex* does not exist, or if the ``pdp3``
                binary is not on ``PATH`` when a run is required.
        """
        rinex = Path(rinex)
        if not rinex.exists():
            raise FileNotFoundError(f"RINEX file not found: {rinex}")

        # --- 1. Infer metadata ------------------------------------------------
        if site is None:
            site = _infer_site(rinex)

        if date is None:
            ts_start, _ = rinex_get_time_range(rinex)
            start_date = self._as_date(ts_start)
        else:
            # A datetime passed as *date* is reduced to its calendar date so
            # that ``ProcessingResult.date`` is always a plain date.
            start_date = self._as_date(date)

        # Normalise to midnight UTC for the resolver
        target_dt = self._midnight_utc(start_date)

        # --- 2. Resolve products ----------------------------------------------
        logger.info("Resolving products for %s (site=%s)", start_date, site)
        resolution = self._resolve(target_dt)
        logger.info(resolution.summary())

        # --- 3. Check cache ---------------------------------------------------
        kin_file, res_file = self._build_kin_res_paths(
            date=start_date, site=site, output_dir=self._output_dir
        )
        if self._validate_kinfile(kin_file, override=override):
            logger.info(
                "Valid output already exists for %s on %s, skipping pdp3 run",
                site,
                start_date,
            )
            return ProcessingResult(
                rinex_path=rinex,
                site=site,
                date=start_date,
                kin_path=kin_file,
                res_path=res_file if res_file.exists() else None,
                config_path=Path("(cached)"),
                resolution=resolution,
            )

        self._log_missing_required(resolution)

        # --- 4. Write config --------------------------------------------------
        # Config is persisted at pride_dir/{year}/{doy}/config_file so it can
        # be inspected after the run.
        work_dir = self._working_dir(start_date)
        sat_products, _ = _resolution_to_satellite_products(resolution)
        table_dir = _resolution_to_table_dir(resolution)
        config_path = _write_config(sat_products, table_dir, work_dir / "config_file")
        command = self._build_pdp_command(rinex=rinex, site=site, config_path=config_path)

        # --- 5. Run pdp3 ------------------------------------------------------
        kin_path, res_path, returncode, stderr = self._run_pdp3(
            command=command,
            site=site,
            output_dir=self._output_dir,
        )

        return ProcessingResult(
            rinex_path=rinex,
            site=site,
            date=start_date,
            kin_path=kin_path,
            res_path=res_path,
            config_path=config_path,
            resolution=resolution,
            returncode=returncode,
            stderr=stderr,
        )

    def process_batch(
        self,
        rinex_files: Sequence[Path],
        *,
        sites: Sequence[str] | None = None,
        max_workers: int = 1,
        override: bool = False,
    ) -> Iterator[ProcessingResult]:
        """Process multiple RINEX files, sharing product resolution per date.

        This is the preferred entry point when processing many files:

        1. **Gather metadata** — the observation date of every file is read
           with ``rinex_get_time_range``.
        2. **Resolve & write configs** — for each unique date (in ascending
           order) products are resolved once and a ``config_file`` is written
           to the year/doy directory under ``pride_dir``.
        3. **Skip cached results** — any RINEX whose ``.kin`` output already
           exists and validates is yielded immediately without running pdp3
           (unless *override* is ``True``).
        4. **Dispatch pdp3** — remaining jobs run sequentially when
           ``max_workers <= 1``, otherwise in a ``ThreadPoolExecutor``.

        Resolution and config generation always happen in the calling
        thread; only the pdp3 subprocess calls are parallelised.

        This is a generator: nothing (including argument validation) happens
        until iteration starts.

        Args:
            rinex_files: Paths to RINEX observation files.
            sites: Per-file 4-char site IDs. When ``None``, inferred from
                filenames via ``_infer_site``.
            max_workers: Maximum number of concurrent pdp3 subprocesses.
                ``1`` (or less) means fully sequential execution.
            override: Re-run pdp3 even when valid output already exists.

        Yields:
            A ``ProcessingResult`` for each file. Cached results are yielded
            first (input order), then pdp3 results — in input order when
            sequential, in completion order when parallel.

        Raises:
            ValueError: If *sites* length does not match *rinex_files*.
            FileNotFoundError: If the ``pdp3`` binary is not on ``PATH`` when
                a run is required.
        """
        if sites is None:
            sites = [_infer_site(Path(r)) for r in rinex_files]
        if len(sites) != len(rinex_files):
            raise ValueError(f"sites ({len(sites)}) must match rinex_files ({len(rinex_files)})")

        # --- Step 1: Gather (rinex, site, date) for each RINEX file -------------
        jobs: list[tuple[Path, str, datetime.date]] = []
        for rinex, site in zip(rinex_files, sites):
            rinex = Path(rinex)
            ts_start, _ = rinex_get_time_range(rinex)
            jobs.append((rinex, site, self._as_date(ts_start)))

        # --- Step 2: Resolve products once per unique date ----------------------
        resolutions: dict[datetime.date, DependencyResolution] = {}
        for date_key in sorted({d for _, _, d in jobs}):
            logger.info("Resolving products for %s", date_key)
            resolutions[date_key] = self._resolve(self._midnight_utc(date_key))
            logger.info(resolutions[date_key].summary())

        # --- Step 3: Write per-date config files in year/doy dirs ---------------
        config_paths: dict[datetime.date, Path] = {}
        for date_key, resolution in resolutions.items():
            sat_products, _ = _resolution_to_satellite_products(resolution)
            table_dir = _resolution_to_table_dir(resolution)
            config_paths[date_key] = _write_config(
                sat_products,
                table_dir,
                self._working_dir(date_key) / "config_file",
            )

        # --- Step 4: Build commands, skip cached results ------------------------
        pending: list[tuple[int, list[str], str]] = []
        reported: set[datetime.date] = set()
        for i, (rinex, site, d) in enumerate(jobs):
            kin_file, res_file = self._build_kin_res_paths(
                date=d,
                site=site,
                output_dir=self._output_dir,
            )
            if self._validate_kinfile(kin_file, override=override):
                logger.info("Valid output already exists for %s on %s, skipping", site, d)
                yield ProcessingResult(
                    rinex_path=rinex,
                    site=site,
                    date=d,
                    kin_path=kin_file,
                    res_path=res_file if res_file.exists() else None,
                    config_path=config_paths[d],
                    resolution=resolutions[d],
                )
                continue

            # Mirror ``process()``: report missing required products, once
            # per date, only for dates that actually need a pdp3 run.
            if d not in reported:
                reported.add(d)
                self._log_missing_required(resolutions[d])

            command = self._build_pdp_command(
                rinex=rinex,
                site=site,
                config_path=config_paths[d],
            )
            pending.append((i, command, site))

        # --- Step 5: Dispatch pdp3 subprocesses ---------------------------------
        if max_workers <= 1:
            for idx, command, site in pending:
                kin_path, res_path, rc, stderr = self._run_pdp3(
                    command=command,
                    site=site,
                    output_dir=self._output_dir,
                )
                rinex, site, d = jobs[idx]
                yield ProcessingResult(
                    rinex_path=rinex,
                    site=site,
                    date=d,
                    kin_path=kin_path,
                    res_path=res_path,
                    config_path=config_paths[d],
                    resolution=resolutions[d],
                    returncode=rc,
                    stderr=stderr,
                )
        else:
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                future_to_idx = {
                    pool.submit(
                        self._run_pdp3,
                        command=cmd,
                        site=site,
                        output_dir=self._output_dir,
                    ): idx
                    for idx, cmd, site in pending
                }
                for future in as_completed(future_to_idx):
                    idx = future_to_idx[future]
                    kin_path, res_path, rc, stderr = future.result()
                    rinex, site, d = jobs[idx]
                    yield ProcessingResult(
                        rinex_path=rinex,
                        site=site,
                        date=d,
                        kin_path=kin_path,
                        res_path=res_path,
                        config_path=config_paths[d],
                        resolution=resolutions[d],
                        returncode=rc,
                        stderr=stderr,
                    )