Source code for gnss_product_management.client.product_query

"""ProductQuery — fluent builder for GNSS product search and download."""

from __future__ import annotations

import copy
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import cast

from gnss_product_management.factories.models import FoundResource
from gnss_product_management.factories.ranking import (
    sort_by_preferences,
    sort_by_protocol,
)
from gnss_product_management.factories.remote_transport import WormHole
from gnss_product_management.factories.search_planner import SearchPlanner
from gnss_product_management.specifications.dependencies.dependencies import (
    SearchPreference,
)
from gnss_product_management.specifications.remote.resource import SearchTarget

logger = logging.getLogger(__name__)


[docs] class ProductQuery: """Fluent builder for constructing and executing a GNSS product search. Constructed via :meth:`GNSSClient.query` — do not instantiate directly. Chain method calls to build the query, then call :meth:`search` or :meth:`download` to execute:: results = ( client.query("ORBIT") .on(date) .where(TTT="FIN") .sources("COD", "ESA") .prefer(TTT=["FIN", "RAP", "ULT"]) .search() ) paths = ( client.query("CLOCK") .on(date) .where(TTT="FIN") .sources("local", "COD") .download(sink_id="local") ) Args: fetcher: :class:`WormHole` used for directory listing and file download. query_factory: :class:`SearchPlanner` used to build :class:`SearchTarget` objects from product specs. product: Product name (e.g. ``"ORBIT"``) or dict with ``name``, and optionally ``version`` / ``variant``. """ def __init__( self, wormhole: WormHole, search_planner: SearchPlanner, ) -> None: self._wormhole = wormhole self._search_planner = search_planner self._product: dict | None = None self._date: datetime.datetime | None = None self._date_range: tuple[datetime.datetime, datetime.datetime, datetime.timedelta] | None = ( None ) self._parameters: dict = {} self._source_ids: tuple | None = None # None = all sources self._preferences: list[SearchPreference] = []
[docs] def for_product(self, product: str | dict) -> ProductQuery: """Set the target product for the query. Args: product: Product name (e.g. ``"ORBIT"``) or dict with ``name``, and optionally ``version`` / ``variant``. Returns: ``self`` for chaining. """ clone = copy.copy(self) clone._product = {"name": product} if isinstance(product, str) else product return clone
[docs] def on(self, date: datetime.datetime) -> ProductQuery: """Set the target date for the query. Args: date: Timezone-aware datetime. Returns: ``self`` for chaining. """ clone = copy.copy(self) clone._date = date return clone
[docs] def on_range( self, start: datetime.datetime, end: datetime.datetime, *, step: datetime.timedelta = datetime.timedelta(days=1), ) -> ProductQuery: """Set a date range for the query. Searches are run for every date from *start* to *end* (inclusive) with the given *step* (default: 1 day). Results from all dates are merged into a single flat list from :meth:`search`. Args: start: First date to query (inclusive). end: Last date to query (inclusive). step: Interval between consecutive dates (default: 1 day). Returns: ``self`` for chaining. Raises: ValueError: If *start* is after *end*. """ if start > end: raise ValueError(f"start ({start.date()}) must not be after end ({end.date()})") clone = copy.copy(self) clone._date = None clone._date_range = (start, end, step) return clone
[docs] def where(self, **parameters) -> ProductQuery: """Constrain product parameters (hard filter). Keyword arguments use IGS long filename field codes as keys. Common constraints: - ``TTT="FIN"`` — final solutions only (≥13 days latency) - ``TTT="RAP"`` — rapid solutions only (≤17 hours latency) - ``TTT="ULT"`` — ultra-rapid solutions only (≤3 hours latency) - ``AAA="WUM"`` — Wuhan University products only - ``AAA=["WUM", "COD"]`` — WUM or COD only Use :meth:`prefer` instead of :meth:`where` when you want to rank results without excluding alternatives. Args: **parameters: IGS field name → required value or list of values. Returns: ``self`` for chaining. """ clone = copy.copy(self) clone._parameters.update(parameters) return clone
[docs] def sources(self, *ids: str) -> ProductQuery: """Restrict the search to specific local or remote sources. Pass any mix of registered local aliases and remote center IDs. Each ID is resolved at :meth:`search` time — local aliases are routed to local disk, everything else is treated as a remote center ID. Calling :meth:`sources` with no arguments is an error; omit the call entirely to search all available sources. Args: *ids: One or more source identifiers. Returns: ``self`` for chaining. Raises: ValueError: If no IDs are provided. """ if not ids: raise ValueError( "sources() requires at least one resource ID. " "Omit the call entirely to search all sources." ) clone = copy.copy(self) clone._source_ids = ids return clone
[docs] def prefer(self, **kwargs) -> ProductQuery: """Rank results by a preference cascade without hard-filtering. Unlike :meth:`where`, ``prefer`` keeps all matching products in the result list but sorts them so the most-preferred appear first. The standard IGS timeliness cascade is:: .prefer(TTT=["FIN", "RAP", "ULT"]) Center preference can be layered on top:: .prefer(TTT=["FIN", "RAP", "ULT"], AAA=["WUM", "COD", "GFZ"]) Multiple :meth:`prefer` calls accumulate in call order; later calls do not override earlier ones. Args: **kwargs: IGS field name → ordered list of preferred values, most preferred first. Returns: ``self`` for chaining. """ clone = copy.copy(self) for param, sorting in kwargs.items(): if isinstance(sorting, str): sorting = [sorting] clone._preferences.append(SearchPreference(parameter=param, sorting=list(sorting))) return clone
def _ranked_targets(self) -> list[SearchTarget]: """Return sorted :class:`SearchTarget` candidates before deduplication. Builds queries via :class:`SearchPlanner`, expands them through :class:`WormHole`, then applies preference and protocol sorting. The result is the pre-deduplication list used both by :meth:`search` and by :class:`ResolvePipeline`. Returns: Sorted list of :class:`SearchTarget` objects, local/file first, then ordered by *preferences* within each protocol tier. Raises: ValueError: If :meth:`for_product` or :meth:`on` have not been called. """ if self._product is None: raise ValueError("Call .for_product(product) before searching") if self._date is None: raise ValueError("Call .on(date) before searching") local_ids, remote_ids = self._resolve_sources() queries = self._search_planner.get( date=self._date, product=self._product, parameters=self._parameters or None, local_resources=local_ids, remote_resources=remote_ids, ) expanded = self._wormhole.search(queries) if not expanded: logger.debug( "No search targets found for product %s on date %s", self._product, self._date, ) logger.debug( "Expanded %d queries into %d targets for %s on %s", len(queries), len(expanded), self._product, self._date, ) if self._preferences: expanded = sort_by_preferences(expanded, self._preferences) return sort_by_protocol(expanded) def _search_range(self) -> list[FoundResource]: """Execute a search for every date in the configured range. Runs one :meth:`search` call per date in parallel using a :class:`~concurrent.futures.ThreadPoolExecutor` (max 8 workers) and merges all results into a single flat list. Returns: Combined list of :class:`FoundResource` objects from all dates. """ assert self._date_range is not None start, end, step = self._date_range dates: list[datetime.datetime] = [] current = start while current <= end: dates.append(current) current += step logger.debug( "on_range(): %d dates from %s to %s", len(dates), start.date(), end.date(), ) all_results: list[FoundResource] = [] with ThreadPoolExecutor(max_workers=min(len(dates), 8)) as executor: future_to_date = {executor.submit(self.on(date).search): date for date in dates} for future in as_completed(future_to_date): date = future_to_date[future] try: all_results.extend(future.result()) except Exception as exc: logger.warning("Search failed for date %s: %s", date.date(), exc) return all_results
[docs] def search(self) -> list[FoundResource]: """Execute the query and return ranked results. Returns: Ranked list of :class:`FoundResource` objects, best first. Local/file results precede remote ones; within each protocol tier results are ordered by *preferences*. When :meth:`on_range` was used, results from all dates are returned as a flat combined list. Raises: ValueError: If neither :meth:`on` nor :meth:`on_range` has been called. """ if self._product is None: raise ValueError("Call .for_product(product) before .search()") if self._date_range is not None: return self._search_range() if self._date is None: raise ValueError("Call .on(date) or .on_range(start, end) before .search()") logger.debug( "search() product=%s date=%s parameters=%s sources=%s", self._product, self._date, self._parameters, self._source_ids, ) ranked = self._ranked_targets() results: list[FoundResource] = [] seen: dict[tuple[str, str], bool] = {} for rq in ranked: hostname = rq.server.hostname filename: str = (rq.product.filename.value or "") if rq.product.filename else "" # type: ignore[union-attr] key: tuple[str, str] = (hostname, filename) if key in seen: continue seen[key] = True params = {p.name: p.value for p in rq.product.parameters if p.value is not None} protocol = (rq.server.protocol or "").upper() is_local = protocol in ("FILE", "LOCAL") if is_local: uri = str( Path(hostname) / (rq.directory.value or rq.directory.pattern) # type: ignore[union-attr] / filename ) else: proto = (rq.server.protocol or "ftp").lower() uri = ( f"{proto}://{hostname}/{rq.directory.value or rq.directory.pattern}/{filename}" # type: ignore[union-attr] ) r = FoundResource( product=rq.product.name, source="local" if is_local else "remote", uri=uri, parameters=params, date=self._date, ) r._query = rq results.append(r) return results
[docs] def download( self, sink_id: str, *, limit: int | None = None, ) -> list[Path]: """Search and download results in one call. Args: sink_id: Local resource alias to download into (e.g. ``"local"``). limit: Maximum number of files to download. ``None`` downloads all results. Returns: Paths to successfully downloaded files. Raises: ValueError: If :meth:`for_product` or :meth:`on` have not been called. """ # search() validates that _date is set via _ranked_targets() results = self.search() if limit is not None: results = results[:limit] assert self._date is not None # guaranteed by search() above paths: list[Path] = [] for r in results: if r._query is None: logger.warning("FoundResource has no internal query; skipping.") continue path = self._wormhole.download_one( query=cast(SearchTarget, r._query), local_resource_id=sink_id, local_factory=self._search_planner._workspace, date=self._date, ) if path is not None: r.local_path = path if isinstance(path, Path): paths.append(path) return paths
# -- Internal -------------------------------------------------- def _resolve_sources(self) -> tuple[list[str] | None, list[str] | None]: """Split source IDs into (local_ids, remote_ids) for SearchPlanner. Returns: A ``(local_ids, remote_ids)`` tuple, each ``None`` when empty (meaning "all of that type"). """ if self._source_ids is None: return None, None local_ids: list[str] = [] remote_ids: list[str] = [] for sid in self._source_ids: try: self._search_planner._workspace._get_registered_spec(sid) local_ids.append(sid) except KeyError: remote_ids.append(sid) return local_ids or None, remote_ids or None