"""GNSSClient — high-level entry point for searching and downloading GNSS products."""
from __future__ import annotations
import datetime
import logging
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING
from gpm_specs.configs import LOCAL_SPEC_DIR
from gnss_product_management.client.product_query import ProductQuery
from gnss_product_management.defaults import DefaultProductEnvironment
from gnss_product_management.environments import WorkSpace
from gnss_product_management.factories.models import FoundResource
from gnss_product_management.factories.pipelines.download import DownloadPipeline
from gnss_product_management.factories.pipelines.resolve import ResolvePipeline
from gnss_product_management.factories.remote_transport import WormHole
from gnss_product_management.factories.search_planner import SearchPlanner
from gnss_product_management.specifications.dependencies.dependencies import (
DependencyResolution,
DependencySpec,
SearchPreference,
)
from gnss_product_management.utilities.paths import AnyPath
if TYPE_CHECKING:
from gnss_product_management.environments import ProductRegistry
logger = logging.getLogger(__name__)
[docs]
class GNSSClient:
"""Single entry point for IGS product search, download, and dependency resolution.
Wraps :class:`ProductQuery`, :class:`DownloadPipeline`, and
:class:`ResolvePipeline`. All user-facing operations go through this class.
Use :meth:`from_defaults` to build from the bundled IGS center and product
specs. Pass a ``base_dir`` to enable downloads and local-first resolution::
# Search only (no local sink)
client = GNSSClient.from_defaults()
results = client.search(date, product="ORBIT", parameters={"TTT": "FIN"})
# Search and download to a local directory
client = GNSSClient.from_defaults(base_dir="/data/gnss")
paths = client.download(results[:1], sink_id="local")
# Search and download to S3
client = GNSSClient.from_defaults(base_dir="s3://my-bucket/gnss")
paths = client.download(results[:1], sink_id="local")
# Full dependency resolution (orbit + clock + bias + ERP + ...)
resolution, lockfile = client.resolve_dependencies("spec.yaml", date, sink_id="local")
``max_connections`` sets the per-host FTP/HTTPS connection pool size.
CDDIS (NASA FTPS) enforces strict anonymous connection limits; keep it at
2–4 for CDDIS-heavy queries. FTP centers (COD, WUM, GFZ) generally
tolerate 6–8.
Args:
product_registry: Built :class:`ProductRegistry` with loaded product
and center specs.
workspace: :class:`WorkSpace` with registered local directories.
max_connections: Maximum concurrent connections per host (default 4).
"""
def __init__(
self,
product_registry: ProductRegistry,
workspace: WorkSpace,
*,
max_connections: int = 4,
) -> None:
self._product_registry = product_registry
self._workspace = workspace
self._transport = WormHole(
max_connections=max_connections, product_registry=product_registry
)
self._planner = SearchPlanner(product_registry=product_registry, workspace=workspace)
self._query = ProductQuery(wormhole=self._transport, search_planner=self._planner)
self._downloader = DownloadPipeline(
product_registry,
workspace,
transport=self._transport,
)
[docs]
@classmethod
def from_defaults(
cls,
base_dir: Path | str | None = None,
*,
local_alias: str = "local",
max_connections: int = 4,
) -> GNSSClient:
"""Construct a client from the bundled default specs.
Loads the pre-built :data:`DefaultProductEnvironment` and creates a
fresh :class:`WorkSpace` from the bundled local layout specs. If
*base_dir* is provided, all local specs are registered against that
directory, enabling downloads and local-first resolution.
*base_dir* may be a local path **or** a cloud storage URI supported
by `cloudpathlib <https://cloudpathlib.drivendata.org/>`_::
# Local
client = GNSSClient.from_defaults(base_dir="/data/gnss")
# Amazon S3
client = GNSSClient.from_defaults(base_dir="s3://my-bucket/gnss")
# Google Cloud Storage
client = GNSSClient.from_defaults(base_dir="gs://my-bucket/gnss")
Args:
base_dir: Root directory for local product storage. If ``None``,
the client operates in search-only mode (no local sink).
local_alias: Alias for the registered local resource (default
``"local"``). Used as the ``sink_id`` in
:meth:`download` and :meth:`resolve_dependencies`.
max_connections: Maximum concurrent connections per host.
Keep at 2–4 for CDDIS; 6–8 for COD, WUM, GFZ.
Returns:
A configured :class:`GNSSClient` instance.
"""
workspace = WorkSpace()
for path in Path(LOCAL_SPEC_DIR).glob("*.yaml"):
workspace.add_resource_spec(path)
if base_dir is not None:
spec_ids = list(workspace._resource_specs.keys())
workspace.register_spec(
base_dir=Path(base_dir),
spec_ids=spec_ids,
alias=local_alias,
)
return cls(
product_registry=DefaultProductEnvironment,
workspace=workspace,
max_connections=max_connections,
)
[docs]
def display(self) -> None:
"""Display the client's loaded product registry and workspace specs."""
self._product_registry.display()
self._workspace.display()
[docs]
def query(self) -> ProductQuery:
"""Return a fluent :class:`ProductQuery` builder.
This is the preferred entry point for building searches. Chain
calls to narrow the query, then call :meth:`ProductQuery.search`
or :meth:`ProductQuery.download` to execute::
results = (
client.query()
.for_product("ORBIT")
.on(date)
.where(TTT="FIN")
.sources("COD", "ESA")
.search()
)
Returns:
A :class:`ProductQuery` bound to this client.
"""
return ProductQuery(
wormhole=self._transport,
search_planner=self._planner,
)
[docs]
def search(
self,
date: datetime.datetime,
product: str | dict,
*,
parameters: dict | None = None,
preferences: list[SearchPreference] | None = None,
local_resources: list[str] | None = None,
remote_resources: list[str] | None = None,
) -> list[FoundResource]:
"""Search for IGS products matching the given criteria.
Builds :class:`SearchTarget` objects from the product catalog, lists
remote directories via the connection pool, matches filenames by regex,
and ranks results by preference and protocol.
Key ``parameters`` keys follow the IGS long filename convention:
- ``TTT`` — solution timeliness: ``"FIN"`` (final, ≥13 d),
``"RAP"`` (rapid, ≤17 h), ``"ULT"`` (ultra-rapid, ≤3 h)
- ``AAA`` — analysis center: ``"COD"``, ``"ESA"``, ``"GFZ"``,
``"WUM"``, ``"IGS"``, …
For the full fluent interface (including date-range searches) use
:meth:`query` instead.
Args:
date: Target date (timezone-aware datetime).
product: Product name (e.g. ``"ORBIT"``, ``"CLOCK"``, ``"BIA"``)
or a dict with ``name``, and optionally ``version`` /
``variant``.
parameters: Parameter constraints, e.g.
``{"TTT": "FIN", "AAA": "WUM"}``.
preferences: Preference cascade for ranking. Each
:class:`SearchPreference` names a parameter and an ordered
list of preferred values, e.g.
``SearchPreference(parameter="TTT", sorting=["FIN","RAP"])``.
local_resources: Restrict to these local resource aliases.
remote_resources: Restrict to these remote center IDs.
Returns:
Ranked list of :class:`FoundResource` objects. Local (file://)
results precede remote ones; within each protocol tier results
are ordered by *preferences*.
"""
product_name: str = product.get("name", "") if isinstance(product, dict) else product
q = self._query.for_product(product_name).on(date)
if parameters:
q = q.where(**parameters)
if preferences:
for pref in preferences:
q = q.prefer(**{pref.parameter: pref.sorting})
if local_resources or remote_resources:
sources = list(local_resources or []) + list(remote_resources or [])
q = q.sources(*sources)
return q.search()
[docs]
def download(
self,
results: list[FoundResource],
*,
sink_id: str,
) -> list[Path]:
"""Download product candidates to the local sink.
Pass a pre-sliced list to limit how many files are fetched, e.g.
``client.download(results[:1], sink_id="local")``.
Args:
results: Ranked :class:`FoundResource` list from :meth:`search`
or :meth:`ProductQuery.search`. Each result must carry a
``date`` (set automatically by the query builders).
sink_id: Local resource identifier (alias registered in the
workspace, e.g. ``"local"``).
Returns:
Paths to successfully downloaded (and decompressed) files.
"""
# by_date: Dict[datetime.datetime, List[FoundResource]] = {}
# for r in results:
# if r.date is None:
# logger.warning("FoundResource has no date; skipping.")
# continue
# by_date.setdefault(r.date, []).append(r)
by_date = {
k: list(v)
for k, v in groupby(
results,
key=lambda r: r.date if r.date is not None else datetime.datetime.min,
)
}
paths: list[Path] = []
for date, date_results in by_date.items():
downloaded = self._downloader.run(date_results, date, sink_id=sink_id)
for r, path in zip(date_results, downloaded):
if path is not None:
r.local_path = path
if isinstance(path, Path):
paths.append(path)
return paths
[docs]
def resolve_dependencies(
self,
dep_spec: DependencySpec | Path | str,
date: datetime.datetime,
*,
sink_id: str,
) -> tuple[DependencyResolution, AnyPath | None]:
"""Resolve all dependencies in a spec for the given date.
Accepts a :class:`DependencySpec` object or a path to a YAML file.
Checks local disk first, then downloads any missing files. If a
``DependencyLockFile`` already exists for ``(package, task, date,
version)``, resolution returns immediately without any network calls.
Args:
dep_spec: Dependency specification — a :class:`DependencySpec`
instance, or a path (``str`` or :class:`Path`) to a YAML file.
The spec encodes which products are required, the center/
timeliness preference cascade, and per-product constraints.
date: Target date (timezone-aware datetime, midnight UTC).
sink_id: Local resource alias for storing resolved files
(e.g. ``"local"``).
Returns:
A ``(DependencyResolution, lockfile_path)`` tuple.
``DependencyResolution`` exposes ``.summary()``, ``.table()``,
``.product_paths()``, ``.missing``, and
``.all_required_fulfilled``.
"""
if isinstance(dep_spec, (str, Path)):
dep_spec = DependencySpec.from_yaml(dep_spec)
pipeline = ResolvePipeline(
env=self._product_registry,
workspace=self._workspace,
transport=self._transport,
)
return pipeline.run(dep_spec, date, sink_id=sink_id)