Source code for gnss_product_management.factories.pipelines.resolve

"""ResolvePipeline — Find + Download + LockfileWriter in one call.

High-level composition that resolves a :class:`DependencySpec` for a
given date: finds resources, optionally downloads them, writes per-file
sidecar lockfiles, and persists an aggregate lockfile.

Fast path: if an aggregate lockfile already exists for
``(package, task, date, version)`` the pipeline returns immediately
without searching or downloading.
"""

from __future__ import annotations

import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from gnss_product_management.environments import ProductRegistry, WorkSpace
from gnss_product_management.factories.models import FoundResource
from gnss_product_management.factories.pipelines.download import DownloadPipeline
from gnss_product_management.factories.pipelines.lockfile_writer import LockfileWriter
from gnss_product_management.factories.remote_transport import WormHole
from gnss_product_management.factories.search_planner import SearchPlanner
from gnss_product_management.lockfile.manager import LockfileManager
from gnss_product_management.lockfile.operations import get_package_version
from gnss_product_management.specifications.dependencies.dependencies import (
    Dependency,
    DependencyResolution,
    DependencySpec,
    ResolvedDependency,
    SearchPreference,
)
from gnss_product_management.utilities.paths import AnyPath, as_path

logger = logging.getLogger(__name__)


class ResolvePipeline:
    """Find → Download → Lockfile for every dependency in a spec.

    Uses :class:`ProductQuery`, :class:`DownloadPipeline`, and
    :class:`LockfileWriter` internally.  All dependencies are resolved
    in parallel via a :class:`~concurrent.futures.ThreadPoolExecutor`.

    Fast path: if an aggregate lockfile already exists for the
    ``(package, task, date, version)`` identity, returns immediately
    without searching or downloading.

    Args:
        env: The product registry with built catalogs.
        workspace: Workspace with registered local resources.
        max_connections: Maximum concurrent connections per host.
        transport: Optional shared :class:`WormHole` instance.  If
            provided, the pipeline reuses it instead of creating a new
            one — useful when :class:`GNSSClient` already holds a pool.
    """

    # Upper bound on the worker threads used by :meth:`run` to resolve
    # dependencies concurrently.  Subclasses may override it.
    MAX_WORKERS: int = 15

    def __init__(
        self,
        env: ProductRegistry,
        workspace: WorkSpace,
        *,
        max_connections: int = 4,
        transport: WormHole | None = None,
    ) -> None:
        # Imported at call time rather than module import time
        # (presumably to avoid a circular import — TODO confirm).
        from gnss_product_management.client.product_query import ProductQuery

        self._env = env
        self._workspace = workspace

        # Compare against None explicitly: a caller-supplied transport must
        # be reused even if the object happens to evaluate as falsy.
        if transport is None:
            transport = WormHole(max_connections=max_connections, product_registry=env)

        planner = SearchPlanner(product_registry=env, workspace=workspace)
        self._query = ProductQuery(wormhole=transport, search_planner=planner)
        # The downloader shares the same transport as the query object.
        self._downloader = DownloadPipeline(
            env,
            workspace,
            transport=transport,
            max_connections=max_connections,
        )

    def run(
        self,
        spec: DependencySpec,
        date: datetime.datetime,
        *,
        sink_id: str = "local_config",
        centers: list[str] | None = None,
        download: bool = True,
    ) -> tuple[DependencyResolution, AnyPath | None]:
        """Resolve all dependencies in *spec* for *date*.

        Args:
            spec: The dependency specification.
            date: Target date (timezone-aware datetime).
            sink_id: Local resource alias for download destination and
                lockfile storage.
            centers: Restrict remote search to these center IDs.
            download: If ``True`` (default), download remote resources.

        Returns:
            A tuple of (:class:`DependencyResolution`, lockfile path or
            ``None`` if nothing was resolved).

        Note:
            Search errors are converted into ``"missing"`` entries, but an
            exception raised by the downloader is not caught here and
            propagates out of this method.
        """
        version = get_package_version()
        lockfile_dir = self._workspace.lockfile_dir(sink_id)
        manager = LockfileManager(lockfile_dir)

        # --- Fast path: return immediately if aggregate lockfile exists -----
        existing_path = manager.lockfile_path(
            package=spec.package,
            task=spec.task,
            date=date,
            version=version,
        )
        existing = manager.load(
            package=spec.package,
            task=spec.task,
            date=date,
            version=version,
        )
        if existing is not None:
            logger.info(
                "Lockfile already exists for %s on %s — skipping resolution: %s",
                spec.name,
                date.date(),
                existing_path,
            )
            # NOTE(review): the reconstructed resolution is returned even when
            # some entries are reported as missing; nothing in this method
            # removes or refreshes the stale lockfile — confirm that this is
            # handled elsewhere.
            resolution = self._resolution_from_lockfile(existing, spec)
            logger.info(resolution.summary())
            return resolution, existing_path

        # --- Full resolution -------------------------------------------------
        resolve_one = partial(
            self._resolve_one,
            date=date,
            sink_id=sink_id,
            preferences=spec.preferences,
            centers=centers,
            download=download,
        )
        # executor.map preserves input order, so ``resolved`` lines up with
        # ``spec.dependencies``.
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            resolved = list(executor.map(resolve_one, spec.dependencies))

        resolution = DependencyResolution(spec_name=spec.name, resolved=resolved)

        # Only a fulfilled resolution is persisted as an aggregate lockfile.
        lf_path: AnyPath | None = None
        if resolution.fulfilled:
            writer = LockfileWriter(lockfile_dir, package=spec.package)
            lf_path = writer.write(resolution, date)

        logger.info(resolution.summary())
        return resolution, lf_path

    # -- Internal ------------------------------------------------------------

    def _resolve_one(
        self,
        dep: Dependency,
        *,
        date: datetime.datetime,
        sink_id: str,
        preferences: list[SearchPreference],
        centers: list[str] | None,
        download: bool,
    ) -> ResolvedDependency:
        """Resolve a single dependency.

        Args:
            dep: The dependency to resolve.
            date: Target date.
            sink_id: Local resource alias.
            preferences: Spec-level preference cascade.
            centers: Remote center IDs to restrict to.
            download: Whether to download remote resources.

        Returns:
            A :class:`ResolvedDependency` with the resolution result.  Its
            status is ``"local"``, ``"remote"`` (found but not downloaded
            because *download* is false), ``"downloaded"`` or ``"missing"``.
        """
        logger.debug("Attempting to resolve dependency %s on %s", dep.spec, date.date())

        try:
            q = self._query.for_product(dep.spec).on(date)
            if dep.constraints:
                q = q.where(**dep.constraints)
            for pref in preferences or ():
                q = q.prefer(**{pref.parameter: pref.sorting})
            if centers:
                q = q.sources(*centers)
            candidates = q.search()
            # Only the first candidate returned by the search is used.
            found: FoundResource | None = candidates[0] if candidates else None
        except Exception as exc:
            # Best effort: a failing search marks the dependency as missing
            # instead of aborting the whole resolution.  Log at WARNING so the
            # failure is visible by default; keep the traceback at DEBUG.
            logger.warning("No candidates for %s: %s", dep.spec, exc)
            logger.debug("Search traceback for %s", dep.spec, exc_info=True)
            return ResolvedDependency(spec=dep.spec, required=dep.required, status="missing")

        if found is None:
            logger.warning("No search results for dependency %s", dep.spec)
            return ResolvedDependency(spec=dep.spec, required=dep.required, status="missing")

        if found.is_local:
            return ResolvedDependency(
                spec=dep.spec,
                required=dep.required,
                status="local",
                local_path=str(found.path),
                remote_url="",
            )

        if not download:
            return ResolvedDependency(
                spec=dep.spec,
                required=dep.required,
                status="remote",
                remote_url=found.uri,
            )

        path = self._downloader.run(found, date, sink_id=sink_id)
        if path is None:
            logger.warning("Download failed for dependency %s", dep.spec)
            return ResolvedDependency(spec=dep.spec, required=dep.required, status="missing")

        logger.info("Downloaded %s -> %s", dep.spec, path)
        return ResolvedDependency(
            spec=dep.spec,
            required=dep.required,
            status="downloaded",
            local_path=str(path),
            remote_url=found.uri,
        )

    def _resolution_from_lockfile(
        self,
        existing,
        spec: DependencySpec,
    ) -> DependencyResolution:
        """Reconstruct a :class:`DependencyResolution` from an existing lockfile.

        Iterates over every dependency in the spec (not just those in the
        lockfile), marking any absent or file-missing entries as
        ``'missing'``.

        Args:
            existing: The loaded :class:`DependencyLockFile`.
            spec: The dependency specification.

        Returns:
            A :class:`DependencyResolution` with one entry per dependency.
        """
        # Index the locked products by name for lookup by dependency spec.
        locked = {lp.name: lp for lp in existing.products}
        resolved: list[ResolvedDependency] = []

        for dep in spec.dependencies:
            lp = locked.get(dep.spec)
            if lp is None:
                resolved.append(
                    ResolvedDependency(spec=dep.spec, required=dep.required, status="missing")
                )
                continue

            # An entry without a sink, or whose sink no longer exists, is
            # treated the same as an entry that is absent from the lockfile.
            sink_path = as_path(lp.sink) if lp.sink else None
            if sink_path is None or not sink_path.exists():
                logger.warning(
                    "Lockfile entry for %s points to missing file %s — will re-resolve on next run",
                    dep.spec,
                    lp.sink,
                )
                resolved.append(
                    ResolvedDependency(spec=dep.spec, required=dep.required, status="missing")
                )
                continue

            resolved.append(
                ResolvedDependency(
                    spec=dep.spec,
                    required=dep.required,
                    status="local",
                    remote_url=lp.url,
                    local_path=lp.sink,
                )
            )

        return DependencyResolution(spec_name=spec.name, resolved=resolved)