Source code for gnss_product_management.factories.models

"""Public return types and exceptions for the ProductRegistry API."""

from __future__ import annotations

import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from pydantic import BaseModel, ConfigDict, Field, PrivateAttr

from gnss_product_management.utilities.paths import AnyPath

if TYPE_CHECKING:
    from gnss_product_management.lockfile import DependencyLockFile


[docs] class FoundResource(BaseModel): """A discovered IGS product — either a local file or a remote URI. Returned by :meth:`GNSSClient.search` and :meth:`ProductQuery.search`. The most useful properties for geodetic workflows: - ``r.center`` — analysis center code (``AAA`` field, e.g. ``"WUM"``) - ``r.quality`` — timeliness code (``TTT`` field: ``"FIN"``, ``"RAP"``, ``"ULT"``) - ``r.filename`` — bare IGS long filename - ``r.uri`` — full remote URL (``ftp://...``) or local path - ``r.is_local`` — ``True`` if already on disk - ``r.downloaded`` — ``True`` after a successful download """ model_config = ConfigDict(arbitrary_types_allowed=True) product: str = Field(..., description="Product name (e.g. 'ORBIT', 'CLOCK').") source: str = Field(..., description="'local' or 'remote'.") uri: str = Field(..., description="Local file path or remote URL.") parameters: dict[str, str] = Field( default_factory=dict, description="All resolved parameter values." ) date: datetime.datetime | None = Field( default=None, description="Target date this resource was resolved for." ) local_path: AnyPath | None = Field( default=None, description="Local filesystem path after a successful download." ) # Internal: original SearchTarget, not serialized. Used by DownloadPipeline. _query: object | None = PrivateAttr(default=None) @property def center(self) -> str: """Analysis center identifier (e.g. ``'WUM'``), or ``''`` if not applicable.""" return self.parameters.get("AAA", "") @property def quality(self) -> str: """Solution quality/type (e.g. ``'FIN'``, ``'RAP'``), or ``''`` if not applicable.""" return self.parameters.get("TTT", "") @property def is_local(self) -> bool: """``True`` if this resource was found on the local filesystem.""" return self.source == "local" @property def path(self) -> Path | None: """Return the local :class:`Path` if this is a local resource, else ``None``.""" if self.is_local: return Path(self.uri) return None @property def hostname(self) -> str: """Server hostname, or ``''`` for local resources.""" return "" if self.is_local else (urlparse(self.uri).hostname or "") @property def protocol(self) -> str: """Transport protocol (e.g. ``'ftp'``, ``'https'``, ``'file'``).""" return "file" if self.is_local else (urlparse(self.uri).scheme or "") @property def directory(self) -> str: """Parent directory of the resource file.""" raw = self.uri if self.is_local else urlparse(self.uri).path return str(Path(raw).parent) @property def filename(self) -> str: """Filename (basename) of the resource.""" raw = self.uri if self.is_local else urlparse(self.uri).path return Path(raw).name @property def downloaded(self) -> bool: """``True`` if the file has been downloaded and exists on disk.""" return self.local_path is not None and Path(self.local_path).exists()
[docs] class Resolution(BaseModel): """Result of resolving all dependencies for a task.""" task: str = Field(..., description="Dependency spec name (e.g. 'pride-pppar').") paths: list[Path] = Field( default_factory=list, description="Local paths of all resolved products." ) lockfile: DependencyLockFile | None = None model_config = {"arbitrary_types_allowed": True}
[docs] class DiscoveryEntry(BaseModel): """A single entry in a discovery report.""" product: str center: str = "" quality: str = "" source: str = "" uri: str = ""
[docs] class DiscoveryReport(BaseModel): """Structured summary of available products for a date.""" entries: list[DiscoveryEntry] = Field(default_factory=list) @property def products(self) -> list[str]: """Sorted list of unique product names in this report.""" return sorted(set(e.product for e in self.entries)) @property def centers(self) -> list[str]: """Sorted list of unique center identifiers in this report.""" return sorted(set(e.center for e in self.entries if e.center))
[docs] def filter(self, product: str | None = None, center: str | None = None) -> list[DiscoveryEntry]: """Filter entries by product name and/or center. Args: product: Product name filter. center: Center identifier filter. Returns: Matching :class:`DiscoveryEntry` instances. """ out = self.entries if product: out = [e for e in out if e.product == product] if center: out = [e for e in out if e.center == center] return out
[docs] class MissingProductError(Exception): """Raised when a required product cannot be found during resolve()."""
[docs] def __init__(self, missing: list[str], task: str = ""): """Initialise with the list of missing product names. Args: missing: Product names that could not be found. task: Optional task identifier for the error message. """ self.missing = missing self.task = task products = ", ".join(missing) msg = ( f"Missing required products for task {task!r}: {products}" if task else f"Missing required products: {products}" ) super().__init__(msg)