Source code for gnss_product_management.utilities.helpers

"""Shared helper functions and sentinel types.

Contains low-level utilities used across the package:

* :func:`hash_file` — SHA-256 file hashing.
* :func:`_ensure_datetime` — date/datetime normalisation to UTC.
* :class:`_PassthroughDict` — dict that preserves ``{key}`` for missing keys.
* :func:`_listify` — coerce scalars to single-element lists.
* :func:`expand_dict_combinations` — Cartesian product of dict values.
"""

import datetime
import gzip
import hashlib
import itertools
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


[docs] def hash_file(path) -> str: """Return the SHA-256 hex digest of a file. Accepts both local :class:`~pathlib.Path` and cloud :class:`~cloudpathlib.CloudPath` objects. Args: path: Filesystem or cloud path to the file to hash. Returns: A string in the form ``sha256:<hex_digest>``. """ h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1 << 16), b""): h.update(chunk) return f"sha256:{h.hexdigest()}"
[docs] def decompress_gzip(file_path: Path, dest_dir: Path | None = None) -> Path | None: """Decompress a gzip file and remove the original. Args: file_path: Path to the ``.gz`` file. dest_dir: Destination directory for the decompressed file. Defaults to the same directory as *file_path*. Returns: Path to the decompressed file, or ``None`` on failure. """ if not file_path.exists(): return None out_path = file_path.with_suffix("") if dest_dir is not None: dest_dir.mkdir(parents=True, exist_ok=True) out_path = dest_dir / out_path.name try: with gzip.open(file_path, "rb") as f_in, open(out_path, "wb") as f_out: f_out.write(f_in.read()) except (EOFError, OSError) as exc: logger.error("Failed to decompress %s: %s", file_path, exc) out_path.unlink(missing_ok=True) return None file_path.unlink(missing_ok=True) return out_path
def _ensure_datetime(date: datetime.date | datetime.datetime) -> datetime.datetime: """Coerce a date to a timezone-aware ``datetime`` (UTC). Args: date: A :class:`~datetime.date` or :class:`~datetime.datetime`. Naive datetimes are tagged as UTC. Returns: A timezone-aware :class:`~datetime.datetime` in UTC. """ if isinstance(date, datetime.date) and not isinstance(date, datetime.datetime): return datetime.datetime(date.year, date.month, date.day, tzinfo=datetime.timezone.utc) if date.tzinfo is None: return date.replace(tzinfo=datetime.timezone.utc) return date class _PassthroughDict(dict): """Dict subclass that returns ``'{key}'`` for missing keys. Used with :meth:`str.format_map` so that unresolved placeholders survive template expansion rather than raising :class:`KeyError`. """ def __missing__(self, key): return f"{{{key}}}" def _listify(v) -> list[str]: """Convert ``None`` or a single string to a list. Args: v: ``None``, a single string, or an existing list. Returns: A (possibly empty) ``list[str]``. """ if v is None: return [] return [v] if isinstance(v, str) else list(v)
[docs] def expand_dict_combinations(d: dict[str, list[str]]) -> list[dict[str, str]]: """Compute the Cartesian product of dict values. Args: d: Mapping from parameter names to lists of candidate values. Returns: A list of dicts, one per combination, with a single value per key. Example:: >>> expand_dict_combinations({"A": ["1","2"], "B": ["x","y"]}) [{"A":"1","B":"x"}, {"A":"1","B":"y"}, {"A":"2","B":"x"}, {"A":"2","B":"y"}] """ keys = list(d.keys()) vals = [d[k] for k in keys] return [dict(zip(keys, combo)) for combo in itertools.product(*vals)]