Source code for gnss_product_management.environments.workspace

"""Workspace management for local GNSS product storage.

Maps :class:`LocalResourceSpec` definitions (loaded from YAML) to concrete
base directories on disk so that local resources can be queried through the
same :class:`ResourceQuery` interface used for remote servers.

Base directories may be local filesystem paths (``/data/gnss``) or cloud
URIs (``s3://bucket/prefix``).  Path operations are dispatched through
:func:`~gnss_product_management.utilities.paths.as_path` so that all
filesystem interactions work uniformly regardless of backend.
"""

from __future__ import annotations

import datetime
import logging
import re
from pathlib import Path
from typing import TYPE_CHECKING

from pydantic import BaseModel

from gnss_product_management.specifications.local.local import LocalResourceSpec
from gnss_product_management.specifications.parameters.parameter import ParameterCatalog
from gnss_product_management.specifications.products.catalog import ProductCatalog
from gnss_product_management.specifications.products.product import (
    PathTemplate,
    Product,
)
from gnss_product_management.specifications.remote.resource import SearchTarget, Server
from gnss_product_management.utilities.helpers import _ensure_datetime
from gnss_product_management.utilities.paths import AnyPath, as_path

if TYPE_CHECKING:
    from gnss_product_management.environments.environment import ProductRegistry

logger = logging.getLogger(__name__)


[docs] def paths_overlap(p1: AnyPath | str, p2: AnyPath | str) -> bool: """Check whether two paths share a common ancestor-descendant relationship. For local paths, checks the resolved filesystem hierarchy. For cloud URIs, falls back to string prefix comparison (cloud paths have no symlinks to resolve). Args: p1: First path or URI. p2: Second path or URI. Returns: ``True`` if either path is a parent of (or equal to) the other. """ s1 = str(p1).rstrip("/") s2 = str(p2).rstrip("/") # Cloud paths — use string prefix comparison if "://" in s1 or "://" in s2: return s1.startswith(s2) or s2.startswith(s1) # Local paths — resolve symlinks before comparing r1 = Path(s1).resolve() r2 = Path(s2).resolve() return r1.is_relative_to(r2) or r2.is_relative_to(r1)
[docs] class RegisteredLocalResource(BaseModel): """A local resource spec that has been bound to a base directory. ``base_dir`` is stored as a URI string so that it can represent both local paths (``/data/gnss``) and cloud locations (``s3://bucket/prefix``). Use the :attr:`base_path` property to obtain the appropriate :class:`~pathlib.Path` or :class:`~cloudpathlib.CloudPath` object for filesystem operations. Attributes: name: Human-readable identifier for this resource. base_dir: Base directory URI (local path or cloud URI). spec: The underlying local resource specification. item_to_dir: Mapping of item names to their subdirectory. server: A ``file``-protocol :class:`Server` wrapping *base_dir*. """ name: str base_dir: str spec: LocalResourceSpec item_to_dir: dict[str, str] server: Server @property def base_path(self) -> AnyPath: """The base directory as a :class:`~pathlib.Path` or cloud path.""" return as_path(self.base_dir)
[docs] class WorkSpace: """Registry of local storage directories and their layout specifications. Manages the mapping between ``LocalResourceSpec`` definitions (loaded from YAML) and concrete base directories on disk or in cloud storage. Each registered spec gets a ``Server(protocol='file')`` so that local resources can be queried with the same ``ResourceQuery`` interface used for remote servers. Also provides product-resolution methods (``source_product``, ``sink_product``, ``lockfile_dir``, ``find_local_files``) once bound to a :class:`ProductRegistry` via :meth:`bind`. Attributes: _registered_specs: Mapping of spec names to registered resources. _alias_map: Mapping of aliases to canonical spec names. _resource_specs: Loaded but not-yet-registered spec objects. _product_catalog: Product catalog (available after :meth:`bind`). _parameter_catalog: Parameter catalog (available after :meth:`bind`). Usage:: ws = WorkSpace() ws.add_resource_spec('local_config.yaml') ws.register_spec(base_dir='/data/gnss', spec_ids=['local_config'], alias='local') """
[docs] def __init__(self): """Initialise an empty workspace with no specs loaded.""" self._registered_specs: dict[str, RegisteredLocalResource] = {} self._alias_map: dict[str, str] = {} # alias → spec name self._resource_specs: dict[str, LocalResourceSpec] = {} self._product_catalog: ProductCatalog | None = None self._parameter_catalog: ParameterCatalog | None = None
[docs] def bind(self, product_registry: ProductRegistry) -> None: """Inject catalog references from a built :class:`ProductRegistry`. Must be called before using :meth:`source_product`, :meth:`sink_product`, or :meth:`find_local_files`. Args: product_registry: A fully built registry with catalogs. """ self._product_catalog = product_registry._product_catalog self._parameter_catalog = product_registry._parameter_catalog
[docs] def add_resource_spec(self, path: Path | str, id: str | None = None) -> None: """Load a :class:`LocalResourceSpec` from a YAML file. Args: path: Path to the YAML specification file. id: Optional override for the spec name. Defaults to the name declared inside the YAML file. Raises: AssertionError: If *path* does not exist or a spec with the same name is already registered. """ path = Path(path) assert path.exists(), f"Resource spec file not found: {path}" assert path.is_file(), f"Resource spec path must be a file: {path}" spec = LocalResourceSpec.from_yaml(path) spec = spec.model_copy(update={"source_file": path}) name = spec.name if id is not None: name = id assert name not in self._resource_specs, ( f"Resource spec with name '{name}' already exists. Please choose a unique name." ) self._resource_specs[name] = spec
[docs] def register_spec( self, base_dir: AnyPath | str, spec_ids: list[str], alias: str | None = None ) -> None: """Bind loaded spec(s) to a base directory and register the result. *base_dir* may be a local filesystem path or a cloud URI such as ``s3://bucket/prefix``. When multiple *spec_ids* are given they are merged into a single :class:`LocalResourceSpec`. Args: base_dir: Root directory for the resource (local path or cloud URI). spec_ids: One or more previously loaded spec identifiers. alias: Optional alias that also maps to this resource. Raises: AssertionError: If *base_dir* does not exist or any *spec_id* has not been loaded. ValueError: If *alias* is already in use or *base_dir* overlaps with an existing registration. """ base_path = as_path(str(base_dir)) assert base_path.exists(), f"Base directory not found: {base_dir}" assert base_path.is_dir(), f"Base directory must be a directory: {base_dir}" specs_to_register: list[LocalResourceSpec] = [] for spec_id in spec_ids: assert spec_id in self._resource_specs, ( f"Spec id '{spec_id}' not found. Available specs: {list(self._resource_specs.keys())}" ) built_spec = self._resource_specs[spec_id] specs_to_register.append(built_spec) spec_to_register = LocalResourceSpec.merge(specs_to_register) server = Server( id=spec_to_register.name, hostname=str(base_path), protocol="file", auth_required=False, description=specs_to_register[-1].description, ) if alias: if alias in self._alias_map: alias_mapped_spec = self._alias_map[alias] if alias_mapped_spec != spec_to_register.name: raise ValueError( f"Alias {alias!r} is already in use for spec {self._alias_map[alias]!r}." ) self._alias_map[alias] = spec_to_register.name item_to_dir: dict[str, str] = {} for coll_name, coll in spec_to_register.collections.items(): for item in coll.items: if item in item_to_dir: raise ValueError( f"Spec {item!r} is in multiple collections: " f"{item_to_dir[item]!r} and {coll_name!r}" ) item_to_dir[item] = coll.directory local_resource = RegisteredLocalResource( name=spec_to_register.name, base_dir=str(base_path), spec=spec_to_register, item_to_dir=item_to_dir, server=server, ) # Check for overlapping base directories with existing registered resources for registered_spec in self._registered_specs.values(): if paths_overlap(registered_spec.base_dir, str(base_path)): raise ValueError( f"Base directory {base_dir!r} overlaps with existing base directory " f"{registered_spec.base_dir!r} for spec {registered_spec.name!r}. " f"Please choose non-overlapping base directories for local resources." ) self._registered_specs[spec_to_register.name] = local_resource
[docs] def register( self, spec: LocalResourceSpec | Path | str, base_dir: Path | str, alias: str | None = None, ) -> None: """Register a local resource specification in one step. Convenience alternative to :meth:`add_resource_spec` + :meth:`register_spec`. Accepts a :class:`LocalResourceSpec` object or a path to a YAML file. Args: spec: A :class:`LocalResourceSpec`, or a path to a YAML file. base_dir: Root directory on disk for the resource. alias: Optional alias that also maps to this resource. Raises: ValueError: If *base_dir* overlaps with an existing registration or *alias* is already taken. """ if isinstance(base_dir, str): base_dir = Path(base_dir) if isinstance(spec, (Path, str)): spec = LocalResourceSpec.from_yaml(str(spec)) base_path = as_path(str(base_dir)) if base_dir else None server = Server( id=spec.name, hostname=str(base_path) if base_path else spec.name, protocol="file", auth_required=False, description=spec.description, ) name = spec.name if name in self._registered_specs: raise ValueError(f"Local resource {name!r} is already registered.") for registered_spec in self._registered_specs.values(): if paths_overlap(registered_spec.base_dir, base_dir): raise ValueError( f"Base directory {base_dir!r} for spec {name!r} overlaps with " f"existing base directory {registered_spec.base_dir!r} for spec " f"{registered_spec.name!r}. Please choose non-overlapping base " f"directories for local resources." ) if alias: if alias in self._alias_map: raise ValueError( f"Alias {alias!r} is already in use for spec {self._alias_map[alias]!r}." ) self._alias_map[alias] = name item_to_dir: dict[str, str] = {} for coll_name, coll in spec.collections.items(): for item in coll.items: if item in item_to_dir: raise ValueError( f"Spec {item!r} is in multiple collections: " f"{item_to_dir[item]!r} and {coll_name!r}" ) item_to_dir[item] = coll.directory self._registered_specs[name] = RegisteredLocalResource( name=name, base_dir=str(as_path(str(base_dir))), spec=spec, item_to_dir=item_to_dir, server=server, )
@property def resource_ids(self) -> list[str]: """Identifiers for all registered local resources.""" return list(self._registered_specs.keys()) def _get_registered_spec(self, name_or_alias: str) -> RegisteredLocalResource: """Look up a registered local resource by name or alias. Args: name_or_alias: Resource name or alias. Returns: The registered local resource. Raises: KeyError: If *name_or_alias* is not found. """ if name_or_alias in self._alias_map: name_or_alias = self._alias_map[name_or_alias] registered_spec = self._registered_specs.get(name_or_alias) if registered_spec is None: raise KeyError( f"Local resource {name_or_alias!r} not found. " f"Known resources: {list(self._registered_specs.keys())}" ) return registered_spec
[docs] def source_product(self, product: Product, resource_id: str) -> list[SearchTarget]: """Resolve a product into SearchTarget objects for a local resource. Args: product: Product to resolve. resource_id: Local resource identifier. Returns: A list of :class:`SearchTarget` objects. Raises: KeyError: If *resource_id* or *product.name* is not found. """ registered_spec = self._get_registered_spec(resource_id) directory_template = registered_spec.item_to_dir.get(product.name) if directory_template is None: raise KeyError( f"Product {product.name!r} not found in resource {resource_id!r}. " f"Known products: {sorted(registered_spec.item_to_dir.keys())}" ) directory = PathTemplate(pattern=directory_template) directory.derive(product.parameters) return [ SearchTarget( product=product, server=registered_spec.server, directory=directory, ) ]
[docs] def sink_product( self, product: Product, resource_id: str, date: datetime.date, ) -> SearchTarget: """Resolve the local directory for a product on a given date. Args: product: Product to locate. resource_id: Local resource identifier. date: Target date for computed template fields. Returns: A :class:`SearchTarget` with resolved directory path. Raises: KeyError: If *resource_id* or *product.name* is not found. """ assert self._parameter_catalog is not None, "Call bind() before sink_product()" dt = _ensure_datetime(date) registered_spec = self._get_registered_spec(resource_id) directory_template = registered_spec.item_to_dir.get(product.name) if directory_template is None: raise KeyError( f"Spec {product.name!r} not found in any local collection. " f"Known specs: {list(registered_spec.item_to_dir.keys())}" ) resolved = self._parameter_catalog.interpolate(directory_template, dt, computed_only=True) out_query = SearchTarget( product=product, server=registered_spec.server, directory=PathTemplate(pattern=directory_template, value=resolved), ) return out_query
[docs] def lockfile_dir( self, resource_id: str, ) -> AnyPath: """Return the dependency lockfile directory for a resource. For local resources this is a subdirectory of the base directory. For cloud resources (e.g. ``s3://bucket/prefix``) it is an equivalent cloud path, enabling distributed workers to share lockfile state via cloud storage. Args: resource_id: Local resource identifier. Returns: Path to the lockfile directory (created if needed). """ registered_spec = self._get_registered_spec(resource_id) dep_lockfile_dir = registered_spec.base_path / "dependency_lockfiles" dep_lockfile_dir.mkdir(parents=True, exist_ok=True) return dep_lockfile_dir
[docs] def find_local_files( self, query: SearchTarget, date: datetime.date | None = None, ) -> list[AnyPath]: """Search local or cloud storage for files matching a query. Works identically for local :class:`~pathlib.Path` and cloud :class:`~cloudpathlib.CloudPath` base directories. Args: query: SearchTarget with directory and filename patterns. date: Optional date for interpolating computed fields. Returns: Sorted list of matching paths (local or cloud). Raises: KeyError: If the query's server is not registered. """ assert self._parameter_catalog is not None, "Call bind() before find_local_files()" dir_pattern = query.directory.pattern if date: date = _ensure_datetime(date) dir_pattern = self._parameter_catalog.interpolate(dir_pattern, date, computed_only=True) # Find the registered spec that owns this query's server. registered_spec = None for candidate in self._registered_specs.values(): if candidate.server.id == query.server.id: registered_spec = candidate break if registered_spec is None: raise KeyError( f"Server {query.server.id!r} not found in any registered local resource. " f"Known servers: {[s.server.id for s in self._registered_specs.values()]}" ) file_pattern = query.product.filename.pattern if query.product.filename else None if date and file_pattern: date = _ensure_datetime(date) file_pattern = self._parameter_catalog.interpolate( file_pattern, date, computed_only=True ) search_dir = registered_spec.base_path / dir_pattern if file_pattern: return sorted( p for p in search_dir.iterdir() if p.is_file() and re.search(file_pattern, p.name, re.IGNORECASE) ) return sorted(p for p in search_dir.iterdir() if p.is_file())
# ---- Rich display -------------------------------------------------------
[docs] def display(self) -> None: """Print a rich summary of loaded specs and registered local resources. Prints two tables: - **Loaded Specs** — every spec loaded via :meth:`add_resource_spec`, with its name and product list. - **Registered Resources** — every resource bound to a base directory via :meth:`register_spec`, with its alias(es), base directory, and the products it covers. Requires the ``rich`` package (bundled as a project dependency). """ from rich import box from rich.console import Console from rich.table import Table console = Console() # Reverse the alias map: spec_name → [alias, ...] spec_aliases: dict[str, list[str]] = {} for alias, spec_name in self._alias_map.items(): spec_aliases.setdefault(spec_name, []).append(alias) if self._resource_specs: st = Table( title="[bold]Loaded Local Specs[/bold]", box=box.ROUNDED, show_lines=False, header_style="bold white", ) st.add_column("Spec Name", style="bold cyan", no_wrap=True) st.add_column("Collections", style="dim") for spec_name, spec in sorted(self._resource_specs.items()): collections = sorted(spec.collections.keys()) st.add_row(spec_name, ", ".join(collections)) console.print(st) if self._registered_specs: rt = Table( title="[bold]Registered Local Resources[/bold]", box=box.ROUNDED, show_lines=True, header_style="bold white", ) rt.add_column("Name", style="bold green", no_wrap=True) rt.add_column("Alias(es)", style="dim", no_wrap=True) rt.add_column("Spec", style="dim", no_wrap=True) rt.add_column("Spec File", style="dim") rt.add_column("Base Directory", style="dim") rt.add_column("Products", style="dim") for name, resource in sorted(self._registered_specs.items()): aliases = ", ".join(sorted(spec_aliases.get(name, []))) products = sorted(resource.item_to_dir.keys()) spec = self._resource_specs.get(name) or resource.spec spec_file = str(spec.source_file) if spec.source_file else "" rt.add_row( name, aliases, spec.name, spec_file, resource.base_dir, "\n".join(products), ) console.print(rt)