"""Standardisation functions for AMOC observing array datasets.
These functions take raw loaded datasets and:
- Rename variables to standard names
- Add variable-level metadata
- Add or update global attributes
- Prepare datasets for downstream analysis
Currently implemented:
- SAMBA
"""
import xarray as xr
from collections import OrderedDict
import re
import warnings
from datetime import datetime, timezone
from amocatlas import logger, utilities, defaults, contributors
from amocatlas.logger import log_debug
log = logger.log # Use the global logger
# Canonical global attribute order, following the OG1.0 specification ("Global attributes" section)
_GLOBAL_ATTR_ORDER = defaults.GLOBAL_ATTR_ORDER
# Institution corrections are now handled in contributors.py
def normalize_and_add_vocabulary(
attrs: dict, normalizations: dict[str, tuple[dict[str, str], str]]
) -> dict:
"""For each (attr, (value_map, vocab_url)) in `normalizations`.
- If `attr` exists in attrs:
* Map attrs[attr] using value_map (or leave it if unmapped)
* Add attrs[f"{attr}_vocabulary"] = vocab_url
Parameters
----------
attrs : dict
Metadata attributes, already cleaned & renamed.
normalizations : dict
Keys are canonical attr names (e.g. "platform"), values are
(value_map, vocabulary_url) tuples.
Returns
-------
dict
attrs with normalized values and added <attr>_vocabulary entries.
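
Examples
--------
A minimal illustrative call with a hypothetical value map and vocabulary
URL (the real mappings live in ``defaults.PLATFORM_NORMALIZATIONS``):

>>> attrs = {"platform": "mooring array"}
>>> norms = {
...     "platform": (
...         {"mooring array": "mooring"},
...         "https://vocab.example.org/platform/",
...     )
... }
>>> out = normalize_and_add_vocabulary(attrs, norms)
>>> out["platform"], out["platform_vocabulary"]
('mooring', 'https://vocab.example.org/platform/')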
"""
for attr, (value_map, vocab_url) in normalizations.items():
if attr in attrs:
raw = attrs[attr]
mapped = value_map.get(raw, raw)
if mapped != raw:
log_debug("Normalized '%s': %r → %r", attr, raw, mapped)
attrs[attr] = mapped
vocab_key = f"{attr}_vocabulary"
# only set if not already present
if vocab_key not in attrs:
attrs[vocab_key] = vocab_url
log_debug("Added vocabulary for '%s': %s", attr, vocab_url)
return attrs
def get_dynamic_version() -> str:
"""Get the actual software version using multiple detection methods.
Priority:
1. Git describe (for development in git repo)
2. Installed package version (for pip/conda installs)
3. Fallback to __version__ file
Returns
-------
str
Software version string
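
Examples
--------
Illustrative only; the value depends on the environment (git checkout,
installed package, or the bundled ``_version`` fallback):

>>> get_dynamic_version()  # doctest: +SKIP
'v1.2.3'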
"""
import subprocess
import os
# Method 1: Try git describe for development versions
try:
# Get the directory of this file to find git repo root
current_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(current_dir) # Go up from amocatlas/ to repo root
result = subprocess.run(
["git", "describe", "--tags", "--dirty", "--always"],
cwd=repo_root,
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
git_version = result.stdout.strip()
# Strip everything after vX.X.X pattern (remove commit info and dirty flag)
version_match = re.match(r"(v?\d+\.\d+\.\d+)", git_version)
if version_match:
clean_version = version_match.group(1)
log_debug(
f"Using cleaned git version: {clean_version} (from {git_version})"
)
return clean_version
log_debug(f"Using git version: {git_version}")
return git_version
except (subprocess.SubprocessError, FileNotFoundError, OSError):
# Git not available or repository not found
pass
# Method 2: Try installed package version
try:
import importlib.metadata
installed_version = importlib.metadata.version("amocatlas")
except (importlib.metadata.PackageNotFoundError, ImportError):
pass
else:
log_debug(f"Using installed package version: {installed_version}")
return installed_version
# Method 3: Fallback to __version__ file
from amocatlas._version import __version__
log_debug(f"Using fallback version from __version__: {__version__}")
return __version__
def _standardize_role_names(role_string: str, role_map: dict) -> str:
"""Standardize individual role names in a comma-separated role string.
Args:
role_string: Comma-separated string of role names
role_map: Dictionary mapping role names to standard NERC G04 terms
Returns:
Standardized comma-separated role string
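
Examples
--------
With a hypothetical role map (the real map is
``defaults.CONTRIBUTOR_ROLE_MAP``):

>>> _standardize_role_names("PI, Data Manager", {"PI": "principalInvestigator"})
'principalInvestigator, Data Manager'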
"""
if not role_string or not role_string.strip():
return role_string
roles = [role.strip() for role in role_string.split(",")]
standardized_roles = []
for role in roles:
if not role: # Skip empty roles
standardized_roles.append("")
continue
# Apply role mapping if available
standardized_role = role_map.get(role, role)
standardized_roles.append(standardized_role)
return ", ".join(standardized_roles)
def _consolidate_contributors(cleaned: dict) -> dict:
"""Consolidate creators, PIs, publishers, and contributors into unified fields.
Uses the new modular contributor functions from contributors.py for enhanced
ORCID lookup and name standardization.
These include:
- contributor_name, contributor_role, contributor_email, contributor_id aligned one-to-one
- contributing_institutions, with placeholders for vocabularies/roles
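
Examples
--------
Illustrative only; the exact output depends on
``defaults.CONTRIBUTOR_ROLE_MAP`` and the lookups performed by
:mod:`amocatlas.contributors`:

>>> attrs = {"creator_name": "Jane Doe", "creator_email": "jane@example.org"}
>>> _consolidate_contributors(attrs)  # doctest: +SKIP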
"""
log_debug("Starting _consolidate_contributors with attrs: %s", cleaned)
role_map = defaults.CONTRIBUTOR_ROLE_MAP
# Step A: Extract and consolidate contributor fields using new modular approach
# Collect all contributor-related fields from various sources
raw_names = []
raw_roles = []
raw_emails = []
raw_ids = []
# Process each contributor category into dictionaries, then combine
all_contributors = {}
current_index = 1
# Phase 1: Process role-specific fields first (creator, PI, publisher)
role_priority_fields = ["creator_name", "principal_investigator", "publisher_name"]
for key in role_priority_fields:
if key in cleaned:
name_raw = cleaned.pop(key, "")
email_raw = cleaned.pop(key.replace("_name", "_email"), "")
url_raw = cleaned.pop(key.replace("_name", "_url"), "")
role_value = role_map.get(key, "")
# Process this category using modular functions
category_dict = contributors.parse_contributors(
name_raw, url_raw, email_raw, role_value
)
# Add to combined dict with sequential indices
for contributor in category_dict.values():
if contributor["name"]: # Only add non-empty names
all_contributors[str(current_index)] = contributor
current_index += 1
# Phase 2: Process any remaining role-mapped fields (except contributor_name)
for key in list(cleaned.keys()):
if key in role_map and key != "contributor_name":
raw = cleaned.pop(key)
role_value = role_map[key]
# Process as single-role category
category_dict = contributors.parse_contributors(raw, "", "", role_value)
# Add to combined dict with sequential indices
for contributor in category_dict.values():
if contributor["name"]:
all_contributors[str(current_index)] = contributor
current_index += 1
# Phase 3: Process contributor_name and related fields LAST
explicit_contributor_role = cleaned.pop("contributor_role", "")
if "contributor_name" in cleaned:
name_raw = cleaned.pop("contributor_name", "")
email_raw = cleaned.pop("contributor_email", "")
id_raw = cleaned.pop("contributor_id", "")
# Use explicit role if available, otherwise use mapped empty role
role_value = (
_standardize_role_names(explicit_contributor_role, role_map)
if explicit_contributor_role.strip()
else role_map["contributor_name"]
)
# Process this category using modular functions
category_dict = contributors.parse_contributors(
name_raw, id_raw, email_raw, role_value
)
# Add to combined dict with sequential indices
for contributor in category_dict.values():
if contributor["name"]:
all_contributors[str(current_index)] = contributor
current_index += 1
# Extract any remaining URLs/emails that weren't processed
remaining_emails = []
remaining_ids = []
for key in list(cleaned.keys()):
if key.endswith("_email"):
raw = cleaned.pop(key)
if raw and str(raw).strip():
parts = [
v.strip()
for v in str(raw).replace(";", ",").split(",")
if v.strip()
]
remaining_emails.extend(parts)
for key in list(cleaned.keys()):
if key in ("contributor_url", "creator_url", "publisher_url"):
raw = cleaned.pop(key)
if raw and str(raw).strip():
parts = [
v.strip()
for v in str(raw).replace(";", ",").split(",")
if v.strip()
]
remaining_ids.extend(parts)
# Convert back to the arrays format for the existing modular processing
if all_contributors:
raw_names = [c["name"] for c in all_contributors.values()]
raw_roles = [c["role"] for c in all_contributors.values()]
raw_emails = [c["email"] for c in all_contributors.values()]
raw_ids = [c["id"] for c in all_contributors.values()]
# Add any remaining emails/IDs that didn't get associated
while len(raw_emails) < len(raw_names):
raw_emails.append("")
while len(raw_ids) < len(raw_names):
raw_ids.append("")
# Note: remaining_emails and remaining_ids are deliberately not extended here
# to avoid misalignment. All emails/IDs should be properly associated during
# the three-phase processing above.
else:
raw_names = []
raw_roles = []
raw_emails = remaining_emails
raw_ids = remaining_ids
# Pad lists to same length for processing
max_contributors = max(
len(raw_names), len(raw_roles), len(raw_emails), len(raw_ids)
)
# Only proceed if we have any contributor information
if max_contributors > 0:
# Pad shorter lists with empty strings
while len(raw_names) < max_contributors:
raw_names.append("")
while len(raw_roles) < max_contributors:
raw_roles.append("")
while len(raw_emails) < max_contributors:
raw_emails.append("")
while len(raw_ids) < max_contributors:
raw_ids.append("")
# Convert to comma-separated strings for processing
names_str = ", ".join(raw_names)
roles_str = ", ".join(raw_roles)
emails_str = ", ".join(raw_emails)
ids_str = ", ".join(raw_ids)
log_debug(
"Raw contributor data - Names: %r, Roles: %r, Emails: %r, IDs: %r",
names_str,
roles_str,
emails_str,
ids_str,
)
# Use the new modular contributor processing
try:
processed = contributors.process_contributor_metadata(
names_str, ids_str, emails_str, roles_str
)
# Update cleaned dictionary with processed results
cleaned.update(processed)
# Add NERC G04 vocabulary URL if we have contributor roles
if "contributor_role" in cleaned and cleaned["contributor_role"]:
cleaned["contributor_role_vocabulary"] = (
"https://vocab.nerc.ac.uk/collection/G04/current/"
)
log_debug("Processed contributor metadata: %s", processed)
except (ValueError, KeyError, TypeError, AttributeError) as e:
log_debug(f"Error in contributor processing: {e}, using fallback")
# Fallback to basic concatenation if modular processing fails
cleaned["contributor_name"] = names_str
cleaned["contributor_role"] = roles_str
cleaned["contributor_email"] = emails_str
cleaned["contributor_id"] = ids_str
# Add vocabulary URL in fallback case too
if roles_str:
cleaned["contributor_role_vocabulary"] = (
"https://vocab.nerc.ac.uk/collection/G04/current/"
)
# Step B: consolidate institution keys using new modular approach
# Collect all institution-related fields from various sources
raw_institutions = []
raw_vocabularies = []
raw_roles = []
# Extract institution names from various fields
for attr_key in list(cleaned.keys()):
if attr_key.lower() in (
"institution",
"publisher_institution",
"contributor_institution",
"contributing_institutions",
):
raw_inst = cleaned.pop(attr_key)
if raw_inst and str(raw_inst).strip():
raw_institutions.append(str(raw_inst).strip())
# Extract vocabulary URLs from vocabulary fields
for attr_key in list(cleaned.keys()):
if attr_key.lower() in (
"contributing_institutions_vocabulary",
"institution_vocabulary",
"publisher_institution_vocabulary",
):
raw_vocab = cleaned.pop(attr_key)
if raw_vocab and str(raw_vocab).strip():
raw_vocabularies.append(str(raw_vocab).strip())
# Extract roles from role fields
for attr_key in list(cleaned.keys()):
if attr_key.lower() in (
"contributing_institutions_role",
"institution_role",
"publisher_institution_role",
):
raw_role = cleaned.pop(attr_key)
if raw_role and str(raw_role).strip():
raw_roles.append(str(raw_role).strip())
# Convert to comma-separated strings for modular processing
institutions_str = ", ".join(raw_institutions) if raw_institutions else ""
vocabularies_str = ", ".join(raw_vocabularies) if raw_vocabularies else ""
roles_str = ", ".join(raw_roles) if raw_roles else ""
log_debug(
"Raw institution data - Institutions: %r, Vocabularies: %r, Roles: %r",
institutions_str,
vocabularies_str,
roles_str,
)
# Use the new modular institution processing (includes corrections and registry lookup)
try:
processed = contributors.process_institution_metadata(
institutions_str, vocabularies_str, roles_str
)
# Update cleaned dictionary with processed results
cleaned.update(processed)
log_debug("Processed institution metadata: %s", processed)
except (ValueError, KeyError, TypeError, AttributeError) as e:
log_debug(f"Error in institution processing: {e}, using fallback")
# Fallback to basic values if modular processing fails
cleaned["contributing_institutions"] = institutions_str
cleaned["contributing_institutions_vocabulary"] = vocabularies_str
cleaned["contributing_institutions_role"] = roles_str
log_debug("Finished _consolidate_contributors: %s", cleaned)
return cleaned
def standardize_time_coordinate(ds: xr.Dataset) -> xr.Dataset:
"""Standardize TIME coordinate to comply with AMOCatlas specifications.
All datasets with a TIME coordinate are given standardized attributes:

- data type: datetime64[ns]
- long_name: "Time"
- standard_name: "time"
- calendar: "gregorian"
- units: "datetime64[ns]" (descriptive only)
- vocabulary: "http://vocab.nerc.ac.uk/collection/P01/current/ELTMEP01/"

Numeric time values are interpreted as seconds since 1970-01-01T00:00:00Z
and converted to datetime64[ns].
Parameters
----------
ds : xr.Dataset
Dataset to standardize TIME coordinate for.
Returns
-------
xr.Dataset
Dataset with standardized TIME coordinate attributes.
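
Examples
--------
A minimal sketch with numeric TIME values (assumed seconds since
1970-01-01) being promoted to datetime64[ns]:

>>> ds = xr.Dataset(
...     {"MOC": ("TIME", [17.0, 18.5])},
...     coords={"TIME": ("TIME", [0.0, 86400.0])},
... )
>>> ds = standardize_time_coordinate(ds)
>>> ds["TIME"].dtype.kind
'M'
>>> ds["TIME"].attrs["standard_name"]
'time'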
"""
if "TIME" not in ds.coords and "TIME" not in ds.dims:
return ds
# Ensure TIME is a coordinate
if "TIME" in ds.dims and "TIME" not in ds.coords:
# If TIME is only a dimension, create a coordinate
log_debug("TIME dimension found without coordinate - creating coordinate")
if "TIME" in ds.data_vars:
# If there's a TIME data variable, promote it to coordinate
ds = ds.set_coords("TIME")
else:
# Create a simple index coordinate
ds = ds.assign_coords(TIME=range(ds.sizes["TIME"]))
time_coord = ds["TIME"]
# Convert to datetime64[ns] if not already
if time_coord.dtype.kind != "M": # Not datetime64 type
log_debug(
f"Converting TIME coordinate from {time_coord.dtype} to datetime64[ns]"
)
if time_coord.dtype.kind in ["f", "i"]: # numeric type (seconds since epoch)
# Convert numeric time to datetime64[ns]
import pandas as pd
try:
# Handle different epoch references - assume 1970-01-01 if no units specified
units = time_coord.attrs.get(
"units", "seconds since 1970-01-01T00:00:00Z"
)
if "since" in units.lower():
# A "since" units string is present, but epoch parsing is not yet
# implemented; values are assumed to be seconds since 1970-01-01
time_datetime = pd.to_datetime(
time_coord.values,
unit="s",
origin="1970-01-01",
errors="coerce",
)
else:
# Assume seconds since 1970-01-01
time_datetime = pd.to_datetime(
time_coord.values,
unit="s",
origin="1970-01-01",
errors="coerce",
)
ds["TIME"] = ("TIME", time_datetime.astype("datetime64[ns]"))
except (
ValueError,
TypeError,
OverflowError,
pd.errors.OutOfBoundsDatetime,
) as e:
log_debug(f"Failed to convert numeric TIME to datetime64[ns]: {e}")
# Keep original values but warn
ds["TIME"] = time_coord
else:
log_debug(f"Unknown TIME coordinate dtype: {time_coord.dtype}")
# Keep original values
ds["TIME"] = time_coord
elif time_coord.dtype != "datetime64[ns]":
# Convert datetime64 to nanosecond precision
log_debug("Converting datetime64 TIME coordinate to nanosecond precision")
import pandas as pd
time_datetime = pd.to_datetime(time_coord.values, errors="coerce").astype(
"datetime64[ns]"
)
ds["TIME"] = ("TIME", time_datetime)
# Set standard TIME coordinate attributes for datetime64 format
standard_time_attrs = {
"long_name": "Time",
"standard_name": "time",
"calendar": "gregorian",
"units": "datetime64[ns]", # Use datetime64 units for clarity
"vocabulary": "http://vocab.nerc.ac.uk/collection/P01/current/ELTMEP01/",
}
# Note: the 'units' value above is descriptive only; CF-style time units for
# datetime64 coordinates are normally handled via encoding rather than attrs
# Update TIME coordinate attributes
ds["TIME"].attrs.update(standard_time_attrs)
log_debug("Standardized TIME coordinate attributes")
return ds
def standardize_longitude_coordinate(ds: xr.Dataset) -> xr.Dataset:
"""Standardize LONGITUDE coordinate to comply with AMOCatlas specifications.
All datasets with a LONGITUDE coordinate are given standardized attributes:

- data type: double (float64)
- long_name: "Longitude"
- description: "Longitude east (WGS84)"
- standard_name: "longitude"
- units: "degree_east" (from defaults.PREFERRED_UNITS)
Parameters
----------
ds : xr.Dataset
Dataset to standardize LONGITUDE coordinate for.
Returns
-------
xr.Dataset
Dataset with standardized LONGITUDE coordinate attributes.
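
Examples
--------
A minimal sketch; integer longitudes are cast to float64 and given the
standard attributes (the LATITUDE, DEPTH, and SIGMA0 helpers behave
analogously):

>>> ds = xr.Dataset(coords={"LONGITUDE": ("LONGITUDE", [-30, -25])})
>>> ds = standardize_longitude_coordinate(ds)
>>> str(ds["LONGITUDE"].dtype)
'float64'
>>> ds["LONGITUDE"].attrs["standard_name"]
'longitude'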
"""
if "LONGITUDE" not in ds.coords and "LONGITUDE" not in ds.dims:
return ds
# Ensure LONGITUDE is a coordinate
if "LONGITUDE" in ds.dims and "LONGITUDE" not in ds.coords:
log_debug("LONGITUDE dimension found without coordinate - creating coordinate")
if "LONGITUDE" in ds.data_vars:
ds = ds.set_coords("LONGITUDE")
else:
ds = ds.assign_coords(LONGITUDE=range(ds.sizes["LONGITUDE"]))
# Convert to double precision if not already
if ds["LONGITUDE"].dtype != "float64":
log_debug(
f"Converting LONGITUDE coordinate from {ds['LONGITUDE'].dtype} to float64"
)
ds["LONGITUDE"] = ds["LONGITUDE"].astype("float64")
# Set standard LONGITUDE coordinate attributes
standard_lon_attrs = {
"long_name": "Longitude",
"description": "Longitude east (WGS84)",
"standard_name": "longitude",
"units": defaults.PREFERRED_UNITS["longitude"],
}
ds["LONGITUDE"].attrs.update(standard_lon_attrs)
log_debug("Standardized LONGITUDE coordinate attributes")
return ds
def standardize_latitude_coordinate(ds: xr.Dataset) -> xr.Dataset:
"""Standardize LATITUDE coordinate to comply with AMOCatlas specifications.
All datasets with a LATITUDE coordinate are given standardized attributes:

- data type: double (float64)
- long_name: "Latitude"
- description: "Latitude north (WGS84)"
- standard_name: "latitude"
- units: "degree_north" (from defaults.PREFERRED_UNITS)
Parameters
----------
ds : xr.Dataset
Dataset to standardize LATITUDE coordinate for.
Returns
-------
xr.Dataset
Dataset with standardized LATITUDE coordinate attributes.
"""
if "LATITUDE" not in ds.coords and "LATITUDE" not in ds.dims:
return ds
# Ensure LATITUDE is a coordinate
if "LATITUDE" in ds.dims and "LATITUDE" not in ds.coords:
log_debug("LATITUDE dimension found without coordinate - creating coordinate")
if "LATITUDE" in ds.data_vars:
ds = ds.set_coords("LATITUDE")
else:
ds = ds.assign_coords(LATITUDE=range(ds.sizes["LATITUDE"]))
# Convert to double precision if not already
if ds["LATITUDE"].dtype != "float64":
log_debug(
f"Converting LATITUDE coordinate from {ds['LATITUDE'].dtype} to float64"
)
ds["LATITUDE"] = ds["LATITUDE"].astype("float64")
# Set standard LATITUDE coordinate attributes
standard_lat_attrs = {
"long_name": "Latitude",
"description": "Latitude north (WGS84)",
"standard_name": "latitude",
"units": defaults.PREFERRED_UNITS["latitude"],
}
ds["LATITUDE"].attrs.update(standard_lat_attrs)
log_debug("Standardized LATITUDE coordinate attributes")
return ds
def standardize_depth_coordinate(ds: xr.Dataset) -> xr.Dataset:
"""Standardize DEPTH coordinate to comply with AMOCatlas specifications.
All datasets with a DEPTH coordinate are given standardized attributes:

- data type: double (float64)
- long_name: "Depth"
- description: "Depth below surface of the water"
- standard_name: "depth"
- units: meters (from defaults.PREFERRED_UNITS["length"])
Parameters
----------
ds : xr.Dataset
Dataset to standardize DEPTH coordinate for.
Returns
-------
xr.Dataset
Dataset with standardized DEPTH coordinate attributes.
"""
if "DEPTH" not in ds.coords and "DEPTH" not in ds.dims:
return ds
# Ensure DEPTH is a coordinate
if "DEPTH" in ds.dims and "DEPTH" not in ds.coords:
log_debug("DEPTH dimension found without coordinate - creating coordinate")
if "DEPTH" in ds.data_vars:
ds = ds.set_coords("DEPTH")
else:
ds = ds.assign_coords(DEPTH=range(ds.sizes["DEPTH"]))
# Convert to double precision if not already
if ds["DEPTH"].dtype != "float64":
log_debug(f"Converting DEPTH coordinate from {ds['DEPTH'].dtype} to float64")
ds["DEPTH"] = ds["DEPTH"].astype("float64")
# Set standard DEPTH coordinate attributes
standard_depth_attrs = {
"long_name": "Depth",
"description": " Depth below surface of the water",
"standard_name": "depth",
"units": defaults.PREFERRED_UNITS["length"],
}
ds["DEPTH"].attrs.update(standard_depth_attrs)
log_debug("Standardized DEPTH coordinate attributes")
return ds
def standardize_sigma0_coordinate(ds: xr.Dataset) -> xr.Dataset:
"""Standardize SIGMA0 coordinate to comply with AMOCatlas specifications.
All datasets with a SIGMA0 coordinate are given standardized attributes:

- data type: double (float64)
- long_name: "Sigma0"
- description: "Potential density anomaly to 1000 kg/m3, surface reference"
- standard_name: "sea_water_sigma_theta"
- units: "kg m-3" (from defaults.PREFERRED_UNITS["density"])
Parameters
----------
ds : xr.Dataset
Dataset to standardize SIGMA0 coordinate for.
Returns
-------
xr.Dataset
Dataset with standardized SIGMA0 coordinate attributes.
"""
if "SIGMA0" not in ds.coords and "SIGMA0" not in ds.dims:
return ds
# Ensure SIGMA0 is a coordinate
if "SIGMA0" in ds.dims and "SIGMA0" not in ds.coords:
log_debug("SIGMA0 dimension found without coordinate - creating coordinate")
if "SIGMA0" in ds.data_vars:
ds = ds.set_coords("SIGMA0")
else:
ds = ds.assign_coords(SIGMA0=range(ds.sizes["SIGMA0"]))
# Convert to double precision if not already
if ds["SIGMA0"].dtype != "float64":
log_debug(f"Converting SIGMA0 coordinate from {ds['SIGMA0'].dtype} to float64")
ds["SIGMA0"] = ds["SIGMA0"].astype("float64")
# Set standard SIGMA0 coordinate attributes
standard_sigma0_attrs = {
"long_name": "Sigma0",
"description": "Potential density anomaly to 1000 kg/m3, surface reference",
"standard_name": "sea_water_sigma_theta",
"units": defaults.PREFERRED_UNITS["density"],
}
ds["SIGMA0"].attrs.update(standard_sigma0_attrs)
log_debug("Standardized SIGMA0 coordinate attributes")
return ds
def standardize_units(ds: xr.Dataset) -> xr.Dataset:
"""Standardize variable units throughout the dataset.
Uses the comprehensive unit mapping from utilities module.
Parameters
----------
ds : xr.Dataset
Dataset to standardize units for.
Returns
-------
xr.Dataset
Dataset with standardized variable units.
"""
from .utilities import standardize_dataset_units
return standardize_dataset_units(ds, log_changes=True)
def standardise_samba(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise SAMBA array dataset to consistent format.
.. deprecated::
This function is deprecated. Use :func:`standardise_data` instead.
Parameters
----------
ds : xr.Dataset
Raw SAMBA dataset to standardise.
file_name : str
Original filename for metadata.
Returns
-------
xr.Dataset
Standardised dataset with consistent metadata and formatting.
"""
warnings.warn(
"standardise_samba() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_data(ds, file_name)
def standardise_rapid(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise RAPID array dataset to consistent format.
.. deprecated::
This function is deprecated. Use :func:`standardise_data` instead.
Parameters
----------
ds : xr.Dataset
Raw RAPID dataset to standardise.
file_name : str
Original filename for metadata.
Returns
-------
xr.Dataset
Standardised dataset with consistent metadata and formatting.
"""
warnings.warn(
"standardise_rapid() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_data(ds, file_name)
def standardise_move(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise MOVE array dataset to consistent format.
Parameters
----------
ds : xr.Dataset
Raw MOVE dataset to standardise.
file_name : str
Original filename for metadata.
Returns
-------
xr.Dataset
Standardised dataset with consistent metadata and formatting.
"""
warnings.warn(
"standardise_move() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_osnap(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise OSNAP array dataset to consistent format."""
warnings.warn(
"standardise_osnap() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_fw2015(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise FW2015 array dataset to consistent format."""
warnings.warn(
"standardise_move() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_mocha(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise MOCHA array dataset to consistent format."""
warnings.warn(
"standardise_mocha() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_41n(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise 41N array dataset to consistent format."""
warnings.warn(
"standardise_41n() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_dso(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise DSO array dataset to consistent format."""
warnings.warn(
"standardise_dso() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_calafat2025(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise CALAFAT2025 array dataset to consistent format."""
warnings.warn(
"standardise_calafat2025() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_zheng2024(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise ZHENG2024 array dataset to consistent format."""
warnings.warn(
"standardise_zheng2024() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_47n(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise 47N array dataset to a consistent format.
Parameters
----------
ds : xr.Dataset
Raw 47N array dataset to standardise.
file_name : str
Original filename associated with the dataset, used for metadata.
Returns
-------
xr.Dataset
Standardised dataset with consistent metadata and formatting for the 47N array.
"""
warnings.warn(
"standardise_47n() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_fbc(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise FBC array dataset to consistent format."""
warnings.warn(
"standardise_fbc() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_arcticgateway(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise Arctic Gateway array dataset to consistent format."""
warnings.warn(
"standardise_arcticgateway() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_array(ds, file_name)
def standardise_data(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise a dataset using YAML-based metadata.
Parameters
----------
ds : xr.Dataset
Raw dataset loaded from a reader with amocatlas_datasource metadata.
file_name : str
Filename (e.g., 'moc_transports.nc') expected to match ds.attrs["source_file"].
Returns
-------
xr.Dataset
Standardised dataset with renamed variables and enriched metadata.
Raises
------
ValueError
If file_name does not match ds.attrs["source_file"].
ValueError
If 'processing_datasource' is not found in dataset metadata.
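
Examples
--------
Illustrative usage only; ``ds`` stands for a dataset returned by one of
the amocatlas readers (which sets ``processing_datasource`` and
``source_file`` in ``ds.attrs``), and the filename is an example:

>>> ds_std = standardise_data(ds, "moc_transports.nc")  # doctest: +SKIP
>>> ds_std.attrs["processing_software"]  # doctest: +SKIP
'http://github.com/AMOCcommunity/amocatlas'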
"""
# 1) Validate source_file matches
src = ds.attrs.get("source_file")
if src and src != file_name:
raise ValueError(f"file_name {file_name!r} ≠ ds.attrs['source_file'] {src!r}")
# 2) Get datasource ID from dataset metadata
datasource_id = ds.attrs.get("processing_datasource")
if not datasource_id:
raise ValueError("Dataset missing required 'processing_datasource' metadata")
log_debug(f"Standardising {file_name} for {datasource_id.upper()}")
# 3) Collect new attrs from YAML
meta = utilities.load_array_metadata(datasource_id)
file_meta = meta["files"].get(file_name, {})
# 3.5) Remove unwanted variables if specified
variables_to_remove = file_meta.get("variables_to_remove", [])
if variables_to_remove:
# Handle case where YAML parser returns a string instead of list
if isinstance(variables_to_remove, str):
variables_to_remove = variables_to_remove.split()
log_debug(f"Converted string to list: {variables_to_remove}")
vars_removed = []
for var_name in variables_to_remove:
if var_name in ds.variables:
ds = ds.drop_vars(var_name)
vars_removed.append(var_name)
log_debug(f"Removed variable '{var_name}' from dataset")
else:
log_debug(f"Variable '{var_name}' not found for removal")
if vars_removed:
log_debug(f"Removed {len(vars_removed)} variables: {vars_removed}")
# Apply per-variable metadata BEFORE renaming (metadata refers to original variable names)
var_meta = file_meta.get("original_variable_metadata", {})
for var_name, attrs in var_meta.items():
if var_name in ds.variables:
ds[var_name].attrs.update(attrs)
log_debug(f"Applied metadata to original variable '{var_name}'")
# Rename variables and track what was actually renamed
# Prefer dataset's variable_mapping (which may have sanitized names) over YAML
rename_dict = ds.attrs.get(
"variable_mapping", file_meta.get("variable_mapping", {})
)
applied_mapping = {}
if rename_dict:
# Only rename variables that actually exist and need renaming
valid_renames = {
old: new
for old, new in rename_dict.items()
if old in ds.variables and old != new
}
if valid_renames:
ds = ds.rename(valid_renames)
applied_mapping.update(valid_renames)
log_debug("Applied variable renaming: %s", valid_renames)
# For variables that couldn't be renamed (case mismatch, etc.),
# try to find them with case-insensitive matching and track pass-through
failed_renames = {
old: new
for old, new in rename_dict.items()
if old not in ds.variables and old != new
}
if failed_renames:
log_debug("Failed to find exact matches for renaming: %s", failed_renames)
# Try case-insensitive matching for pass-through tracking
ds_vars_lower = {var.lower(): var for var in ds.variables}
for orig_name, std_name in failed_renames.items():
orig_lower = orig_name.lower()
if orig_lower in ds_vars_lower:
actual_var = ds_vars_lower[orig_lower]
# Track as pass-through: actual_name -> actual_name (no rename occurred)
applied_mapping[actual_var] = actual_var
log_debug(
"Pass-through (case mismatch): %s (expected %s -> %s)",
actual_var,
orig_name,
std_name,
)
# Track coordinates that were successfully renamed
coord_renames = {
old: new
for old, new in rename_dict.items()
if old in ds.coords and old != new
}
if coord_renames:
applied_mapping.update(coord_renames)
# Always track applied mapping (even if empty) for consistent reporting
if applied_mapping:
ds.attrs["applied_variable_mapping"] = applied_mapping
log_debug(
"Total applied mapping (renames + pass-throughs): %s", applied_mapping
)
else:
log_debug("No variable_mapping found or applied for %s", file_name)
# Handle convert_to_coord directive
convert_to_coord = file_meta.get("convert_to_coord")
if convert_to_coord:
# Check if this variable was renamed - look for the mapped name
target_var = convert_to_coord
if rename_dict and convert_to_coord in rename_dict:
target_var = rename_dict[convert_to_coord]
log_debug(
f"Using mapped variable name '{target_var}' for convert_to_coord (was '{convert_to_coord}')"
)
if target_var in ds.data_vars:
log_debug(f"Converting variable '{target_var}' to coordinate")
# Get the variable data and attributes
var_data = ds[target_var]
var_attrs = var_data.attrs.copy()
# Remove the variable from data_vars and add as coordinate
ds = ds.drop_vars(target_var)
ds = ds.assign_coords({target_var: var_data})
# Restore attributes
ds[target_var].attrs.update(var_attrs)
log_debug(f"Successfully converted '{target_var}' to coordinate")
else:
log_debug(
f"Variable '{target_var}' not found in dataset for coordinate conversion"
)
# Variable metadata was already applied before renaming (see the
# original_variable_metadata handling above)
# Special handling for heat transport unit conversion (W to PW)
# Convert any remapped variable with units="W" and standard_name containing "northward_ocean_heat_transport"
for var_name in ds.variables:
var_attrs = ds[var_name].attrs
if var_attrs.get(
"units"
) == "W" and "northward_ocean_heat_transport" in var_attrs.get(
"standard_name", ""
):
log_debug(f"Converting heat transport variable '{var_name}' from W to PW")
# Convert data from watts to petawatts (divide by 10^15)
ds[var_name] = ds[var_name] / 1e15
# Update units attribute
ds[var_name].attrs["units"] = "PW"
# If any attributes are blank or value 'n/a', remove them
for var_name, attrs in list(var_meta.items()):
if var_name in ds.variables:
for attr_key, attr_value in attrs.items():
if attr_value in ("", "n/a"):
ds[var_name].attrs.pop(attr_key, None)
log_debug(
"Removed blank attribute '%s' from variable '%s'",
attr_key,
var_name,
)
# Remove any empty attributes from the dataset
for attr_key, attr_value in list(
ds.attrs.items()
): # Iterate over a copy of the items
if attr_value in ("", "n/a"):
ds.attrs.pop(attr_key, None)
log_debug("Removed blank attribute '%s' from dataset", attr_key)
# 3.6) Merge existing attrs, array-level YAML, and file-specific metadata with conflict tracking
combined = {}
# Start with original file metadata (highest priority base)
for key, value in ds.attrs.items():
combined[key] = value
# Add array-level YAML metadata with conflict resolution
array_metadata = meta.get("metadata", {})
for key, value in array_metadata.items():
if key in combined:
resolved_value = resolve_metadata_conflict(
key, combined[key], value, "original file", "array-level YAML"
)
combined[key] = resolved_value
else:
combined[key] = value
# Add special mappings
special_mappings = {
"summary": meta["metadata"].get("description", ""),
"weblink": meta["metadata"].get("weblink", ""),
}
for key, value in special_mappings.items():
if value and key in combined:
resolved_value = resolve_metadata_conflict(
key, combined[key], value, "existing", "array-level YAML mapping"
)
combined[key] = resolved_value
elif value:
combined[key] = value
# Add file-specific metadata with conflict resolution
file_specific_fields = ("acknowledgment", "data_product", "citation")
for key in file_specific_fields:
if key in file_meta:
if key in combined:
resolved_value = resolve_metadata_conflict(
key, combined[key], file_meta[key], "existing", "file-specific YAML"
)
combined[key] = resolved_value
else:
combined[key] = file_meta[key]
# 4) Apply field renaming first, then overwrites, then contributor processing
# This ensures overwrites can target renamed fields while preventing institutional contamination
# 4.1) First do field renaming (normalize whitespace and merge aliases)
combined = utilities.normalize_whitespace(combined)
merged_attrs = merge_metadata_aliases(combined, defaults.METADATA_KEY_MAPPINGS)
# 4.2) Then apply overwrite directives to renamed fields
all_yaml_metadata = {}
all_yaml_metadata.update(meta.get("metadata", {})) # array-level
all_yaml_metadata.update(file_meta) # file-level
overwrite_applied = {}
overwrite_keys_to_remove = []
for key, value in all_yaml_metadata.items():
if key.endswith("_overwrite"):
# Extract the base key name (remove _overwrite suffix)
base_key = key[:-10] # Remove "_overwrite" (10 characters)
# Force overwrite the attribute even if it already exists
merged_attrs[base_key] = value
overwrite_applied[base_key] = value
overwrite_keys_to_remove.append(key) # Mark for cleanup
log_debug(
f"Applied overwrite: '{base_key}' = '{str(value)[:50]}{'...' if len(str(value)) > 50 else ''}'"
)
if overwrite_applied:
log_debug(
f"Applied {len(overwrite_applied)} metadata overrides: {list(overwrite_applied.keys())}"
)
# 4.3) Now do contributor consolidation on the renamed and overwritten fields
cleaned = _consolidate_contributors(merged_attrs)
# Remove _overwrite fields from cleaned metadata to prevent them from appearing in final dataset
for key in overwrite_keys_to_remove:
cleaned.pop(key, None)
log_debug(f"Removed processing directive: '{key}'")
# 5) Standardize date formats and add processing metadata
def standardize_date_format(date_string: str) -> str:
"""Standardize date to ISO 8601 format with Z timezone: YYYY-MM-DDTHH:MM:SSZ
Handles various input formats and converts to UTC with Z suffix.
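
Examples
--------
Illustrative only (nested helper, not importable on its own):

>>> standardize_date_format("2020-01-01")
'2020-01-01T00:00:00Z'
>>> standardize_date_format("01/15/2020")
'2020-01-15T00:00:00Z'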
"""
if not date_string or date_string.strip() == "":
return date_string
date_str = str(date_string).strip()
# If already in correct format, return as-is
if date_str.endswith("Z") and "T" in date_str and len(date_str) == 20:
return date_str
# Common date formats to try parsing
formats_to_try = [
"%Y-%m-%dT%H:%M:%SZ", # Already correct
"%Y-%m-%dT%H:%M:%S", # Missing Z
"%Y-%m-%d %H:%M:%S", # Space instead of T
"%Y-%m-%d", # Date only
"%Y-%m-%dT%H:%M:%S.%fZ", # With microseconds and Z
"%Y-%m-%dT%H:%M:%S.%f", # With microseconds, no Z
"%d-%m-%Y", # European format
"%m/%d/%Y", # US format
"%Y%m%d", # Compact format
]
for fmt in formats_to_try:
try:
dt = datetime.strptime(date_str, fmt)
# Return in standard format with Z
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
except ValueError:
continue
# If no format worked, return original
log_debug(f"Could not parse date format: {date_str}")
return date_string
def standardize_license_format(license_string: str) -> str:
"""Standardize license to SPDX identifier format where possible.
Converts common Creative Commons license variations to standard SPDX identifiers.
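
Examples
--------
Illustrative only (nested helper, not importable on its own):

>>> standardize_license_format("CC BY 4.0")
'CC-BY-4.0'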
"""
if not license_string or license_string.strip() == "":
return license_string
license_str = str(license_string).strip()
# Creative Commons Attribution 4.0 variations
cc_by_4_patterns = [
"Creative Commons Attribution 4.0 International (CC BY 4.0)",
"Creative Commons Attribution 4.0 International",
"CC BY 4.0",
"CC-BY 4.0",
"CCBY4.0",
"CC BY-4.0",
]
# Check for CC BY 4.0 variations (case insensitive)
license_lower = license_str.lower()
if any(pattern.lower() in license_lower for pattern in cc_by_4_patterns):
if (
"cc" in license_lower
and "by" in license_lower
and "4.0" in license_lower
):
return "CC-BY-4.0"
# Other common licenses (can be extended)
license_mappings = {
"ODC-By": "ODC-BY", # Open Data Commons Attribution
"odc-by": "ODC-BY",
"MIT": "MIT",
"Apache-2.0": "Apache-2.0",
"BSD-3-Clause": "BSD-3-Clause",
}
# Check exact matches first
if license_str in license_mappings:
return license_mappings[license_str]
# Check case-insensitive matches
for original, standardized in license_mappings.items():
if license_str.lower() == original.lower():
return standardized
# If no standardization found, return original
return license_str
# Standardize date fields in metadata
date_fields = [
"date_created",
"date_modified",
"date_issued",
"date_metadata_modified",
]
for field in date_fields:
if field in cleaned and cleaned[field]:
cleaned[field] = standardize_date_format(cleaned[field])
# Standardize license field
if "license" in cleaned and cleaned["license"]:
cleaned["license"] = standardize_license_format(cleaned["license"])
def sanitize_source_path(path_string: str) -> str:
"""Sanitize source paths to remove specific user directory structures.
Replaces hardcoded paths with generic equivalents for portability.
Only affects display - other users will see their full paths unchanged.
"""
if not path_string or path_string.strip() == "":
return path_string
path_str = str(path_string).strip()
# Replace specific user path with generic equivalent
# This will only match for the specific user, others see full paths
specific_path = "/Users/eddifying/Cloudfree/github/"
if specific_path in path_str:
sanitized = path_str.replace(specific_path, "~/")
log_debug(f"Sanitized source path: {path_str} → {sanitized}")
return sanitized
return path_str
# Sanitize path fields in metadata
path_fields = ["source_path", "source_file"]
for field in path_fields:
if field in cleaned and cleaned[field]:
cleaned[field] = sanitize_source_path(cleaned[field])
# Remove old comment-based processing info if it exists
if "comment" in cleaned:
comment = cleaned["comment"]
if (
"Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas"
in comment
):
# Remove this text from comment, keeping other parts
cleaned_comment = comment.replace(
"Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas",
"",
).strip()
if cleaned_comment:
cleaned["comment"] = cleaned_comment
else:
cleaned.pop("comment", None)
# Add proper processing metadata
cleaned["processing_software"] = "http://github.com/AMOCcommunity/amocatlas"
cleaned["processing_version"] = get_dynamic_version()
cleaned["date_modified"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# Remove internal metadata fields from final dataset
cleaned.pop("amocatlas_datasource", None)
# Ensure Conventions includes OceanSITES-1.5
if "Conventions" in cleaned:
conventions = cleaned["Conventions"]
if "OceanSITES-1.5" not in conventions:
# Append OceanSITES-1.5 to existing conventions
cleaned["Conventions"] = f"{conventions}, OceanSITES-1.5"
else:
# Set default conventions if none exist
cleaned["Conventions"] = "CF-1.8, ACDD-1.3, OceanSITES-1.5"
# 6) Normalize and add vocabularies
normalizations = defaults.PLATFORM_NORMALIZATIONS
cleaned = normalize_and_add_vocabulary(cleaned, normalizations)
# 7) Standardize coordinate attributes
ds = standardize_time_coordinate(ds)
ds = standardize_longitude_coordinate(ds)
ds = standardize_latitude_coordinate(ds)
ds = standardize_depth_coordinate(ds)
ds = standardize_sigma0_coordinate(ds)
# 8) Standardize units
ds = standardize_units(ds)
# 9) Apply cleaned metadata and reorder according to canonical order
ds.attrs = cleaned
ds.attrs = reorder_metadata(ds.attrs)
# 10) Apply unit standardization again after metadata processing
# This ensures units are not overwritten by YAML metadata operations
ds = standardize_units(ds)
# ds = utilities.safe_update_attrs(ds, cleaned, overwrite=False)
return ds
def standardise_array(ds: xr.Dataset, file_name: str) -> xr.Dataset:
"""Standardise a mooring array dataset using YAML-based metadata.
.. deprecated::
This function is deprecated. Use :func:`standardise_data` instead.
Parameters
----------
ds : xr.Dataset
Raw dataset loaded from a reader with amocatlas_datasource metadata.
file_name : str
Filename (e.g., 'moc_transports.nc') expected to match ds.attrs["source_file"].
Returns
-------
xr.Dataset
Standardised dataset with renamed variables and enriched metadata.
"""
warnings.warn(
"standardise_array() is deprecated and will be removed in a future version. "
"Use standardise_data() instead.",
DeprecationWarning,
stacklevel=2,
)
return standardise_data(ds, file_name)