Source code for gnss_product_management.lockfile.operations

"""Read, write, build, and validate lockfile entries.

Provides pure-function helpers for the lockfile lifecycle:

* **Validate** — verify that a lock-product's sink file still exists
  and that its hash matches.
* **Build** — construct a :class:`LockProduct` from a local or cloud file,
  computing its SHA-256 hash and byte size.
* **Read / Write** — serialize and deserialize individual
  :class:`LockProduct` sidecar JSON files and date-scoped
  :class:`DependencyLockFile` manifests.

All path arguments accept local :class:`~pathlib.Path` objects and cloud
URIs / :class:`~cloudpathlib.CloudPath` objects interchangeably via
:func:`~gnss_product_management.utilities.paths.as_path`.
"""

import datetime
import enum
import logging
from importlib.metadata import version as _get_package_version

from gnss_product_management.lockfile.models import (
    DependencyLockFile,
    LockProduct,
    LockProductAlternative,
)
from gnss_product_management.utilities.helpers import hash_file as _hash_file
from gnss_product_management.utilities.paths import AnyPath, as_path

logger = logging.getLogger(__name__)


[docs] def get_package_version() -> str: """Return the installed gnss-product-management version.""" try: return _get_package_version("gnss-product-management") except Exception: return "0.0.0-dev"
[docs] class HashMismatchMode(enum.Enum): """How to handle hash mismatches during lockfile validation.""" WARN = "warn" STRICT = "strict"
[docs] def validate_lock_product( product: LockProduct, mode: HashMismatchMode = HashMismatchMode.WARN, ) -> bool: """Check that a lock-product's sink file exists and its hash matches. Also accepts the decompressed version of a ``.gz`` sink (e.g. ``foo.SP3`` when the lock records ``foo.SP3.gz``). Args: product: The lock-product entry to validate. mode: How to handle hash mismatches. ``WARN`` logs a warning but returns ``True``. ``STRICT`` returns ``False`` so the caller can re-download. Returns: ``True`` if the sink file (or its decompressed counterpart) exists and (when a hash is recorded) the file's current SHA-256 matches the stored hash (or mode is WARN). """ sink_path = as_path(product.sink) # Accept the decompressed version when the .gz is gone if not sink_path.exists() and sink_path.suffix == ".gz": decompressed = as_path(str(sink_path)[: -len(".gz")]) if decompressed.exists(): # Update the lock product to point to the decompressed file product.sink = str(decompressed) product.hash = _hash_file(decompressed) product.size = decompressed.stat().st_size return True if not sink_path.exists(): logger.warning(f"Lock product validation failed: sink file does not exist: {sink_path}") return False if product.hash: actual_hash = _hash_file(sink_path) if actual_hash != product.hash: if mode == HashMismatchMode.STRICT: logger.warning( f"Hash mismatch for {sink_path} (strict mode): " f"expected {product.hash}, got {actual_hash}" ) return False else: logger.warning( f"Hash mismatch for {sink_path} (warn mode): " f"expected {product.hash}, got {actual_hash}. " f"Continuing with existing file." ) return True
[docs] def build_lock_product( sink: AnyPath | str, url: str, name: str = "", description: str = "", alternative_urls: list[str] | None = None, ) -> LockProduct: """Build a :class:`LockProduct` from a local or cloud file. Computes the SHA-256 hash and byte size of *sink* and packages them into a new ``LockProduct`` model. Args: sink: Path or URI to the file (local or cloud). url: Primary remote URL the file was downloaded from. name: Human-readable product name (e.g. ``'ORBIT'``). description: Free-text description of the product. alternative_urls: Optional mirror / fallback URLs. Returns: A fully-populated :class:`LockProduct`. Raises: FileNotFoundError: If *sink* does not exist. """ sink_path = as_path(str(sink)) if not sink_path.exists(): raise FileNotFoundError(f"Sink path does not exist: {sink_path}") hash_value = _hash_file(sink_path) size = sink_path.stat().st_size return LockProduct( name=name, description=description, url=url, sink=str(sink_path), hash=hash_value, size=size, alternatives=[LockProductAlternative(url=alt_url) for alt_url in (alternative_urls or [])], )
[docs] def get_lock_product_path(sink: AnyPath | str) -> AnyPath: """Return the ``_lock.json`` sidecar path for a sink file. Works for both local paths and cloud URIs — the sidecar is placed alongside the data file in the same storage backend. Args: sink: Path or URI to the downloaded product file. Returns: A path (local or cloud) pointing to the companion lock JSON (``<sink>_lock.json``). Raises: FileNotFoundError: If *sink* does not exist. """ sink_path = as_path(str(sink)) if not sink_path.exists(): raise FileNotFoundError(f"Sink path does not exist: {sink_path}") return as_path(str(sink_path) + "_lock.json")
[docs] def get_lock_product(sink: AnyPath | str) -> LockProduct | None: """Read the :class:`LockProduct` sidecar JSON for *sink*. Args: sink: Path or URI to the downloaded product file. Returns: The deserialized :class:`LockProduct`, or ``None`` if the sidecar file does not exist. """ lock_product_path = get_lock_product_path(sink) if not lock_product_path.exists(): return None with lock_product_path.open("r") as f: lock_product_data = f.read() return LockProduct.model_validate_json(lock_product_data)
[docs] def write_lock_product(lock_product: LockProduct) -> AnyPath: """Write a :class:`LockProduct` to its sidecar ``_lock.json`` file. Args: lock_product: The lock entry to persist. Returns: Path (local or cloud) to the written sidecar file. """ lock_product_path = get_lock_product_path(lock_product.sink) lock_product_path.write_text(lock_product.model_dump_json(indent=2), encoding="utf-8") return lock_product_path
# --------------------------------------------------------------------------- # Dependency lockfile (collection of LockProducts for one processing day) # ---------------------------------------------------------------------------
[docs] def get_dependency_lockfile_name( package: str, task: str, date: str | datetime.datetime, version: str | None = None, ) -> str: """Derive the canonical filename for a dependency lockfile. The filename encodes package, task, year, day-of-year, and version so that each processing day gets its own file. Station is **not** part of the identity. Args: package: Processing package name (e.g. ``'PRIDE'``). task: Processing task name (e.g. ``'PPP'``). date: Processing date as a ``datetime`` or ``'YYYY-MM-DD'`` string. version: gnss-product-management package version. Defaults to the installed version. Returns: A filename string like ``PRIDE_PPP_2025_015_0.1.0_lock.json``. Raises: ValueError: If *date* cannot be parsed. """ if version is None: version = get_package_version() if isinstance(date, str): try: date = datetime.datetime.strptime(date, "%Y-%m-%d") except Exception: try: date = datetime.datetime.fromisoformat(date) except Exception as e: raise ValueError( f"Invalid date format: {date}. Expected YYYY-MM-DD or ISO format." ) from e try: YYYY, DOY = date.strftime("%Y"), date.timetuple().tm_yday except Exception as e: raise ValueError( f"Invalid date value: {date}. Expected datetime or string in YYYY-MM-DD format." ) from e return "_".join([package, task, str(YYYY), str(DOY).zfill(3), version]) + "_lock.json"
[docs] def get_dependency_lockfile( directory: AnyPath | str, package: str, task: str, date: datetime.datetime | str, version: str | None = None, ) -> tuple[DependencyLockFile | None, AnyPath | None]: """Read a :class:`DependencyLockFile` from *directory*. Args: directory: Folder (local or cloud) containing lockfile JSON files. package: Processing package name. task: Processing task name. date: Processing date used to derive the filename. version: gnss-product-management package version. Defaults to the installed version. Returns: A ``(lockfile, path)`` tuple. If the file does not exist, ``lockfile`` is ``None`` but ``path`` is still returned so the caller knows where to write a new one. """ dir_path = as_path(str(directory)) lockfile_name = get_dependency_lockfile_name( package=package, task=task, version=version, date=date ) lockfile_path = dir_path / lockfile_name if not lockfile_path.exists(): return None, lockfile_path dep_lockfile_data = lockfile_path.read_text(encoding="utf-8") return DependencyLockFile.model_validate_json(dep_lockfile_data), lockfile_path
[docs] def write_dependency_lockfile( lockfile: DependencyLockFile, directory: AnyPath | str, update: bool = False, ) -> AnyPath: """Write a :class:`DependencyLockFile` to *directory*. Args: lockfile: The lockfile model to persist. directory: Target directory (local path or cloud URI). update: If ``True``, overwrite an existing file. Otherwise raise :class:`FileExistsError`. Returns: Path (local or cloud) to the written lockfile. Raises: FileExistsError: If the lockfile already exists and *update* is ``False``. """ dir_path = as_path(str(directory)) lockfile_name = get_dependency_lockfile_name( package=lockfile.package, task=lockfile.task, date=lockfile.date, version=lockfile.version, ) lockfile_path = dir_path / lockfile_name if lockfile_path.exists() and not update: raise FileExistsError( f"Lockfile already exists: {lockfile_path}, consider incrementing the version." ) dir_path.mkdir(parents=True, exist_ok=True) lockfile_path.write_text(lockfile.model_dump_json(indent=2), encoding="utf-8") return lockfile_path