gnss-product-management API
YAML-driven GNSS product discovery, query expansion, and dependency resolution.
Client
GNSSClient — high-level entry point for searching and downloading GNSS products.
- class gnss_product_management.client.gnss_client.GNSSClient(product_registry, workspace, *, max_connections=4)[source]
Bases: object
Single entry point for IGS product search, download, and dependency resolution.
Wraps ProductQuery, DownloadPipeline, and ResolvePipeline. All user-facing operations go through this class.
Use from_defaults() to build from the bundled IGS center and product specs. Pass a base_dir to enable downloads and local-first resolution:
    # Search only (no local sink)
    client = GNSSClient.from_defaults()
    results = client.search(date, product="ORBIT", parameters={"TTT": "FIN"})

    # Search and download to a local directory
    client = GNSSClient.from_defaults(base_dir="/data/gnss")
    paths = client.download(results[:1], sink_id="local")

    # Search and download to S3
    client = GNSSClient.from_defaults(base_dir="s3://my-bucket/gnss")
    paths = client.download(results[:1], sink_id="local")

    # Full dependency resolution (orbit + clock + bias + ERP + ...)
    resolution, lockfile = client.resolve_dependencies("spec.yaml", date, sink_id="local")
max_connections sets the per-host FTP/HTTPS connection pool size. CDDIS (NASA FTPS) enforces strict anonymous connection limits; keep it at 2–4 for CDDIS-heavy queries. FTP centers (COD, WUM, GFZ) generally tolerate 6–8.
- Parameters:
product_registry (ProductRegistry) – Built ProductRegistry with loaded product and center specs.
workspace (WorkSpace) – WorkSpace with registered local directories.
max_connections (int) – Maximum concurrent connections per host (default 4).
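The per-host limit described above can be pictured as one bounded semaphore per hostname. The sketch below is only an illustration of that idea using the standard library; `HostLimiter` and `slot` are hypothetical names, and nothing here is taken from the package's actual connection pool.

```python
import threading


class HostLimiter:
    """Hypothetical helper: caps concurrent connections per host."""

    def __init__(self, max_connections=4):
        self._max = max_connections
        self._guard = threading.Lock()
        self._slots = {}  # host -> BoundedSemaphore

    def slot(self, host):
        """Return the semaphore guarding `host`, creating it on first use."""
        with self._guard:
            if host not in self._slots:
                self._slots[host] = threading.BoundedSemaphore(self._max)
            return self._slots[host]


limiter = HostLimiter(max_connections=2)
with limiter.slot("example-host"):
    pass  # a transfer would run here while holding one of the two slots
```

Because each host gets its own semaphore, a strict limit on one server does not throttle transfers from the others.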
- display()[source]
Display the client’s loaded product registry and workspace specs.
- Return type:
None
- download(results, *, sink_id)[source]
Download product candidates to the local sink.
Pass a pre-sliced list to limit how many files are fetched, e.g. client.download(results[:1], sink_id="local").
- Parameters:
results (list[FoundResource]) – Ranked FoundResource list from search() or ProductQuery.search(). Each result must carry a date (set automatically by the query builders).
sink_id (str) – Local resource identifier (alias registered in the workspace, e.g. "local").
- Returns:
Paths to successfully downloaded (and decompressed) files.
- Return type:
- classmethod from_defaults(base_dir=None, *, local_alias='local', max_connections=4)[source]
Construct a client from the bundled default specs.
Loads the pre-built DefaultProductEnvironment and creates a fresh WorkSpace from the bundled local layout specs. If base_dir is provided, all local specs are registered against that directory, enabling downloads and local-first resolution.
base_dir may be a local path or a cloud storage URI supported by cloudpathlib:
    # Local
    client = GNSSClient.from_defaults(base_dir="/data/gnss")

    # Amazon S3
    client = GNSSClient.from_defaults(base_dir="s3://my-bucket/gnss")

    # Google Cloud Storage
    client = GNSSClient.from_defaults(base_dir="gs://my-bucket/gnss")
- Parameters:
base_dir (Path | str | None) – Root directory for local product storage. If None, the client operates in search-only mode (no local sink).
local_alias (str) – Alias for the registered local resource (default "local"). Used as the sink_id in download() and resolve_dependencies().
max_connections (int) – Maximum concurrent connections per host. Keep at 2–4 for CDDIS; 6–8 for COD, WUM, GFZ.
- Returns:
A configured GNSSClient instance.
- Return type:
- query()[source]
Return a fluent ProductQuery builder.
This is the preferred entry point for building searches. Chain calls to narrow the query, then call ProductQuery.search() or ProductQuery.download() to execute:
    results = (
        client.query()
        .for_product("ORBIT")
        .on(date)
        .where(TTT="FIN")
        .sources("COD", "ESA")
        .search()
    )
- Returns:
A ProductQuery bound to this client.
- Return type:
- resolve_dependencies(dep_spec, date, *, sink_id)[source]
Resolve all dependencies in a spec for the given date.
Accepts a DependencySpec object or a path to a YAML file. Checks local disk first, then downloads any missing files. If a DependencyLockFile already exists for (package, task, date, version), resolution returns immediately without any network calls.
- Parameters:
dep_spec (DependencySpec | Path | str) – Dependency specification: a DependencySpec instance, or a path (str or Path) to a YAML file. The spec encodes which products are required, the center/timeliness preference cascade, and per-product constraints.
date (datetime) – Target date (timezone-aware datetime, midnight UTC).
sink_id (str) – Local resource alias for storing resolved files (e.g. "local").
- Returns:
A (DependencyResolution, lockfile_path) tuple. DependencyResolution exposes .summary(), .table(), .product_paths(), .missing, and .all_required_fulfilled.
- Return type:
tuple[DependencyResolution, Path | CloudPath | None]
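The resolution order described for resolve_dependencies() (lockfile short-circuit, then local disk, then download) can be sketched with plain dictionaries. This is a simplified model, not the package's ResolvePipeline; `resolve_sketch`, its arguments, and the rule that a lockfile is only written when nothing is missing are all assumptions made for illustration.

```python
def resolve_sketch(required, local_files, lockfiles, key, fetch):
    """Resolve product names to paths, preferring lockfile, then local, then fetch.

    required    -- list of product names
    local_files -- dict of product name -> local path
    lockfiles   -- dict of (package, task, date, version) -> resolved dict
    key         -- the lockfile key for this resolution
    fetch       -- callable(name) -> path or None (stands in for a download)
    """
    if key in lockfiles:
        # An existing lockfile answers the request without any fetch calls.
        return lockfiles[key], []

    resolved, missing = {}, []
    for name in required:
        path = local_files.get(name) or fetch(name)  # local first
        if path is None:
            missing.append(name)
        else:
            resolved[name] = path

    if not missing:  # assumption: only complete resolutions are locked
        lockfiles[key] = dict(resolved)
    return resolved, missing
```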
- search(date, product, *, parameters=None, preferences=None, local_resources=None, remote_resources=None)[source]
Search for IGS products matching the given criteria.
Builds SearchTarget objects from the product catalog, lists remote directories via the connection pool, matches filenames by regex, and ranks results by preference and protocol.
Keys in parameters follow the IGS long filename convention:
TTT: solution timeliness, one of "FIN" (final, ≥13 d), "RAP" (rapid, ≤17 h), "ULT" (ultra-rapid, ≤3 h)
AAA: analysis center, e.g. "COD", "ESA", "GFZ", "WUM", "IGS", …
For the full fluent interface (including date-range searches) use query() instead.
- Parameters:
date (datetime) – Target date (timezone-aware datetime).
product (str | dict) – Product name (e.g. "ORBIT", "CLOCK", "BIA") or a dict with name, and optionally version/variant.
parameters (dict | None) – Parameter constraints, e.g. {"TTT": "FIN", "AAA": "WUM"}.
preferences (list[SearchPreference] | None) – Preference cascade for ranking. Each SearchPreference names a parameter and an ordered list of preferred values, e.g. SearchPreference(parameter="TTT", sorting=["FIN", "RAP"]).
local_resources (list[str] | None) – Restrict to these local resource aliases.
remote_resources (list[str] | None) – Restrict to these remote center IDs.
- Returns:
Ranked list of FoundResource objects. Local (file://) results precede remote ones; within each protocol tier results are ordered by preferences.
- Return type:
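One way to picture the ranking rule above (local results first, then the preference cascade within each tier) is a composite sort key. The sketch below uses a hypothetical `Hit` dataclass in place of FoundResource and plain tuples in place of SearchPreference; placing values outside the preference list last is an assumption, not documented behavior.

```python
from dataclasses import dataclass, field


@dataclass
class Hit:
    """Hypothetical stand-in for FoundResource."""
    uri: str
    params: dict = field(default_factory=dict)


def rank(hits, preferences):
    """Sort hits: file:// tier first, then by each (parameter, ordered values)."""
    def sort_key(hit):
        tier = 0 if hit.uri.startswith("file://") else 1
        ranks = []
        for name, order in preferences:
            value = hit.params.get(name)
            # Values outside the preference list sort after all listed ones.
            ranks.append(order.index(value) if value in order else len(order))
        return (tier, *ranks)

    return sorted(hits, key=sort_key)
```

Since `sorted` is stable, hits that tie on every key keep their original relative order.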
ProductQuery — fluent builder for GNSS product search and download.
- class gnss_product_management.client.product_query.ProductQuery(wormhole, search_planner)[source]
Bases: object
Fluent builder for constructing and executing a GNSS product search.
Constructed via GNSSClient.query(); do not instantiate directly.
Chain method calls to build the query, then call search() or download() to execute:
    results = (
        client.query()
        .for_product("ORBIT")
        .on(date)
        .where(TTT="FIN")
        .sources("COD", "ESA")
        .prefer(TTT=["FIN", "RAP", "ULT"])
        .search()
    )

    paths = (
        client.query()
        .for_product("CLOCK")
        .on(date)
        .where(TTT="FIN")
        .sources("local", "COD")
        .download(sink_id="local")
    )
- Parameters:
wormhole (WormHole) – WormHole used for directory listing and file download.
search_planner (SearchPlanner) – SearchPlanner used to build SearchTarget objects from product specs.
The target product (a name such as "ORBIT", or a dict with name and optionally version/variant) is not a constructor argument; it is set with for_product().
- download(sink_id, *, limit=None)[source]
Search and download results in one call.
- Parameters:
- Returns:
Paths to successfully downloaded files.
- Raises:
ValueError – If for_product() or on() have not been called.
- Return type:
- for_product(product)[source]
Set the target product for the query.
- on(date)[source]
Set the target date for the query.
- Parameters:
date (datetime) – Timezone-aware datetime.
- Returns:
self for chaining.
- Return type:
- on_range(start, end, *, step=datetime.timedelta(days=1))[source]
Set a date range for the query.
Searches are run for every date from start to end (inclusive) with the given step (default: 1 day). Results from all dates are merged into a single flat list from search().
- Parameters:
- Returns:
self for chaining.
- Raises:
ValueError – If start is after end.
- Return type:
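The inclusive date expansion described for on_range() amounts to the loop below. It is a standalone sketch with a hypothetical helper name, `expand_range`; the guard against a non-positive step is an addition of this sketch, not something the documentation states.

```python
from datetime import datetime, timedelta, timezone


def expand_range(start, end, step=timedelta(days=1)):
    """Return every datetime from start to end (inclusive) spaced by step."""
    if start > end:
        raise ValueError("start must not be after end")
    if step <= timedelta(0):
        raise ValueError("step must be positive")  # sketch-only guard
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += step
    return dates


days = expand_range(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 3, tzinfo=timezone.utc),
)
print(len(days))  # three dates: Jan 1, Jan 2, Jan 3
```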
- prefer(**kwargs)[source]
Rank results by a preference cascade without hard-filtering.
Unlike where(), prefer keeps all matching products in the result list but sorts them so the most-preferred appear first. The standard IGS timeliness cascade is:
    .prefer(TTT=["FIN", "RAP", "ULT"])
Center preference can be layered on top:
    .prefer(TTT=["FIN", "RAP", "ULT"], AAA=["WUM", "COD", "GFZ"])
Multiple prefer() calls accumulate in call order; later calls do not override earlier ones.
- Parameters:
**kwargs – IGS field name → ordered list of preferred values, most preferred first.
- Returns:
self for chaining.
- Return type:
- search()[source]
Execute the query and return ranked results.
- Returns:
Ranked list of FoundResource objects, best first. Local/file results precede remote ones; within each protocol tier results are ordered by preferences. When on_range() was used, results from all dates are returned as a flat combined list.
- Raises:
ValueError – If neither on() nor on_range() has been called.
- Return type:
- sources(*ids)[source]
Restrict the search to specific local or remote sources.
Pass any mix of registered local aliases and remote center IDs. Each ID is resolved at search() time: local aliases are routed to local disk, everything else is treated as a remote center ID.
Calling sources() with no arguments is an error; omit the call entirely to search all available sources.
- Parameters:
*ids (str) – One or more source identifiers.
- Returns:
self for chaining.
- Raises:
ValueError – If no IDs are provided.
- Return type:
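The routing rule for sources() (registered aliases go to local disk, everything else is treated as a remote center ID) can be sketched as a simple partition. `split_sources` and its `local_aliases` argument are hypothetical names for this illustration only.

```python
def split_sources(ids, local_aliases):
    """Partition source IDs into (local, remote), preserving call order."""
    if not ids:
        raise ValueError("at least one source ID is required")
    local = [i for i in ids if i in local_aliases]
    remote = [i for i in ids if i not in local_aliases]
    return local, remote


local, remote = split_sources(("local", "COD", "ESA"), local_aliases={"local"})
print(local, remote)
```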
- where(**parameters)[source]
Constrain product parameters (hard filter).
Keyword arguments use IGS long filename field codes as keys. Common constraints:
TTT="FIN": final solutions only (≥13 days latency)
TTT="RAP": rapid solutions only (≤17 hours latency)
TTT="ULT": ultra-rapid solutions only (≤3 hours latency)
AAA="WUM": Wuhan University products only
AAA=["WUM", "COD"]: WUM or COD only
Use prefer() instead of where() when you want to rank results without excluding alternatives.
- Parameters:
**parameters – IGS field name → required value or list of values.
- Returns:
self for chaining.
- Return type:
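As a rough model of the hard filter, each constraint accepts either a single value or a list of allowed values, and a candidate must satisfy all of them. The helper below is a hypothetical sketch operating on plain dicts, not the library's matching code.

```python
def matches(params, constraints):
    """Return True if params satisfies every constraint (value or list of values)."""
    for name, required in constraints.items():
        if isinstance(required, (list, tuple, set)):
            allowed = required
        else:
            allowed = [required]
        if params.get(name) not in allowed:
            return False
    return True


candidates = [
    {"AAA": "WUM", "TTT": "FIN"},
    {"AAA": "GFZ", "TTT": "FIN"},
    {"AAA": "COD", "TTT": "RAP"},
]
kept = [c for c in candidates if matches(c, {"TTT": "FIN", "AAA": ["WUM", "COD"]})]
print(kept)
```

Unlike a preference, a candidate that fails any constraint is dropped from the result entirely.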
Environments
ProductRegistry — builds the full catalog chain and manages remote resources.
Loads parameter, format, product, and resource specification YAMLs, then
builds derived catalogs (ParameterCatalog → FormatCatalog →
ProductCatalog). Also registers remote ResourceCatalog objects
and provides classify() for parsing product filenames
back into structured metadata.
- class gnss_product_management.environments.environment.LoadedSpecs(*, filename, built)[source]
Bases: BaseModel
Record of a loaded specification file and its parsed result.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.environments.environment.ProductRegistry[source]
Bases: object
Unified container for the specification / factory layer.
Incrementally loads YAML specs via add_*() methods, then calls build() to derive the full catalog chain:
ParameterCatalog → FormatCatalog → ProductCatalog
Remote resource specs loaded via add_resource_spec() are built into ResourceCatalog objects that support source_product() and sink_product() for remote query resolution.
After building, classify() parses a product filename into structured metadata (product name, format, version, variant, parameters).
- add_parameter_spec(path, id='default')[source]
Load and register a parameter specification YAML file.
- property all_queries: list[SearchTarget]
Flattened list of every search target across all remote centers.
- build()[source]
Build the full catalog chain from loaded specs.
Must be called after all add_*() methods. Builds: ParameterCatalog → FormatCatalog → ProductCatalog → remote ResourceCatalog objects.
- Return type:
None
- property catalogs: list[ResourceCatalog]
All registered remote resource catalogs.
- property centers: list[str]
Alias for resource_ids.
- classify(filename, parameters=None)[source]
Parse a product filename and return its metadata.
- Parameters:
- Returns:
A dict with keys product, format, version, variant, and parameters on match, or None if no product template matches.
- Return type:
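To make the idea of classifying a filename concrete, the sketch below parses an IGS long product filename (AAAVPPPTTT_YYYYDDDHHMM_LEN_SMP_CNT.FMT) with a single hand-written regex. The real classify() works from the loaded product templates; the `LONG_NAME` pattern, the `CONTENT_TO_PRODUCT` table, and the shape of the returned dict here are illustrative assumptions.

```python
import re

# Hand-written pattern for the IGS long product filename layout (sketch only).
LONG_NAME = re.compile(
    r"(?P<AAA>[A-Z0-9]{3})(?P<V>[0-9])(?P<PPP>[A-Z0-9]{3})(?P<TTT>[A-Z]{3})"
    r"_(?P<YYYY>\d{4})(?P<DDD>\d{3})(?P<HH>\d{2})(?P<MM>\d{2})"
    r"_(?P<LEN>\d{2}[A-Z])_(?P<SMP>\d{2}[A-Z])_(?P<CNT>[A-Z]{3})\.(?P<FMT>[A-Z0-9]{3})"
)

# Hypothetical mapping from content code to product name.
CONTENT_TO_PRODUCT = {"ORB": "ORBIT", "CLK": "CLOCK"}


def classify_sketch(filename):
    """Return a metadata dict for a recognized filename, or None."""
    match = LONG_NAME.fullmatch(filename)
    if match is None:
        return None
    fields = match.groupdict()
    product = CONTENT_TO_PRODUCT.get(fields["CNT"])
    if product is None:
        return None
    return {"product": product, "format": fields["FMT"], "parameters": fields}


info = classify_sketch("COD0OPSFIN_20240010000_01D_05M_ORB.SP3")
print(info["product"], info["parameters"]["TTT"])
```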
- display()[source]
Print a rich summary of loaded products and registered remote centers.
Prints two tables to the terminal:
Products — every product name with its versions and variants.
Remote Centers — every registered data center with its available products, protocols, and hostnames.
Requires the rich package (bundled as a project dependency).
- Return type:
None
- get(center_id)[source]
Retrieve a remote resource catalog by center identifier.
- Parameters:
center_id (str) – Data center identifier.
- Returns:
The matching ResourceCatalog.
- Return type:
- static match_pinned_query(found, incoming)[source]
Check if a found query matches an incoming product based on pinned parameters.
- sink_product(product, resource_id, date)[source]
Resolve the remote directory/filename for uploading product.
- Parameters:
- Returns:
A SearchTarget with resolved paths.
- Raises:
KeyError – If no matching entry exists.
- Return type:
Workspace management for local GNSS product storage.
Maps LocalResourceSpec definitions (loaded from YAML) to concrete
base directories on disk so that local resources can be queried through the
same ResourceQuery interface used for remote servers.
Base directories may be local filesystem paths (/data/gnss) or cloud
URIs (s3://bucket/prefix). Path operations are dispatched through
as_path() so that all
filesystem interactions work uniformly regardless of backend.
- class gnss_product_management.environments.workspace.RegisteredLocalResource(*, name, base_dir, spec, item_to_dir, server)[source]
Bases: BaseModel
A local resource spec that has been bound to a base directory.
base_dir is stored as a URI string so that it can represent both local paths (/data/gnss) and cloud locations (s3://bucket/prefix). Use the base_path property to obtain the appropriate Path or CloudPath object for filesystem operations.
- Parameters:
- spec
The underlying local resource specification.
- Type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- spec: LocalResourceSpec
- class gnss_product_management.environments.workspace.WorkSpace[source]
Bases: object
Registry of local storage directories and their layout specifications.
Manages the mapping between LocalResourceSpec definitions (loaded from YAML) and concrete base directories on disk or in cloud storage. Each registered spec gets a Server(protocol='file') so that local resources can be queried with the same ResourceQuery interface used for remote servers.
Also provides product-resolution methods (source_product, sink_product, lockfile_dir, find_local_files) once bound to a ProductRegistry via bind().
- _registered_specs
Mapping of spec names to registered resources.
- _alias_map
Mapping of aliases to canonical spec names.
- _resource_specs
Loaded but not-yet-registered spec objects.
Usage:
    ws = WorkSpace()
    ws.add_resource_spec('local_config.yaml')
    ws.register_spec(base_dir='/data/gnss', spec_ids=['local_config'], alias='local')
- add_resource_spec(path, id=None)[source]
Load a LocalResourceSpec from a YAML file.
- Parameters:
- Raises:
AssertionError – If path does not exist or a spec with the same name is already registered.
- Return type:
None
- bind(product_registry)[source]
Inject catalog references from a built ProductRegistry.
Must be called before using source_product(), sink_product(), or find_local_files().
- Parameters:
product_registry (ProductRegistry) – A fully built registry with catalogs.
- Return type:
None
- display()[source]
Print a rich summary of loaded specs and registered local resources.
Prints two tables:
Loaded Specs: every spec loaded via add_resource_spec(), with its name and product list.
Registered Resources: every resource bound to a base directory via register_spec(), with its alias(es), base directory, and the products it covers.
Requires the rich package (bundled as a project dependency).
- Return type:
None
- find_local_files(query, date=None)[source]
Search local or cloud storage for files matching a query.
Works identically for local Path and cloud CloudPath base directories.
- Parameters:
query (SearchTarget) – SearchTarget with directory and filename patterns.
date (date | None) – Optional date for interpolating computed fields.
- Returns:
Sorted list of matching paths (local or cloud).
- Raises:
KeyError – If the query’s server is not registered.
- Return type:
- lockfile_dir(resource_id)[source]
Return the dependency lockfile directory for a resource.
For local resources this is a subdirectory of the base directory. For cloud resources (e.g. s3://bucket/prefix) it is an equivalent cloud path, enabling distributed workers to share lockfile state via cloud storage.
- register(spec, base_dir, alias=None)[source]
Register a local resource specification in one step.
Convenience alternative to add_resource_spec() + register_spec(). Accepts a LocalResourceSpec object or a path to a YAML file.
- Parameters:
- Raises:
ValueError – If base_dir overlaps with an existing registration or alias is already taken.
- Return type:
None
- register_spec(base_dir, spec_ids, alias=None)[source]
Bind loaded spec(s) to a base directory and register the result.
base_dir may be a local filesystem path or a cloud URI such as s3://bucket/prefix. When multiple spec_ids are given they are merged into a single LocalResourceSpec.
- Parameters:
- Raises:
AssertionError – If base_dir does not exist or any spec_id has not been loaded.
ValueError – If alias is already in use or base_dir overlaps with an existing registration.
- Return type:
None
- sink_product(product, resource_id, date)[source]
Resolve the local directory for a product on a given date.
- Parameters:
- Returns:
A SearchTarget with resolved directory path.
- Raises:
KeyError – If resource_id or product.name is not found.
- Return type:
- gnss_product_management.environments.workspace.paths_overlap(p1, p2)[source]
Check whether two paths share a common ancestor-descendant relationship.
For local paths, checks the resolved filesystem hierarchy. For cloud URIs, falls back to string prefix comparison (cloud paths have no symlinks to resolve).
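A minimal sketch of the overlap check follows, assuming that any string containing "://" is a cloud URI. The function name `paths_overlap_sketch` is hypothetical, and normalizing the trailing slash before the prefix comparison is a choice of this sketch (it keeps s3://b/p from matching s3://b/pp), not documented behavior.

```python
from pathlib import Path


def paths_overlap_sketch(p1, p2):
    """Return True if one path equals or contains the other."""
    s1, s2 = str(p1), str(p2)
    if "://" in s1 or "://" in s2:
        # Cloud URIs: plain prefix comparison on slash-terminated strings.
        a = s1.rstrip("/") + "/"
        b = s2.rstrip("/") + "/"
        return a.startswith(b) or b.startswith(a)
    # Local paths: compare the resolved filesystem hierarchy.
    r1, r2 = Path(s1).resolve(), Path(s2).resolve()
    return r1 == r2 or r1 in r2.parents or r2 in r1.parents


print(paths_overlap_sketch("/data/gnss", "/data/gnss/orbits"))
print(paths_overlap_sketch("s3://bucket/a", "s3://bucket/b"))
```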
Specifications
Parameter model and ParameterCatalog — replaces MetadataField + MetadataCatalog.
- class gnss_product_management.specifications.parameters.parameter.DerivationMethod(*values)[source]
-
How a parameter value is obtained.
- COMPUTED = 'computed'
- ENUM = 'enum'
- class gnss_product_management.specifications.parameters.parameter.Parameter(*, name, value=None, pattern=None, description=None, derivation=DerivationMethod.ENUM, compute=None)[source]
Bases: BaseModel
A single metadata parameter with optional regex pattern and compute function.
- Parameters:
- derivation: DerivationMethod | None
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.parameters.parameter.ParameterCatalog(parameters)[source]
Bases: object
Registry of parameters with pattern defaults and computed-field support.
Replaces MetadataCatalog. Compatible with register_computed_fields().
- computed(name, pattern=None, *, description=None)[source]
Decorator that registers a computed parameter field.
- classmethod from_yaml(yaml_path)[source]
Load parameter definitions from a meta-spec YAML file.
Does not register computed fields; call register_computed_fields() separately after loading.
- Parameters:
- Return type:
- interpolate(template, date, *, computed_only=False)[source]
Substitute {NAME} placeholders in template.
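Placeholder substitution of this kind can be sketched with a regex callback. In the example below, the `COMPUTED` table with date-derived YYYY and DDD fields, and the decision to leave unknown placeholders untouched, are assumptions for illustration; the real method's handling of computed_only is not reproduced.

```python
import re
from datetime import datetime, timezone

# Hypothetical computed fields derived from the date.
COMPUTED = {
    "YYYY": lambda d: f"{d.year:04d}",
    "DDD": lambda d: f"{d.timetuple().tm_yday:03d}",  # day of year
}


def interpolate_sketch(template, date, values=None):
    """Replace {NAME} with a computed or supplied value; keep unknown names."""
    values = values or {}

    def substitute(match):
        name = match.group(1)
        if name in COMPUTED:
            return COMPUTED[name](date)
        if name in values:
            return str(values[name])
        return match.group(0)  # unresolved placeholder stays as-is

    return re.sub(r"\{(\w+)\}", substitute, template)


when = datetime(2024, 2, 1, tzinfo=timezone.utc)
print(interpolate_sketch("/pub/{YYYY}/{DDD}/{AAA}", when))
```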
- merge(other)[source]
Merge another catalog into this one.
Duplicate names are overwritten by other with a warning.
- Parameters:
other (ParameterCatalog) – Catalog to merge.
- Returns:
A new ParameterCatalog with combined parameters.
- Return type:
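The merge semantics described here (other wins on duplicates, with a warning, and a new catalog is returned) can be modeled on plain dictionaries. `merge_catalogs` is a hypothetical helper for illustration; the warning category and message are not taken from the package.

```python
import warnings


def merge_catalogs(base, other):
    """Return a new dict combining both; entries in other overwrite base."""
    merged = dict(base)  # leave the inputs untouched
    for name, value in other.items():
        if name in merged:
            warnings.warn(f"duplicate entry {name!r} overwritten", stacklevel=2)
        merged[name] = value
    return merged
```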
Raw Pydantic models and registry for format specifications.
Two distinct types live here:
FormatSpec / FormatVersionSpec / FormatFieldDef / FormatSpecCollection
Describe the shape of a file format as a reusable library entry, e.g. “RINEX v3 has these metadata fields and these filename templates”. Loaded from the formats: section of the product spec YAML.
FormatRegistry
Validates and indexes a FormatSpecCollection against a ParameterCatalog, providing get_format() / get_version() look-ups.
See format_spec for the product-facing catalog that resolves format-variant bindings into Product objects.
- class gnss_product_management.specifications.format.spec.FormatFieldDef(*, pattern=None, default=None, description=None)[source]
Bases: BaseModel
Properties of a metadata field declared inside a format version.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.format.spec.FormatRegistry(*, formats=<factory>)[source]
Bases: BaseModel
Read-only registry of reusable format definitions.
Built from a FormatSpecCollection by resolving every metadata field’s default value against a ParameterCatalog.
- Parameters:
formats (dict[str, FormatSpec])
- formats
Mapping of format name to resolved FormatSpec.
- Type:
- classmethod build(format_spec, metadata_catalog)[source]
Build a FormatRegistry by resolving metadata field defaults.
Verifies that every metadata field referenced in a format version has either a pattern value or an entry in metadata_catalog.
- Parameters:
format_spec (FormatSpecCollection) – Raw format specification collection.
metadata_catalog (ParameterCatalog) – Global parameter catalog for field defaults.
- Returns:
A FormatRegistry with all fields resolved.
- Raises:
AssertionError – If a field is missing both a pattern and a catalog entry.
ValueError – If a file template placeholder has no corresponding metadata field.
- Return type:
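The second validation rule above (every file template placeholder needs a corresponding metadata field) can be checked with the standard library's format-string parser. `missing_fields` is a hypothetical helper, and representing the metadata as a dict keyed by field name is an assumption of this sketch.

```python
from string import Formatter


def missing_fields(template, metadata):
    """List placeholder names in template that have no entry in metadata."""
    names = [
        field_name
        for _, field_name, _, _ in Formatter().parse(template)
        if field_name  # skips trailing literal text, where field_name is None
    ]
    return [name for name in names if name not in metadata]


gaps = missing_fields("{AAA}0OPS{TTT}_{YYYY}.SP3", {"AAA": None, "TTT": None})
print(gaps)  # a builder could raise ValueError when this list is non-empty
```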
- formats: dict[str, FormatSpec]
- get_format(name)[source]
Retrieve a format by name.
- Parameters:
name (str) – Format name.
- Returns:
The matching FormatSpec.
- Raises:
KeyError – If name is not registered.
- Return type:
- get_version(format_name, version)[source]
Retrieve a specific version of a format.
- Parameters:
- Returns:
The matching FormatVersionSpec.
- Raises:
KeyError – If the format or version is not found.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.format.spec.FormatSpec(*, description='', versions=<factory>, compression=<factory>)[source]
Bases: BaseModel
A top-level format definition (e.g. RINEX, PRODUCT, TABLE).
Contains the format description and a mapping of version → FormatVersionSpec. Each version in turn maps variant names to filename templates and metadata field definitions.
Note
This is the format-library model. For the product-facing model that binds a specific format+version+variant to parameter lists and filename templates, see FormatVariantSpec.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- versions: dict[str, FormatVersionSpec]
- class gnss_product_management.specifications.format.spec.FormatSpecCollection(*, formats=<factory>)[source]
Bases: BaseModel
Collection of format specifications from the formats: YAML key.
- Parameters:
formats (dict[str, FormatSpec])
- formats: dict[str, FormatSpec]
- classmethod from_yaml(path)[source]
Load from a YAML file.
Accepts two layouts:
Wrapped: a top-level formats: key whose value is a mapping of format name → FormatSpec-compatible dict.
Flat (format_spec.yaml convention): format names are top-level keys; each entry has versions → variants → {parameters, filename}, which are converted to the metadata/file_templates expected by FormatVersionSpec.
- Parameters:
- Returns:
A FormatSpecCollection instance.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.format.spec.FormatVersionSpec(*, description=None, notes=None, metadata=<factory>, file_templates=<factory>, compression=<factory>, **extra_data)[source]
Bases: BaseModel
A single version of a format (e.g. RINEX v3, PRODUCT v1).
- Parameters:
- metadata: dict[str, FormatFieldDef | None]
- model_config = {'extra': 'allow'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Format-variant specifications and FormatCatalog — resolves FormatVariantSpec → Product.
- class gnss_product_management.specifications.format.format_spec.FormatCatalog(*, formats)[source]
Bases: Catalog
Resolved format catalog: maps format names to VersionCatalog[VariantCatalog[Product]].
- Parameters:
formats (dict[str, VersionCatalog[Product]])
- formats
Mapping of format name to version/variant product hierarchy.
- classmethod build(format_spec_catalog, parameter_catalog)[source]
Build concrete Products from abstract format specs and a ParameterCatalog.
- Parameters:
format_spec_catalog (FormatSpecCatalog) – The raw format spec definitions.
parameter_catalog (ParameterCatalog) – Global parameter catalog.
- Returns:
A FormatCatalog mapping format names to resolved products.
- Return type:
- formats: dict[str, VersionCatalog[Product]]
- merge(other)[source]
Merge another catalog into this one.
Duplicate entries are overwritten by other with a warning.
- Parameters:
other (FormatCatalog) – Catalog to merge.
- Returns:
A new FormatCatalog with combined entries.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.format.format_spec.FormatSpecCatalog(*, formats)[source]
Bases: BaseModel
Collection of raw format-variant specifications loaded from YAML.
- Parameters:
formats (dict[str, VersionCatalog[FormatVariantSpec]])
- formats: dict[str, VersionCatalog[FormatVariantSpec]]
- classmethod from_yaml(path)[source]
Load from a YAML file containing pre-built format specs.
Expected YAML layout:
    FORMAT_NAME:
      name: FORMAT_NAME
      versions:
        "1":
          name: "1"
          variants:
            variant_name:
              name: FORMAT_NAME
              version: "1"
              variant: variant_name
              parameters:
                - name: PARAM
              filename: "{PARAM}..."
- Parameters:
path (Path)
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.format.format_spec.FormatVariantSpec(*, name, version=None, variant=None, parameters=<factory>, filename=None)[source]
Bases: BaseModel
A single file-format variant binding: name × version × variant → parameters + filename.
This is a resolved “leaf” entry in the format spec YAML: it names which format (e.g. RINEX), which version ("3"), which variant (observation), and lists the parameters and filename template that together define a concrete file shape.
- Parameters:
- materialize(parameter_catalog)[source]
Materialize against a ParameterCatalog to produce a Product.
- Parameters:
parameter_catalog (ParameterCatalog) – Global parameter catalog for defaults.
- Returns:
A Product with resolved parameters.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Core product models — PathTemplate, Product, and catalog hierarchies.
- class gnss_product_management.specifications.products.product.PathTemplate(*, pattern, value=None, description=None)[source]
Bases: BaseModel
A template pattern with {NAME}-style placeholders, resolved via derive().
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- to_regex(parameter_catalog)[source]
Convert the template pattern into a regex with named capture groups.
Each {PARAM} placeholder is replaced with (?P<PARAM>pattern) using the parameter’s regex from parameter_catalog. Literal characters outside placeholders are escaped.
- Parameters:
parameter_catalog (ParameterCatalog) – Catalog supplying regex patterns for each parameter name.
- Returns:
A regex string suitable for re.fullmatch().
- Return type:
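The template-to-regex conversion described for to_regex() can be sketched in a few lines: escape the literal text, and wrap each placeholder's pattern in a named group. `template_to_regex` and the `patterns` dict (standing in for the parameter catalog) are hypothetical; the sketch also assumes each placeholder appears only once, since Python rejects duplicate group names.

```python
import re


def template_to_regex(template, patterns):
    """Turn '{NAME}' placeholders into named groups; escape everything else."""
    parts = []
    position = 0
    for match in re.finditer(r"\{(\w+)\}", template):
        parts.append(re.escape(template[position:match.start()]))  # literal text
        name = match.group(1)
        parts.append(f"(?P<{name}>{patterns[name]})")
        position = match.end()
    parts.append(re.escape(template[position:]))  # trailing literal text
    return "".join(parts)


patterns = {"AAA": r"[A-Z0-9]{3}", "TTT": r"[A-Z]{3}", "YYYY": r"\d{4}", "DDD": r"\d{3}"}
regex = template_to_regex("{AAA}0OPS{TTT}_{YYYY}{DDD}0000_01D_05M_ORB.SP3", patterns)
found = re.fullmatch(regex, "COD0OPSFIN_20240010000_01D_05M_ORB.SP3")
print(found.groupdict())
```

Escaping the literals matters for characters such as the dot before the extension, which would otherwise match any character.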
- class gnss_product_management.specifications.products.product.Product(*, name, parameters, directory=None, filename=None)[source]
Bases: BaseModel
A resolved product with its parameters and file/directory templates.
- Parameters:
name (str)
directory (PathTemplate | None)
filename (PathTemplate | None)
- directory: PathTemplate | None
- filename: PathTemplate | None
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.products.product.VariantCatalog(*, variants)[source]
Bases: BaseModel, Generic[T]
Collection of named variants for a single version.
- variants
Mapping of variant name to product instance.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.products.product.VersionCatalog(*, versions)[source]
Bases: BaseModel, Generic[T]
Collection of named versions, each containing variants.
- Parameters:
versions (dict[str, VariantCatalog])
- versions
Mapping of version name to VariantCatalog.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- versions: dict[str, VariantCatalog]
- gnss_product_management.specifications.products.product.infer_from_regex(regex, filename, parameters)[source]
Infer parameter values from filename using a pre-built regex.
After derive() and the query-factory’s “fill in patterns” step, each parameter’s .value is either a concrete literal or its regex pattern. This function reconstructs a named-group regex by replacing each parameter’s contribution with (?P<name>pattern), then matches filename and updates every parameter’s .value.
- Parameters:
- Returns:
The updated parameter list on match, or None.
- Return type:
ProductSpec and ProductCatalog — resolve product specs against FormatCatalog.
- class gnss_product_management.specifications.products.catalog.ProductCatalog(*, products)[source]
Bases: Catalog
Resolved product catalog: maps product names to VersionCatalog[VariantCatalog[Product]].
- Parameters:
products (dict[str, VersionCatalog[Product]])
- products
Mapping of product name to version/variant product hierarchy.
- classmethod build(product_spec_catalog, format_catalog)[source]
Build concrete Products from abstract product specs and a FormatCatalog.
- Parameters:
product_spec_catalog (ProductSpecCatalog) – Raw product spec definitions.
format_catalog (FormatCatalog) – Resolved format catalog.
- Returns:
A ProductCatalog with all products materialized.
- Return type:
- merge(other)[source]
Merge another catalog into this one.
Duplicate entries are overwritten by other with a warning.
- Parameters:
other (ProductCatalog) – Catalog to merge.
- Returns:
A new ProductCatalog with combined entries.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- products: dict[str, VersionCatalog[Product]]
- class gnss_product_management.specifications.products.catalog.ProductSpec(*, name, format, version, variant, parameters=<factory>, filename=None)[source]
Bases: BaseModel
A product specification that binds to a format variant with parameter overrides.
- Parameters:
- materialize(format_catalog)[source]
Materialize against a FormatCatalog to produce a fully-merged Product.
- Parameters:
format_catalog (FormatCatalog) – Resolved format catalog.
- Returns:
A Product with format-level parameters overlaid by product-spec overrides.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.products.catalog.ProductSpecCatalog(*, products)[source]
Bases:
BaseModel
Collection of raw product specifications loaded from YAML.
- Parameters:
products (dict[str, VersionCatalog[ProductSpec]])
- products
Mapping of product name to version/variant spec hierarchy.
- classmethod from_yaml(path)[source]
Load product specs from a YAML file, extracting the products: section.
Transforms the YAML structure into nested Pydantic models by injecting name from dict keys and converting formats lists into version/variant dicts.
- Return type:
- merge(other)[source]
Merge another catalog into this one.
Duplicate entries are overwritten by other with a warning.
- Parameters:
other (ProductSpecCatalog) – Catalog to merge.
- Returns:
A new ProductSpecCatalog with combined entries.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- products: dict[str, VersionCatalog[ProductSpec]]
Pure Pydantic models for local storage specifications.
- class gnss_product_management.specifications.local.local.LocalCollection(*, directory, description=None, items=<factory>)[source]
Bases:
BaseModel
A group of product specs sharing a directory template.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.local.local.LocalResourceSpec(*, name='default', description=None, collections=<factory>, source_file=None)[source]
Bases:
BaseModel
Root model for a local storage layout.
A single spec maps collection names to LocalCollection objects. Multiple specs can be merged via merge() so that different YAML files (e.g. per-project or per-workflow) combine into one unified layout.
- Parameters:
- collections: dict[str, LocalCollection]
- classmethod from_yaml(path)[source]
Load from a YAML file.
Accepts either a top-level local: wrapper or a flat file whose top-level key is collections:.
- Parameters:
- Returns:
A LocalResourceSpec instance.
- Return type:
- classmethod merge(specs)[source]
Merge multiple local storage specs into one.
Later specs override collections with the same name. Items within identically-named collections are combined (union).
- Parameters:
specs (Sequence[LocalResourceSpec]) – Sequence of specs to merge.
- Returns:
A new LocalResourceSpec with combined collections.
- Return type:
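One way to read the merge rule above is that collection-level fields come from the later spec while item lists are unioned. The sketch below shows that reading with plain dictionaries; the merge_layouts name and the dict layout (directory plus items) are assumptions, not the library's models.

```python
def merge_layouts(specs: list) -> dict:
    """Merge {collection_name: {"directory": ..., "items": [...]}} mappings."""
    merged = {}
    for spec in specs:
        for name, coll in spec.items():
            if name in merged:
                # Union of items, keeping first-seen order and dropping repeats.
                combined = merged[name]["items"] + coll["items"]
                items = list(dict.fromkeys(combined))
                # Other fields (e.g. directory) come from the later spec.
                merged[name] = {**coll, "items": items}
            else:
                merged[name] = {**coll, "items": list(coll["items"])}
    return merged
```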
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Server, ResourceSpec, SearchTarget — remote resource models (Layer 1).
- class gnss_product_management.specifications.remote.resource.ResourceProductSpec(*, id, server_id, available=True, product_name, product_version=None, description=None, parameters, directory)[source]
Bases:
BaseModel
A product offering within a resource/center.
Maps a catalog product to a server with parameter overrides.
- Parameters:
- parameters
Parameter overrides (values pinned by this center).
- directory
Directory template for this product.
- directory: PathTemplate
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.remote.resource.ResourceSpec(*, id, name, description=None, website=None, servers=[], products=[])[source]
Bases:
BaseModel
Root resource specification for a data center.
- Parameters:
- servers
Server endpoints for this center.
- products
Product offerings hosted by this center.
- classmethod from_yaml(path)[source]
Load a resource specification from a YAML file.
- Parameters:
- Returns:
A ResourceSpec instance.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- products: list[ResourceProductSpec]
- class gnss_product_management.specifications.remote.resource.SearchTarget(*, product, server, directory)[source]
Bases:
BaseModel
A single concrete query target: one combination of parameter values.
- Parameters:
product (Product)
server (Server)
directory (PathTemplate)
- product
The product being queried.
- server
The server endpoint to query.
- directory
Directory path template for the product.
- directory: PathTemplate
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.remote.resource.Server(*, id, hostname, protocol=None, auth_required=False, description=None)[source]
Bases:
BaseModel
A remote or local server endpoint.
- Parameters:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
ResourceCatalog — resolve a ResourceSpec against a ProductCatalog into queryable products.
- class gnss_product_management.specifications.remote.resource_catalog.ResourceCatalog(*, id, name, description=None, website=None, servers, queries)[source]
Bases:
Catalog
Resolves a ResourceSpec against a ProductCatalog into queryable products.
- Parameters:
- servers
Server endpoints for this center.
- queries
Expanded SearchTarget objects.
- classmethod build(resource_spec, product_catalog)[source]
Build concrete queries by expanding a ResourceSpec against a ProductCatalog.
- Parameters:
resource_spec (ResourceSpec) – Raw resource specification for a data center.
product_catalog – Resolved product catalog.
- Returns:
A ResourceCatalog containing all expanded queries.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- queries: list[SearchTarget]
Pure Pydantic models and result types for dependency specifications.
- class gnss_product_management.specifications.dependencies.dependencies.Dependency(*, spec, required=True, description='', constraints=<factory>)[source]
Bases:
BaseModel
A single product dependency.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.dependencies.dependencies.DependencyResolution(spec_name, resolved=<factory>)[source]
Bases:
object
Aggregated resolution result for all dependencies in a spec.
- Parameters:
spec_name (str)
resolved (list[ResolvedDependency])
- resolved
List of ResolvedDependency results.
- property fulfilled: list[ResolvedDependency]
Dependencies that have been resolved (not missing).
- property missing: list[ResolvedDependency]
Dependencies that could not be resolved.
- product_paths()[source]
Return a {spec: uri} mapping for resolved local files.
Values are URI strings that work for both local paths and cloud locations. Pass them through gnss_product_management.utilities.paths.as_path() to get a path object suitable for filesystem operations.
- resolved: list[ResolvedDependency]
- class gnss_product_management.specifications.dependencies.dependencies.DependencySpec(*, name, description='', preferences=<factory>, dependencies=<factory>, package, task)[source]
Bases:
BaseModel
Full dependency specification for a processing task.
- Parameters:
name (str)
description (str)
preferences (list[SearchPreference])
dependencies (list[Dependency])
package (str)
task (str)
- dependencies: list[Dependency]
- classmethod from_yaml(path)[source]
Load a dependency specification from a YAML file.
- Parameters:
- Returns:
A DependencySpec instance.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- preferences: list[SearchPreference]
- class gnss_product_management.specifications.dependencies.dependencies.ResolvedDependency(*, spec, required, status, local_path=None, remote_url=None)[source]
Bases:
BaseModel
Resolution result for one dependency.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.specifications.dependencies.dependencies.SearchPreference(*, parameter, sorting=<factory>, description='')[source]
Bases:
BaseModel
One slot in the preference cascade.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Base class for Layer 2 catalog objects.
All catalog classes must implement build() as a classmethod that
constructs a fully-built instance from lower-layer specifications.
- class gnss_product_management.specifications.catalog.Catalog[source]
Bases:
BaseModel
Abstract base for built catalogs.
Subclasses must implement:
    @classmethod
    def build(cls, ...) -> Self: ...
The build classmethod is the only way to construct a catalog in production code. Direct __init__ is still available for testing or deserialization.
- abstractmethod classmethod build(*args, **kwargs)[source]
Build a concrete catalog instance from lower-layer specs.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Factories
Public return types and exceptions for the ProductRegistry API.
- class gnss_product_management.factories.models.DiscoveryEntry(*, product, center='', quality='', source='', uri='')[source]
Bases:
BaseModel
A single entry in a discovery report.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.factories.models.DiscoveryReport(*, entries=<factory>)[source]
Bases:
BaseModel
Structured summary of available products for a date.
- Parameters:
entries (list[DiscoveryEntry])
- entries: list[DiscoveryEntry]
- filter(product=None, center=None)[source]
Filter entries by product name and/or center.
- Parameters:
- Returns:
Matching DiscoveryEntry instances.
- Return type:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.factories.models.FoundResource(*, product, source, uri, parameters=<factory>, date=None, local_path=None)[source]
Bases:
BaseModel
A discovered IGS product: either a local file or a remote URI.
Returned by GNSSClient.search() and ProductQuery.search(). The most useful properties for geodetic workflows:
r.center: analysis center code (AAA field, e.g. "WUM")
r.quality: timeliness code (TTT field: "FIN", "RAP", "ULT")
r.filename: bare IGS long filename
r.uri: full remote URL (ftp://...) or local path
r.is_local: True if already on disk
r.downloaded: True after a successful download
- Parameters:
- date: datetime.datetime | None
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context, /)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self (BaseModel) – The BaseModel instance.
context (Any) – The context.
- Return type:
None
- exception gnss_product_management.factories.models.MissingProductError(missing, task='')[source]
Bases:
Exception
Raised when a required product cannot be found during resolve().
- class gnss_product_management.factories.models.Resolution(*, task, paths=<factory>, lockfile=None)[source]
Bases:
BaseModel
Result of resolving all dependencies for a task.
- Parameters:
task (str)
lockfile (DependencyLockFile | None)
- lockfile: DependencyLockFile | None
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
SearchPlanner — lazy narrowing query builder.
- class gnss_product_management.factories.search_planner.SearchPlanner(product_registry, workspace)[source]
Bases:
object
Lazy search planner: narrows parameter ranges, resolves on demand.
Uses ProductRegistry for remote resource catalogs, WorkSpace for local resource resolution, ProductCatalog (nested version→variant→Product hierarchy), and ParameterCatalog for fallback regex patterns and computed date-field resolution.
- Parameters:
product_registry (ProductRegistry)
workspace (WorkSpace)
- _env
The product registry backing this planner.
- _workspace
Workspace with registered local resources.
Usage:
    sp = SearchPlanner(
        product_registry=registry,
        workspace=workspace,
    )
    results = sp.get(
        datetime.date(2024, 1, 1),
        product={"name": "ORBIT"},
        parameters={"AAA": ["WUM", "COD"]},
    )
- __init__(product_registry, workspace)[source]
Initialise the search planner.
- Parameters:
product_registry (ProductRegistry) – Built ProductRegistry with catalogs and remote resource catalogs ready.
workspace (WorkSpace) – WorkSpace with registered local resources.
- static build_queries_from_planner(templates, date, query_planner, resource_selection=None)[source]
Build search queries from a given planner and resource selection.
- Parameters:
templates (list[Product]) – List of product templates to build queries from.
date (datetime) – Target date for resolving computed directory fields.
query_planner (WorkSpace | ProductRegistry) – A WorkSpace (local) or ProductRegistry (remote) with resource_ids and source_product interface.
resource_selection (list[str] | None) – Optional list of resource IDs to restrict to.
- Returns:
A list of SearchTarget objects built from the planner.
- Return type:
- get(date, product, parameters=None, local_resources=None, remote_resources=None)[source]
Narrow parameter ranges and return searchable resources.
- Parameters:
date (datetime) – Target date for computed metadata fields.
product (dict[str, str | list[str]]) – Product query dict with name, optionally version and variant.
parameters (dict[str, str | list[str]] | None) – User constraints on metadata fields. Unset fields remain as wildcard regex patterns.
local_resources (list[str] | None) – If given, restrict to these local collection IDs.
remote_resources (list[str] | None) – If given, restrict to these remote center IDs.
- Returns:
A list of SearchTarget objects.
- Raises:
ValueError – If the product, version, or variant is not found.
- Return type:
SourcePlanner — common interface for local and remote search planners.
- class gnss_product_management.factories.source_planner.SourcePlanner(*args, **kwargs)[source]
Bases:
Protocol
Shared query-side interface for search planners.
Both RemoteSearchPlanner and LocalSearchPlanner satisfy this protocol, allowing SearchPlanner and other consumers to treat local and remote resources uniformly.
Registration (register()) is intentionally not part of this protocol: each planner accepts different spec types at setup time.
- register(spec, **args)[source]
Register a resource specification with the planner.
- Parameters:
spec – The resource specification to register.
**args – Additional keyword arguments.
- Return type:
None
- sink_product(product, resource_id, **args)[source]
Return the target path for product on resource_id.
- Parameters:
- Returns:
A SearchTarget with resolved path templates.
- Return type:
ranking — module-level helpers for sorting SearchTarget results.
- gnss_product_management.factories.ranking.sort_by_preferences(targets, preferences)[source]
Sort search targets according to a preference cascade.
Iterates preferences in reverse order (lowest priority first) so that the highest-priority preference ends up as the primary sort key.
- Parameters:
targets (list[SearchTarget]) – SearchTarget objects to sort.
preferences (list[SearchPreference]) – Ordered list of SearchPreference objects defining the desired sort cascade.
- Returns:
Sorted list of targets.
- Return type:
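The reverse-order trick works because Python's sort is stable: sorting by the lowest-priority key first and the highest-priority key last leaves the highest-priority preference as the primary ordering. A minimal sketch, assuming targets are plain dicts of parameter values and each preference is a hypothetical (parameter, sorting) pair rather than a SearchPreference model:

```python
def sort_by_cascade(targets: list, preferences: list) -> list:
    """Sort `targets` by an ordered list of (parameter, preferred_values)."""
    ordered = list(targets)
    # Lowest priority first; each later (higher-priority) stable sort
    # preserves the earlier ordering among ties.
    for parameter, sorting in reversed(preferences):
        rank = {value: i for i, value in enumerate(sorting)}
        unknown = len(sorting)  # values not listed sort last
        ordered.sort(key=lambda t: rank.get(t.get(parameter), unknown))
    return ordered
```

For example, with preferences [("TTT", ["FIN", "RAP"]), ("AAA", ["WUM", "COD"])], all FIN targets come before RAP targets, and within each timeliness class WUM precedes COD.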
- gnss_product_management.factories.ranking.sort_by_protocol(targets)[source]
Sort search targets by server protocol, preferring local/file over remote.
- Parameters:
targets (list[SearchTarget]) – SearchTarget objects to sort.
- Returns:
Sorted list of targets, with FILE/LOCAL first, then FTP/FTPS, then HTTP/HTTPS.
- Return type:
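The protocol ordering above amounts to a rank lookup. A small sketch, assuming each target is a dict with a protocol string (the real objects are SearchTarget models, and how the library treats unknown or missing protocols is not stated here; this sketch sorts them last):

```python
# Rank table mirroring the documented order: local, then FTP, then HTTP.
_PROTOCOL_RANK = {"FILE": 0, "LOCAL": 0, "FTP": 1, "FTPS": 1, "HTTP": 2, "HTTPS": 2}


def sort_by_protocol_rank(targets: list) -> list:
    """Return targets ordered local first, then FTP(S), then HTTP(S)."""
    def rank(target: dict) -> int:
        protocol = (target.get("protocol") or "").upper()
        return _PROTOCOL_RANK.get(protocol, 3)  # assumption: unknown goes last
    return sorted(targets, key=rank)
```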
WormHole — protocol-agnostic file search and download.
- class gnss_product_management.factories.remote_transport.WormHole(*, max_connections=4, product_registry=None)[source]
Bases:
object
Search for files described by SearchTarget objects.
For each target, lists the remote (FTP/HTTP) or local directory, matches product.filename.pattern against the listing, and populates directory.value and filename.value on the target.
- Parameters:
max_connections (int)
- _connection_pool_factory
Factory managing per-host connection pools.
- _env
Optional product registry for parameter back-filling.
Usage:
    targets = sp.get(date=..., product=..., parameters=...)
    transport = WormHole()
    results = transport.search(targets)
    for st in results:
        print(st.server.hostname, st.product.filename.value)
- __init__(*, max_connections=4, product_registry=None)[source]
Initialise the transport.
- Parameters:
max_connections (int) – Maximum connections per host pool.
product_registry – Optional ProductRegistry for parameter back-filling after a filename match.
- Return type:
None
- download_one(query, local_resource_id, local_factory, date)[source]
Synchronously download matched files for one search target.
Skips the download if the destination file already exists and is non-empty.
- Parameters:
query (SearchTarget) – The resolved search target with filename value.
local_resource_id (str) – Target local resource identifier.
local_factory (WorkSpace) – Planner for resolving local sink paths.
date (datetime) – Target date for computing sink directory.
- Returns:
Path (local or cloud) to the downloaded file, or None on failure.
- Return type:
Path | CloudPath | None
- search(targets)[source]
Search every target’s server/directory for matching files.
Targets are grouped by (hostname, directory) so each unique remote directory is listed exactly once. Pattern matching for every target in the group runs against the shared listing. The filename.value field is set on each returned target.
- Parameters:
targets (list[SearchTarget]) – SearchTarget objects to search.
- Returns:
A list of SearchTarget objects with filename.value already populated, one per matched filename.
- Return type:
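The grouping step described above can be sketched as follows. Targets are modelled as plain dicts and the directory lister is passed in as a callable; both are assumptions for illustration, and whether the library uses a full match or a search against each filename is not stated here.

```python
import re
from collections import defaultdict


def search_grouped(targets: list, list_directory) -> list:
    """List each (hostname, directory) once and match every target against it.

    `targets` are dicts with "hostname", "directory" and "pattern" keys;
    `list_directory(hostname, directory)` returns a list of filenames.
    """
    groups = defaultdict(list)
    for target in targets:
        groups[(target["hostname"], target["directory"])].append(target)

    results = []
    for (hostname, directory), group in groups.items():
        listing = list_directory(hostname, directory)  # one call per directory
        for target in group:
            pattern = re.compile(target["pattern"])
            for filename in listing:
                if pattern.fullmatch(filename):
                    # One result per matched filename.
                    results.append({**target, "filename": filename})
    return results
```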
Connection pool — thread-safe fsspec filesystem instances per host.
- class gnss_product_management.factories.connection_pool.ConnectionPool(hostname, max_connections=4)[source]
Bases:
object
Thread-safe pool of fsspec filesystem instances for a single host.
- hostname
Server address (URL or local path).
- protocol
Inferred protocol ('ftp', 'http', 'file', etc.).
- max_connections
Maximum number of concurrent connections.
- full_path(directory)[source]
Combine hostname base with a relative directory.
Behaviour differs by protocol: file uses os.path.join(), http/https concatenates with forward slashes, and ftp treats the directory as an already-absolute server path.
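The three protocol cases can be written out as a small function. This is a sketch of the described behaviour only; the join_host_path name, the explicit protocol argument, and the slash trimming for HTTP are assumptions.

```python
import os.path


def join_host_path(hostname: str, protocol: str, directory: str) -> str:
    """Combine a host base with a directory, depending on the protocol."""
    if protocol == "file":
        # Local filesystem: platform-aware join.
        return os.path.join(hostname, directory)
    if protocol in ("http", "https"):
        # URL: exactly one forward slash between base and directory.
        return hostname.rstrip("/") + "/" + directory.lstrip("/")
    # FTP: the directory is already an absolute path on the server.
    return directory
```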
- get_connection()[source]
Acquire a connection from the pool.
- Yields:
An fsspec.AbstractFileSystem instance.
- Raises:
ConnectionError – If the pool failed to initialise.
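A semaphore-bounded pool with a yielding get_connection can be sketched as below. This illustrates the general pattern only: the BoundedPool class and its factory argument are hypothetical, connections are created eagerly for simplicity, and dead-connection replacement is left out.

```python
import threading
from contextlib import contextmanager


class BoundedPool:
    """Hypothetical pool that hands out at most `max_connections` objects."""

    def __init__(self, factory, max_connections: int = 4):
        self._slots = threading.BoundedSemaphore(max_connections)
        self._lock = threading.Lock()
        self._idle = [factory() for _ in range(max_connections)]

    @contextmanager
    def get_connection(self):
        self._slots.acquire()  # blocks while every connection is in use
        with self._lock:
            conn = self._idle.pop()
        try:
            yield conn
        finally:
            with self._lock:
                self._idle.append(conn)  # return the connection to the pool
            self._slots.release()
```

Used as a context manager (with pool.get_connection() as fs: ...), the slot is released even if the body raises.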
- replace_connection(dead)[source]
Swap a dead connection for a fresh one in the pool.
The caller must already hold a semaphore slot (i.e. be inside get_connection).
- Parameters:
dead (AbstractFileSystem) – The stale filesystem instance to remove and replace.
- Returns:
A fresh fsspec.AbstractFileSystem, or None if reconnection fails (the pool will be one slot smaller).
- Return type:
AbstractFileSystem | None
- class gnss_product_management.factories.connection_pool.ConnectionPoolFactory(max_connections=4)[source]
Bases:
object
Manage per-host connection pools with a shared directory listing cache.
- Parameters:
max_connections (int)
- max_connections
Default maximum connections per host.
- __init__(max_connections=4)[source]
Initialise the factory.
- Parameters:
max_connections (int) – Default maximum connections per host pool.
- add_connection(hostname)[source]
Ensure a connection pool exists for hostname.
- Parameters:
hostname (str) – Server address to pool.
- download_file(hostname, remote_path, target_dir)[source]
Download a file from a remote or local host.
Retries once with a fresh connection on broken-pipe errors.
- Parameters:
- Returns:
Path to the downloaded file, or None on failure.
- Raises:
ValueError – If no pool exists for hostname.
- Return type:
Path | None
- get_connection(hostname)[source]
Acquire a connection for hostname.
- Parameters:
hostname (str) – Server address.
- Yields:
An fsspec.AbstractFileSystem instance.
- Raises:
ValueError – If no pool exists for hostname.
- get_file_size(hostname, remote_path)[source]
Return the size of a remote file in bytes, or None if unavailable.
Uses fsspec.info(), which is a metadata-only call: no data is transferred. Returns None on any error so callers can treat an unknown size as “proceed with download”.
- list_directory(hostname, directory)[source]
List a remote or local directory with caching.
Results (including empty lists for failed lookups) are cached to avoid redundant network calls.
- Parameters:
- Returns:
A list of filenames.
- Raises:
ValueError – If no pool exists for hostname.
- Return type:
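Caching failed lookups as empty lists means a missing directory costs one network call rather than one per target. A minimal sketch of that idea, with a hypothetical ListingCache class and an injected lister callable; locking for concurrent use is omitted, and treating OSError as the failure signal is an assumption.

```python
class ListingCache:
    """Hypothetical cache keyed by (hostname, directory)."""

    def __init__(self, lister):
        self._lister = lister  # callable(hostname, directory) -> iterable of names
        self._cache = {}

    def list_directory(self, hostname: str, directory: str) -> list:
        key = (hostname, directory)
        if key not in self._cache:
            try:
                self._cache[key] = list(self._lister(hostname, directory))
            except OSError:
                # Failed lookups are remembered too, as an empty listing.
                self._cache[key] = []
        return self._cache[key]
```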
ResolvePipeline — Find + Download + LockfileWriter in one call.
High-level composition that resolves a DependencySpec for a
given date: finds resources, optionally downloads them, writes per-file
sidecar lockfiles, and persists an aggregate lockfile.
Fast path: if an aggregate lockfile already exists for
(package, task, date, version) the pipeline returns immediately
without searching or downloading.
- class gnss_product_management.factories.pipelines.resolve.ResolvePipeline(env, workspace, *, max_connections=4, transport=None)[source]
Bases:
object
Find → Download → Lockfile for every dependency in a spec.
Uses ProductQuery, DownloadPipeline, and LockfileWriter internally. All dependencies are resolved in parallel via a ThreadPoolExecutor.
Fast path: if an aggregate lockfile already exists for the (package, task, date, version) identity, returns immediately without searching or downloading.
- Parameters:
env (ProductRegistry) – The product registry with built catalogs.
workspace (WorkSpace) – Workspace with registered local resources.
max_connections (int) – Maximum concurrent connections per host.
transport (WormHole | None) – Optional shared WormHole instance. If provided, the pipeline reuses it instead of creating a new one, which is useful when GNSSClient already holds a pool.
- run(spec, date, *, sink_id='local_config', centers=None, download=True)[source]
Resolve all dependencies in spec for date.
- Parameters:
spec (DependencySpec) – The dependency specification.
date (datetime) – Target date (timezone-aware datetime).
sink_id (str) – Local resource alias for download destination and lockfile storage.
centers (list[str] | None) – Restrict remote search to these center IDs.
download (bool) – If True (default), download remote resources.
- Returns:
A tuple of (DependencyResolution, lockfile path or None if nothing was resolved).
- Return type:
tuple[DependencyResolution, Path | CloudPath | None]
DownloadPipeline — found resource → local path.
Fetches remote FoundResource objects to the local workspace
using WormHole and SearchPlanner for sink path
resolution. Writes a per-file sidecar lockfile after every successful
fetch (local or remote) so that callers can verify integrity later.
- class gnss_product_management.factories.pipelines.download.DownloadPipeline(env, workspace, *, transport=None, max_connections=4)[source]
Bases:
object
Download FoundResource objects to the local workspace.
Already-local resources return immediately with their existing path. Remote resources are downloaded via WormHole.
A per-file sidecar <filename>_lock.json is written alongside every successfully resolved file if one does not already exist.
- Parameters:
env (ProductRegistry) – The product registry with built catalogs.
workspace (WorkSpace) – Workspace with registered local resources.
max_connections (int) – Maximum concurrent connections per host.
transport (WormHole | None) – Optional shared WormHole instance.
- run(resources, date, *, sink_id='local_config')[source]
Download found resources to the workspace.
- Parameters:
resources (FoundResource | list[FoundResource]) – A single FoundResource or a list of them.
date (datetime) – Target date for computing sink directory.
sink_id (str) – Local resource alias to download into.
- Returns:
A Path (or None on failure) for a single resource, or a list of paths for multiple resources.
- Return type:
LockfileWriter — write a DependencyResolution to a lockfile.
Thin wrapper around LockfileManager that converts a resolution
into a persisted, reproducible manifest.
- class gnss_product_management.factories.pipelines.lockfile_writer.LockfileWriter(lockfile_dir, *, package='PRIDE')[source]
Bases:
object
Write a DependencyResolution to a JSON lockfile.
- Parameters:
lockfile_dir (Path) – Directory where lockfiles are stored.
package (str) – Package name (e.g. 'PRIDE').
- write(resolution, date, *, version=None)[source]
Persist resolution as a lockfile.
- Parameters:
resolution (DependencyResolution) – The completed dependency resolution.
date (datetime) – Processing date.
version (str | None) – Package version; defaults to the installed version.
- Returns:
Path to the written lockfile.
- Return type:
Lockfile
Pydantic models for lockfile entries and dependency lockfiles.
Defines the data structures that represent a single resolved product
(LockProduct) and a collection of resolved products for one
processing day (DependencyLockFile). These models are
serialized to JSON sidecar files on disk.
- class gnss_product_management.lockfile.models.DependencyLockFile(*, date, package, task, version, requires_date=True, timestamp=<factory>, products=<factory>, metadata=None)[source]
Bases:
BaseModel
Top-level lockfile: a fully-resolved, reproducible product manifest.
The lockfile is date-scoped: one lockfile per processing day. Identity key: (task, date, version) where version is the gnss-product-management package version.
- Parameters:
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- products: list[LockProduct]
- class gnss_product_management.lockfile.models.LockProduct(*, name, description='', timestamp=<factory>, url, hash='', size=None, sink='', alternatives=<factory>)[source]
Bases:
BaseModel
A single resolved product entry in the lockfile.
- Parameters:
- alternatives: list[LockProductAlternative]
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gnss_product_management.lockfile.models.LockProductAlternative(*, url)[source]
Bases:
BaseModel
An alternative (mirror / fallback) source for a locked product.
- Parameters:
url (str)
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Lockfile manager — single facade for all lockfile operations.
Provides a unified interface for checking, loading, saving, and
sharing dependency lockfiles. All callers (ResolvePipeline,
PrideProcessor, CLI) should go through LockfileManager
rather than calling operations functions directly.
lockfile_dir may be a local filesystem path or a cloud URI
(s3://bucket/prefix). All reads and writes are dispatched through
CloudPath / Path so the
manager is storage-agnostic.
- class gnss_product_management.lockfile.manager.LockfileManager(lockfile_dir)[source]
Bases:
object
Facade for dependency lockfile lifecycle.
- _dir
Directory where aggregate lockfiles are stored.
- Parameters:
lockfile_dir (AnyPath | str) – Directory for aggregate lockfile storage. May be a local Path, a URI string, or a CloudPath (e.g. s3://bucket/locks).
- build_aggregate(products, package, task, date, version=None)[source]
Build a DependencyLockFile from per-file sidecar products.
- Parameters:
- Returns:
A new DependencyLockFile ready to be saved.
- Return type:
- exists(package, task, date, version=None)[source]
Check if a lockfile exists for the given identity.
- export_lockfile(package, task, date, version=None)[source]
Return the path to the aggregate lockfile for sharing.
- import_lockfile(path, strict=False)[source]
Import a lockfile from another machine or cloud location.
Validates each product’s hash. In warn mode (default), mismatches are logged but products are kept. In strict mode, invalid products are removed so the caller can re-resolve them.
- Parameters:
- Returns:
The imported (and possibly pruned) lockfile.
- Return type:
- load(package, task, date, version=None)[source]
Load an existing lockfile, or None.
- Parameters:
- Return type:
DependencyLockFile | None
- static lockfile_name(package, task, date, version=None)[source]
Return the canonical lockfile filename for the given identity.
- lockfile_path(package, task, date, version=None)[source]
Return the expected path for a lockfile (may not exist yet).
- save(lockfile)[source]
Write (or overwrite) an aggregate lockfile.
- Returns:
Path (local or cloud) to the written file.
- Parameters:
lockfile (DependencyLockFile)
- Return type:
Path | CloudPath
Read, write, build, and validate lockfile entries.
Provides pure-function helpers for the lockfile lifecycle:
Validate: verify that a lock-product’s sink file still exists and that its hash matches.
Build: construct a LockProduct from a local or cloud file, computing its SHA-256 hash and byte size.
Read / Write: serialize and deserialize individual LockProduct sidecar JSON files and date-scoped DependencyLockFile manifests.
All path arguments accept local Path objects and cloud
URIs / CloudPath objects interchangeably via
as_path().
- class gnss_product_management.lockfile.operations.HashMismatchMode(*values)[source]
Bases:
Enum
How to handle hash mismatches during lockfile validation.
- STRICT = 'strict'
- WARN = 'warn'
- gnss_product_management.lockfile.operations.build_lock_product(sink, url, name='', description='', alternative_urls=None)[source]
Build a LockProduct from a local or cloud file.
Computes the SHA-256 hash and byte size of sink and packages them into a new LockProduct model.
- Parameters:
sink (Path | CloudPath | str) – Path or URI to the file (local or cloud).
url (str) – Primary remote URL the file was downloaded from.
name (str) – Human-readable product name (e.g. 'ORBIT').
description (str) – Free-text description of the product.
alternative_urls (list[str] | None) – Optional mirror / fallback URLs.
- Returns:
A fully-populated LockProduct.
- Raises:
FileNotFoundError – If sink does not exist.
- Return type:
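The hash-and-size step can be sketched with the standard library alone. The describe_file helper and its dict result are hypothetical stand-ins for build_lock_product and LockProduct, and the sketch covers local files only (cloud paths are not handled).

```python
import hashlib
from pathlib import Path


def describe_file(sink, url: str) -> dict:
    """Compute the SHA-256 hex digest and byte size of a local file."""
    path = Path(sink)
    if not path.is_file():
        raise FileNotFoundError(path)
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Read in 1 MiB chunks so large products are not loaded at once.
        for chunk in iter(lambda: fh.read(1 << 20), b""):
            digest.update(chunk)
    return {
        "url": url,
        "sink": str(path),
        "hash": digest.hexdigest(),
        "size": path.stat().st_size,
    }
```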
- gnss_product_management.lockfile.operations.get_dependency_lockfile(directory, package, task, date, version=None)[source]
Read a DependencyLockFile from directory.
- Parameters:
directory (Path | CloudPath | str) – Folder (local or cloud) containing lockfile JSON files.
package (str) – Processing package name.
task (str) – Processing task name.
date (datetime | str) – Processing date used to derive the filename.
version (str | None) – gnss-product-management package version. Defaults to the installed version.
- Returns:
A (lockfile, path) tuple. If the file does not exist, lockfile is None but path is still returned so the caller knows where to write a new one.
- Return type:
tuple[DependencyLockFile | None, Path | CloudPath | None]
- gnss_product_management.lockfile.operations.get_dependency_lockfile_name(package, task, date, version=None)[source]
Derive the canonical filename for a dependency lockfile.
The filename encodes package, task, year, day-of-year, and version so that each processing day gets its own file. Station is not part of the identity.
- Parameters:
- Returns:
A filename string like PRIDE_PPP_2025_015_0.1.0_lock.json.
- Raises:
ValueError – If date cannot be parsed.
- Return type:
str
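The naming scheme can be sketched as follows. This is an illustration only: the function lockfile_name is hypothetical, and the field order and the split of the example name into package "PRIDE" and task "PPP" are assumptions inferred from the sample filename above.

```python
from datetime import date


def lockfile_name(package: str, task: str, when: date, version: str) -> str:
    """Hypothetical stand-in: <package>_<task>_<YYYY>_<DDD>_<version>_lock.json."""
    doy = when.timetuple().tm_yday  # day of year, 1-based
    return f"{package}_{task}_{when.year:04d}_{doy:03d}_{version}_lock.json"


print(lockfile_name("PRIDE", "PPP", date(2025, 1, 15), "0.1.0"))
```

Because the day-of-year is part of the name, two runs on different processing days never share a lockfile, while two stations processed on the same day do.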
- gnss_product_management.lockfile.operations.get_lock_product(sink)[source]
Read the LockProduct sidecar JSON for sink.
- Parameters:
sink (Path | CloudPath | str) – Path or URI to the downloaded product file.
- Returns:
The deserialized LockProduct, or None if the sidecar file does not exist.
- Return type:
LockProduct | None
- gnss_product_management.lockfile.operations.get_lock_product_path(sink)[source]
Return the _lock.json sidecar path for a sink file.
Works for both local paths and cloud URIs: the sidecar is placed alongside the data file in the same storage backend.
- Parameters:
sink (Path | CloudPath | str) – Path or URI to the downloaded product file.
- Returns:
A path (local or cloud) pointing to the companion lock JSON (<sink>_lock.json).
- Raises:
FileNotFoundError – If sink does not exist.
- Return type:
Path | CloudPath
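A minimal sketch of the sidecar convention for local paths, assuming the suffix is appended to the full file name including its extension (as the <sink>_lock.json pattern suggests). The helper sidecar_path is hypothetical and is not the library function.

```python
from pathlib import Path


def sidecar_path(sink: Path) -> Path:
    """Hypothetical helper: place '<name>_lock.json' next to the data file."""
    if not sink.exists():
        raise FileNotFoundError(sink)
    # Keep the original extension so foo.SP3 and foo.CLK get distinct sidecars.
    return sink.with_name(sink.name + "_lock.json")
```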
- gnss_product_management.lockfile.operations.get_package_version()[source]
Return the installed gnss-product-management version.
- Return type:
str
- gnss_product_management.lockfile.operations.validate_lock_product(product, mode=HashMismatchMode.WARN)[source]
Check that a lock-product’s sink file exists and its hash matches.
Also accepts the decompressed version of a .gz sink (e.g. foo.SP3 when the lock records foo.SP3.gz).
- Parameters:
product (LockProduct) – The lock-product entry to validate.
mode (HashMismatchMode) – How to handle hash mismatches. WARN logs a warning but returns True. STRICT returns False so the caller can re-download.
- Returns:
True if the sink file (or its decompressed counterpart) exists and, when a hash is recorded, the file’s current SHA-256 matches the stored hash or mode is WARN.
- Return type:
bool
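The decision logic described above can be sketched as follows. This is an assumption-laden illustration, not the library's implementation: the names Mode and validate are hypothetical, only local paths are handled, and the docs do not say how a decompressed file is compared against a hash recorded for the .gz, so the sketch applies the same rule to both.

```python
import hashlib
import logging
from enum import Enum
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


class Mode(Enum):  # mirrors the documented STRICT / WARN values
    STRICT = "strict"
    WARN = "warn"


def validate(sink: Path, expected_hash: Optional[str], mode: Mode = Mode.WARN) -> bool:
    """Hypothetical check: file exists and its hash matches (or mode is WARN)."""
    candidates = [sink]
    if sink.suffix == ".gz":
        candidates.append(sink.with_suffix(""))  # foo.SP3.gz -> foo.SP3
    existing = next((p for p in candidates if p.exists()), None)
    if existing is None:
        return False  # nothing on disk, caller must download
    if not expected_hash:
        return True  # no hash recorded, existence is enough
    actual = hashlib.sha256(existing.read_bytes()).hexdigest()
    if actual == expected_hash:
        return True
    if mode is Mode.WARN:
        logger.warning("hash mismatch for %s", existing)
        return True
    return False  # STRICT: signal that a re-download is needed
```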
- gnss_product_management.lockfile.operations.write_dependency_lockfile(lockfile, directory, update=False)[source]
Write a DependencyLockFile to directory.
- Parameters:
lockfile (DependencyLockFile) – The lockfile model to persist.
directory (Path | CloudPath | str) – Target directory (local path or cloud URI).
update (bool) – If True, overwrite an existing file. Otherwise raise FileExistsError.
- Returns:
Path (local or cloud) to the written lockfile.
- Raises:
FileExistsError – If the lockfile already exists and update is False.
- Return type:
Path | CloudPath
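The overwrite guard controlled by update can be illustrated with a small standalone sketch. The function write_json_guarded and its plain-dict payload are assumptions for this example; the real function takes a DependencyLockFile model and also supports cloud directories.

```python
import json
from pathlib import Path


def write_json_guarded(payload: dict, target: Path, update: bool = False) -> Path:
    """Hypothetical helper: refuse to clobber an existing file unless update=True."""
    if target.exists() and not update:
        raise FileExistsError(target)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(payload, indent=2))
    return target
```

Defaulting to update=False means a lockfile that pins a completed processing run is never replaced by accident.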
- gnss_product_management.lockfile.operations.write_lock_product(lock_product)[source]
Write a LockProduct to its sidecar _lock.json file.
- Parameters:
lock_product (LockProduct) – The lock entry to persist.
- Returns:
Path (local or cloud) to the written sidecar file.
- Return type:
Path | CloudPath
Utilities
Path utilities — unified local/cloud path construction.
as_path is the single entry point for converting URI strings into the
appropriate path object. Callers never need to import cloudpathlib
directly; this module dispatches to CloudPath for
s3://, gs://, and az:// URIs and falls back to
Path for everything else.
- gnss_product_management.utilities.paths.as_path(uri)[source]
Return a Path or CloudPath.
Detects the scheme from uri and dispatches accordingly:
s3://, gs://, az:// → CloudPath
anything else (including bare POSIX paths) → Path
- Parameters:
uri (str | Path | CloudPath) – A URI string, local path string, Path, or CloudPath.
- Returns:
The appropriate path object for the given URI.
- Return type:
Path | CloudPath
Examples:
>>> as_path("/data/gnss")
PosixPath('/data/gnss')
>>> as_path("s3://my-bucket/gnss-data")
S3Path('s3://my-bucket/gnss-data')
>>> as_path(Path("/data/gnss"))
PosixPath('/data/gnss')
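The scheme detection that drives this dispatch can be sketched with the standard library alone. The helper is_cloud_uri and the CLOUD_SCHEMES set are assumptions for illustration; the real as_path returns path objects rather than a boolean.

```python
from pathlib import Path
from urllib.parse import urlparse

CLOUD_SCHEMES = {"s3", "gs", "az"}  # schemes the docs list as cloud-backed


def is_cloud_uri(uri) -> bool:
    """Hypothetical helper: True when uri would be routed to a cloud path."""
    if isinstance(uri, Path):
        return False  # already a local path object
    return urlparse(str(uri)).scheme.lower() in CLOUD_SCHEMES
```

A caller could branch on this result to pick a cloud path class or pathlib.Path, which is the dispatch the description above outlines.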
Computed metadata field registrations for a ParameterCatalog.
Defines date-to-metadata transformations (DDD, GPSWEEK, YYYY,
REFFRAME, etc.) and register_computed_fields() which wires
them onto a catalog so that parameter values can be derived from a
processing date at query time.
Usage:
from gnss_product_management.specifications.parameters.parameter import ParameterCatalog
from gnss_product_management.utilities.metadata_funcs import register_computed_fields
cat = ParameterCatalog.from_yaml("meta_spec.yaml")
register_computed_fields(cat)
- class gnss_product_management.utilities.metadata_funcs.IGSAntexReferenceFrameType(*values)[source]
Bases:
Enum
IGS ANTEX reference frame identifiers.
Maps each IGS reference frame to its canonical lowercase string used in ANTEX filenames (e.g. igs20.atx).
- IGS05 = 'igs05'
- IGS08 = 'igs08'
- IGS14 = 'igs14'
- IGS20 = 'igs20'
- IGSR3 = 'igsR3'
- gnss_product_management.utilities.metadata_funcs.register_computed_fields(registry)[source]
Register all date-derived computed fields onto registry.
Iterates over the built-in _COMPUTED_FIELDS list and calls registry.computed() for each one, wiring the pure date transformation functions into the catalog.
- Parameters:
registry – A ParameterCatalog instance to extend.
- Return type:
None
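To make the idea of date-derived fields concrete, here is a minimal standalone sketch of three such transformations. The COMPUTED mapping, the derive function, and the zero-padded string formats are assumptions for this example; they do not reproduce the library's _COMPUTED_FIELDS or the registry.computed() signature.

```python
from datetime import date

GPS_EPOCH = date(1980, 1, 6)  # start of GPS week 0

# Hypothetical table of pure date -> string transformations.
COMPUTED = {
    "YYYY": lambda d: f"{d.year:04d}",
    "DDD": lambda d: f"{d.timetuple().tm_yday:03d}",
    "GPSWEEK": lambda d: f"{(d - GPS_EPOCH).days // 7:04d}",
}


def derive(d: date) -> dict:
    """Evaluate every computed field for one processing date."""
    return {name: fn(d) for name, fn in COMPUTED.items()}


print(derive(date(2025, 1, 15)))
```

Keeping each transformation a pure function of the date makes the fields easy to test in isolation and to register in a loop.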
Shared helper functions and sentinel types.
Contains low-level utilities used across the package:
- hash_file(): SHA-256 file hashing.
- _ensure_datetime(): date/datetime normalisation to UTC.
- _PassthroughDict: dict that preserves {key} for missing keys.
- _listify(): coerce scalars to single-element lists.
- expand_dict_combinations(): Cartesian product of dict values.
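Two of the helpers listed above follow well-known Python idioms, sketched here under stated assumptions. The class and function names drop the leading underscore to signal that they are illustrative stand-ins, and the filename template is invented for the example.

```python
class PassthroughDict(dict):
    """Dict that leaves '{key}' in place when key is missing (illustrative)."""

    def __missing__(self, key):
        return "{" + key + "}"


def listify(value):
    """Wrap a scalar in a list; copy lists and tuples into a new list."""
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


# Hypothetical template: only the date fields are known at this stage.
template = "{AAA}0OPS{TTT}_{YYYY}{DDD}0000"
partial = template.format_map(PassthroughDict(YYYY="2025", DDD="015"))
print(partial)
```

Using str.format_map with a dict that defines __missing__ allows a template to be filled in several passes, with unresolved placeholders surviving for a later pass.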
- gnss_product_management.utilities.helpers.decompress_gzip(file_path, dest_dir=None)[source]
Decompress a gzip file and remove the original.
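A minimal sketch of the decompress-then-remove behaviour for local files. The function gunzip_and_remove is hypothetical; the choice to default dest_dir to the file's own directory and to reject names without a .gz suffix are assumptions, since the docs above do not specify them.

```python
import gzip
import shutil
from pathlib import Path
from typing import Optional


def gunzip_and_remove(file_path: Path, dest_dir: Optional[Path] = None) -> Path:
    """Hypothetical helper: write the decompressed file, then delete the .gz."""
    if not file_path.name.endswith(".gz"):
        raise ValueError(f"not a .gz file: {file_path}")
    target_dir = dest_dir if dest_dir is not None else file_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / file_path.name[:-3]  # strip the trailing '.gz'
    with gzip.open(file_path, "rb") as src, target.open("wb") as dst:
        shutil.copyfileobj(src, dst)  # stream, do not load into memory
    file_path.unlink()  # remove the original only after a successful write
    return target
```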
- gnss_product_management.utilities.helpers.expand_dict_combinations(d)[source]
Compute the Cartesian product of dict values.
- Parameters:
d (dict[str, list[str]]) – Mapping from parameter names to lists of candidate values.
- Returns:
A list of dicts, one per combination, with a single value per key.
- Return type:
list[dict[str, str]]
Example:
>>> expand_dict_combinations({"A": ["1", "2"], "B": ["x", "y"]})
[{'A': '1', 'B': 'x'}, {'A': '1', 'B': 'y'}, {'A': '2', 'B': 'x'}, {'A': '2', 'B': 'y'}]
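One way such an expansion could be written is with itertools.product. This is a sketch under that assumption, using the hypothetical name expand; it is not necessarily how the library implements expand_dict_combinations.

```python
from itertools import product


def expand(d: dict) -> list:
    """Return one dict per combination of the candidate values in d."""
    keys = list(d)
    # product(*values) yields tuples in the same order as keys.
    return [dict(zip(keys, combo)) for combo in product(*(d[k] for k in keys))]


combos = expand({"A": ["1", "2"], "B": ["x", "y"]})
print(len(combos))
```

The number of results is the product of the list lengths, so a query with several multi-valued parameters grows quickly; callers may want to constrain parameters before expanding.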