"""SAMBA array data reader for AMOCatlas.
This module provides functions to read and process data from the SAMBA
(South Atlantic Meridional Overturning Circulation) observing array
located at 34.5°S.
"""
from pathlib import Path
from typing import Union
import pandas as pd
import xarray as xr
from amocatlas import logger, utilities
from amocatlas.logger import log_error, log_info, log_warning, log_debug
from amocatlas.utilities import apply_defaults, sanitize_variable_name
from amocatlas.reader_utils import ReaderUtils
log = logger.log # Use the global logger
# Datasource identifier for automatic standardization
DATASOURCE_ID = "samba34s"
# Default file list
SAMBA_DEFAULT_FILES = [
"Upper_Abyssal_Transport_Anomalies.txt",
"MOC_TotalAnomaly_and_constituents.asc",
]
SAMBA_TRANSPORT_FILES = [
"Upper_Abyssal_Transport_Anomalies.txt",
"MOC_TotalAnomaly_and_constituents.asc",
]
# Mapping of filenames to remote URLs
SAMBA_FILE_URLS = {
"Upper_Abyssal_Transport_Anomalies.txt": "ftp://ftp.aoml.noaa.gov/phod/pub/SAM/2020_Kersale_etal_ScienceAdvances/Upper_Abyssal_Transport_Anomalies.txt",
"MOC_TotalAnomaly_and_constituents.asc": "https://www.aoml.noaa.gov/phod/SAMOC_international/documents/MOC_TotalAnomaly_and_constituents.asc",
}
# Global metadata for SAMBA
SAMBA_METADATA = {
"description": "SAMBA 34S transport estimates dataset",
"project": "South Atlantic MOC Basin-wide Array (SAMBA)",
"weblink": "https://www.aoml.noaa.gov/phod/SAMOC_international/",
"comment": "Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas",
"acknowledgement": "SAMBA data were collected and made freely available by the SAMOC international project and contributing national programs.",
# Add DOI here when available
}
# File-specific metadata placeholders
SAMBA_FILE_METADATA = {
"Upper_Abyssal_Transport_Anomalies.txt": {
"data_product": "Daily volume transport anomaly estimates for the upper and abyssal cells of the MOC",
"acknowledgement": "M. Kersalé et al., Highly variable upper and abyssal overturning cells in the South Atlantic. Sci. Adv. 6, eaba7573 (2020). DOI: 10.1126/sciadv.aba7573",
},
"MOC_TotalAnomaly_and_constituents.asc": {
"data_product": "Daily travel time values, calibrated to a nominal pressure of 1000 dbar, and bottom pressures from the two PIES/CPIES moorings",
"acknowledgement": "Meinen, C. S., Speich, S., Piola, A. R., Ansorge, I., Campos, E., Kersalé, M., et al. (2018). Meridional overturning circulation transport variability at 34.5°S during 2009–2017: Baroclinic and barotropic flows and the dueling influence of the boundaries. Geophysical Research Letters, 45, 4180–4188. https://doi.org/10.1029/2018GL077408",
},
}
@apply_defaults(None, SAMBA_DEFAULT_FILES)
def read_samba(
source: Union[str, Path, None],
file_list: Union[str, list[str]],
transport_only: bool = True,
data_dir: Union[str, Path, None] = None,
redownload: bool = False,
track_added_attrs: bool = False,
) -> Union[list[xr.Dataset], tuple[list[xr.Dataset], list]]:
"""Load the SAMBA transport datasets from remote URL or local file path into xarray Datasets.
Parameters
----------
source : str, Path or None, optional
URL or local path to the dataset directory. If None, the predefined URL
for each file is used.
file_list : str or list of str, optional
Filename or list of filenames to process.
Defaults to SAMBA_DEFAULT_FILES.
transport_only : bool, optional
If True (default), load only the transport files, overriding file_list.
data_dir : str, Path or None, optional
Optional local data directory.
redownload : bool, optional
If True, force redownload of the data.
track_added_attrs : bool, optional
If True, track which attributes were added during metadata enrichment.
Returns
-------
list of xr.Dataset
List of loaded xarray datasets with basic inline and file-specific metadata.
tuple of (list of xr.Dataset, list)
Returned instead when track_added_attrs is True; the second element records
the attributes added to each dataset during metadata enrichment.
Raises
------
FileNotFoundError
If no download URL is defined for a file, or the file cannot be
downloaded, found locally, or parsed.
KeyError
If required time columns are missing from the parsed data.
ValueError
If the TIME coordinate cannot be constructed or the DataFrame cannot be
converted to an xarray Dataset.
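Examples
--------
A minimal usage sketch (assumes the remote SAMBA files are reachable or
already cached in the local data directory):

>>> datasets = read_samba(source=None, file_list=None)  # doctest: +SKIP
>>> datasets[0].data_vars  # doctest: +SKIP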
"""
log_info("Starting to read SAMBA dataset")
# Load YAML metadata with fallback
global_metadata, yaml_file_metadata = ReaderUtils.load_array_metadata_with_fallback(
DATASOURCE_ID, SAMBA_METADATA
)
# Ensure file_list has a default
if file_list is None:
file_list = SAMBA_DEFAULT_FILES
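# transport_only takes precedence over any user-supplied file_list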
if transport_only:
file_list = SAMBA_TRANSPORT_FILES
if isinstance(file_list, str):
file_list = [file_list]
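# Set up the local data directory where downloaded files are stored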
local_data_dir = ReaderUtils.setup_data_directory(data_dir)
# Print information about files being loaded
ReaderUtils.print_loading_info(file_list, DATASOURCE_ID, SAMBA_FILE_METADATA)
datasets = []
added_attrs_per_dataset = [] if track_added_attrs else None
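# For each requested file: resolve a local copy (downloading if needed),
# parse the ASCII table, build a TIME coordinate, convert the result to an
# xarray Dataset, and attach global and file-specific metadata.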
for file in file_list:
if not file.lower().endswith((".txt", ".asc")):
log_warning("Skipping unsupported file type: %s", file)
continue
download_url = SAMBA_FILE_URLS.get(file)
if not download_url:
log_error("No download URL defined for SAMBA file: %s", file)
raise FileNotFoundError(f"No download URL defined for SAMBA file {file}")
file_path = utilities.resolve_file_path(
file_name=file,
source=source,
download_url=download_url,
local_data_dir=local_data_dir,
redownload=redownload,
)
# Parse ASCII file
try:
column_names, _ = utilities.parse_ascii_header(file_path, comment_char="%")
df = utilities.read_ascii_file(file_path, comment_char="%")
# Sanitize column names to create valid Python identifiers
# This handles cases like "Total MOC anomaly (relative to record-length average of 14.7 Sv)"
sanitized_column_names = [
sanitize_variable_name(name) for name in column_names
]
df.columns = sanitized_column_names
# Store original column names mapping for later use in variable mapping
# This enables tracking of original names -> sanitized names -> standardized names
original_to_sanitized = dict(zip(column_names, sanitized_column_names))
except (
OSError,
ValueError,
KeyError,
pd.errors.EmptyDataError,
pd.errors.ParserError,
) as e:
log_error("Failed to parse ASCII file: %s: %s", file_path, e)
raise FileNotFoundError(
f"Failed to parse ASCII file: {file_path}: {e}"
) from e
# Time handling - use sanitized column names
# Find the sanitized versions of time columns
time_cols_needed = ["Year", "Month", "Day", "Hour"]
if "Upper_Abyssal" in file:
time_cols_needed.append("Minute")
# Map original time column names to their sanitized versions
sanitized_time_cols = []
missing_cols = []
for col in time_cols_needed:
if col in original_to_sanitized:
sanitized_time_cols.append(original_to_sanitized[col])
elif col in df.columns:
sanitized_time_cols.append(col) # Fallback if already sanitized
else:
missing_cols.append(col)
if missing_cols:
raise KeyError(f"Required time columns {missing_cols} not found in data")
try:
if "Upper_Abyssal" in file:
df["TIME"] = pd.to_datetime(df[sanitized_time_cols])
else:
df["TIME"] = pd.to_datetime(
df[sanitized_time_cols[:4]]
) # Year, Month, Day, Hour only
df = df.drop(columns=sanitized_time_cols)
except (ValueError, KeyError, TypeError) as e:
log_error("Failed to construct TIME column for %s: %s", file, e)
raise ValueError(f"Failed to construct TIME column for {file}: {e}") from e
# Convert DataFrame to xarray Dataset
try:
ds = df.set_index("TIME").to_xarray()
except (ValueError, KeyError, TypeError, AttributeError) as e:
log_error(
"Failed to convert DataFrame to xarray Dataset for %s: %s",
file,
e,
)
raise ValueError(
f"Failed to convert DataFrame to xarray Dataset for {file}: {e}",
) from e
# Attach metadata with optional tracking
if track_added_attrs:
ds, attr_changes = ReaderUtils.attach_metadata_with_tracking(
ds,
file,
file_path,
global_metadata,
yaml_file_metadata,
SAMBA_FILE_METADATA,
DATASOURCE_ID,
track_added_attrs=True,
)
added_attrs_per_dataset.append(attr_changes)
else:
ds = ReaderUtils.attach_metadata_with_tracking(
ds,
file,
file_path,
global_metadata,
yaml_file_metadata,
SAMBA_FILE_METADATA,
DATASOURCE_ID,
track_added_attrs=False,
)
# Update variable_mapping to use sanitized names as keys
# This allows standardization to find the mapping from sanitized names to standard names
if "variable_mapping" in ds.attrs:
original_mapping = ds.attrs["variable_mapping"].copy()
updated_mapping = {}
for original_name, standard_name in original_mapping.items():
# Find the sanitized version of this original name
sanitized_name = original_to_sanitized.get(original_name)
if sanitized_name and sanitized_name in ds.data_vars:
updated_mapping[sanitized_name] = standard_name
log_debug(
"Updated variable mapping: %s -> %s -> %s",
original_name,
sanitized_name,
standard_name,
)
else:
# Keep original mapping in case sanitization didn't change it
updated_mapping[original_name] = standard_name
ds.attrs["variable_mapping"] = updated_mapping
# Store the full mapping chain for reference (useful for reports)
ds.attrs["original_variable_mapping"] = original_mapping
ds.attrs["sanitization_mapping"] = original_to_sanitized
datasets.append(ds)
# Use ReaderUtils for validation
ReaderUtils.validate_datasets_loaded(datasets, file_list)
# Handle track_added_attrs parameter
if track_added_attrs:
return datasets, added_attrs_per_dataset
else:
return datasets