"""
RINEX utility functions.
Extract timestamps and time ranges from RINEX observation files,
and merge RINEX 2 broadcast ephemerides into RINEX 3 BRDM format.
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import IO
logger = logging.getLogger(__name__)
def _header_get_time(line: str) -> datetime:
"""Parse a timestamp from a RINEX header line containing ``GPS``.
Expects the format produced by the ``TIME OF FIRST OBS`` /
``TIME OF LAST OBS`` header records::
2025 1 15 0 0 0.0000000 GPS
Everything before ``GPS`` is split on whitespace and interpreted as
``YYYY MM DD HH MI SS.sss``.
Args:
line: A single RINEX header line.
Returns:
Parsed UTC datetime (fractional seconds truncated to integer).
"""
time_values = line.split("GPS")[0].strip().split()
return datetime(
year=int(time_values[0]),
month=int(time_values[1]),
day=int(time_values[2]),
hour=int(time_values[3]),
minute=int(time_values[4]),
second=int(float(time_values[5])),
)
[docs]
def epoch_get_time(line: str) -> datetime:
"""Extract the epoch timestamp from a RINEX 2 observation record.
Assumes a 2-digit year (added to 2000). The line is whitespace-split
and the first six tokens are interpreted as
``YY MM DD HH MI SS.sss``.
Args:
line: A single RINEX observation epoch line.
Returns:
Parsed UTC datetime.
"""
date_line = line.strip().split()
return datetime(
year=2000 + int(date_line[0]),
month=int(date_line[1]),
day=int(date_line[2]),
hour=int(date_line[3]),
minute=int(date_line[4]),
second=int(float(date_line[5])),
)
[docs]
def rinex_get_time_range(source: str | Path) -> tuple[datetime, datetime]:
"""
Extract the time range from a RINEX observation file.
Parameters
----------
source : str | Path
Path to the RINEX observation file.
Returns
-------
Tuple[datetime, datetime]
Start and end timestamps.
Raises
------
ValueError
If the time range cannot be extracted.
"""
timestamp_data_start = None
timestamp_data_end = None
with open(source) as f:
files = f.readlines()
for line in files:
if timestamp_data_start is None:
if "TIME OF FIRST OBS" in line:
start_time = _header_get_time(line)
timestamp_data_start = start_time
timestamp_data_end = start_time
year = str(timestamp_data_start.year)[2:]
break
if timestamp_data_start is not None:
if line.strip().startswith(year):
try:
current_date = epoch_get_time(line)
if current_date and current_date > timestamp_data_start:
timestamp_data_end = current_date
except Exception:
pass
if timestamp_data_start is not None and timestamp_data_end == timestamp_data_start:
timestamp_data_end = datetime(
year=timestamp_data_start.year,
month=timestamp_data_start.month,
day=timestamp_data_start.day,
hour=23,
minute=59,
second=59,
microsecond=999999,
)
if timestamp_data_start is None or timestamp_data_end is None:
logger.error("Failed to extract time range from %s", source)
raise ValueError(f"Failed to extract time range from {source}")
return timestamp_data_start, timestamp_data_end
# ---------------------------------------------------------------------------
# Broadcast navigation merge (RINEX 2 → RINEX 3 BRDM)
# ---------------------------------------------------------------------------
def _write_brdn(file: Path, prefix: str, fm: IO) -> None:
"""Write GPS broadcast ephemeris records from a RINEX 2 ``.n`` file.
Args:
file: Path to the RINEX 2 GPS broadcast file.
prefix: Constellation prefix character (``'G'``).
fm: Open file handle for the merged output.
"""
try:
with open(file) as fn:
lines = fn.readlines()
except Exception as e:
logger.error("Unable to open or read file %s: %s", file, e)
return
in_header = True
i = 1
while i < len(lines):
try:
if not in_header:
line = lines[i].replace("D", "e")
prn = int(line[0:2])
yyyy = int(line[3:5]) + 2000
mm = int(line[6:8])
dd = int(line[9:11])
hh = int(line[12:14])
mi = int(line[15:17])
ss = round(float(line[18:22]))
num2 = float(line[22:41])
num3 = float(line[41:60])
num4 = float(line[60:79])
fm.write(
f"{prefix}{prn:02d} {yyyy:04d} {mm:02d} {dd:02d}"
f" {hh:02d} {mi:02d} {ss:02d}"
f" {num2:.12e} {num3:.12e} {num4:.12e}\n"
)
for t in range(1, 4):
line = lines[i + t].replace("D", "e")
num1 = float(line[3:22])
num2 = float(line[22:41])
num3 = float(line[41:60])
num4 = float(line[60:79])
fm.write(f" {num1:.12e} {num2:.12e} {num3:.12e} {num4:.12e}\n")
line = lines[i + 7].replace("D", "e")
num1 = float(line[3:22])
num2 = float(line[22:41])
fm.write(f" {num1:.12e} {num2:.12e}\n")
i += 8
if i >= len(lines):
break
else:
if "PGM / RUN BY / DATE" == lines[i][60:79]:
fm.write(lines[i])
if "LEAP SECONDS" == lines[i][60:72]:
fm.write(lines[i])
if "END OF HEADER" == lines[i][60:73]:
in_header = False
fm.write(lines[i])
i += 1
except Exception as e:
logger.error("Error at line %d of file %s: %s", i, file, e)
break
def _write_brdg(file: Path, prefix: str, fm: IO) -> None:
"""Write GLONASS broadcast ephemeris records from a RINEX 2 ``.g`` file.
Args:
file: Path to the RINEX 2 GLONASS broadcast file.
prefix: Constellation prefix character (``'R'``).
fm: Open file handle for the merged output.
"""
try:
with open(file) as fg:
lines = fg.readlines()
except Exception as e:
logger.error("Unable to open or read file %s: %s", file, e)
return
in_header = True
i = 1
while i < len(lines):
try:
if not in_header:
line = lines[i].replace("D", "e")
prn = int(line[0:2])
yyyy = int(line[3:5]) + 2000
mm = int(line[6:8])
dd = int(line[9:11])
hh = int(line[12:14])
mi = int(line[15:17])
ss = round(float(line[18:22]))
num2 = float(line[22:41])
num3 = float(line[41:60])
num4 = float(line[60:79])
fm.write(
f"R{prn:02d} {yyyy:04d} {mm:02d} {dd:02d}"
f" {hh:02d} {mi:02d} {int(ss):02d}"
f"{num2: .12e}{num3: .12e}{num4: .12e}\n"
)
for t in range(1, 4):
line = lines[i + t].replace("D", "e")
num1 = float(line[3:22])
num2 = float(line[22:41])
num3 = float(line[41:60])
num4 = float(line[60:79])
fm.write(f" {num1: .12e}{num2: .12e}{num3: .12e}{num4: .12e}\n")
i += 4
if i >= len(lines):
break
else:
if "END OF HEADER" == lines[i][60:73]:
in_header = False
i += 1
except Exception as e:
logger.error("Error at line %d of file %s: %s", i, file, e)
break
[docs]
def merge_broadcast_files(
brdn: Path,
brdg: Path,
output_folder: Path,
) -> Path | None:
"""Merge GPS and GLONASS RINEX 2 broadcast files into a RINEX 3 BRDM file.
Both input files must have matching day-of-year (characters 4–6) and
year (characters 9–10) in their filenames — e.g. ``brdn0200.25n`` and
``brdg0200.25g``. A mismatch causes the function to return ``None``
with an error log.
Inspired by ``PrideLab/PRIDE-PPPAR/scripts/merge2brdm.py``.
Args:
brdn: Path to the GPS broadcast ephemeris (``.n``) file.
brdg: Path to the GLONASS broadcast ephemeris (``.g``) file.
output_folder: Directory where the merged BRDM file will be written.
Returns:
Path to the merged BRDM file, or ``None`` on failure.
"""
logger.info("Merging %s and %s into a single BRDM file.", brdn, brdg)
ddd = brdn.name[4:7]
yy = brdn.name[9:11]
if brdg.name[4:7] != ddd or brdg.name[9:11] != yy:
logger.error("Inconsistent file names: %s vs %s (DOY or year mismatch)", brdn, brdg)
return None
brdm = output_folder / f"brdm{ddd}0.{yy}p"
with open(brdm, "w") as fm:
fm.write(
" 3.04 NAVIGATION DATA M (Mixed) RINEX VERSION / TYPE\n"
)
_write_brdn(brdn, "G", fm)
_write_brdg(brdg, "R", fm)
if brdm.exists():
logger.info("Files merged into %s", brdm)
return brdm
return None