"""DSO (Denmark Strait Overflow) data reader for AMOCatlas.
This module provides functions to read and process data from the Denmark Strait
Overflow observing array. DSO monitors the transport of dense water flowing
southward through Denmark Strait, which is a key component of the Atlantic
Meridional Overturning Circulation.
"""
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
import xarray as xr
# Import the modules used
from amocatlas import logger, utilities
from amocatlas.logger import log_error, log_info, log_warning
from amocatlas.utilities import apply_defaults
from amocatlas.reader_utils import ReaderUtils
log = logger.log # Use the global logger
# Datasource identifier for automatic standardization
DATASOURCE_ID = "dso"
# Denmark Strait Overflow
# Default source and file list DSO
DSO_DEFAULT_SOURCE = (
"https://icdc.cen.uni-hamburg.de/thredds/fileServer/ftpthredds/dso_transport/"
)
DSO_DEFAULT_FILES = ["DSO_transport_hourly_1996_2021.nc"]
DSO_TRANSPORT_FILES = ["DSO_transport_hourly_1996_2021.nc"]
DSO_METADATA = {
"project": "Overflow time-series through Denmark Strait",
"weblink": "https://www.cen.uni-hamburg.de/en/icdc/data/ocean/denmark-strait-overflow.html",
"comment": "Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas",
"acknowledgement": "The timeseries was generated by Institution of Oceanography Hamburg and Hafrannsóknastofnun / Marine and Freshwater Research Institute (Reykjavik, Iceland). They were supported through funding from the NACLIM, EU-FP7, grant agr. n.308299, until 2016, and from RACE II (Förderkennzeichen 03F0729B, until 2018), RACE-Synthese (Förderkennzeichen 03F0825B, until 2020) German Federal Ministry for Education and Research (BMBF). Nordic WOCE, VEINS, MOEN (contract no. EVK2-CT-2002-00141), ASOF-W (contract no. EVK2-CT-2002-00149), NAClim (grant agr. nr. 308299) THOR (grant agr. nr. 212643), AtlantOS, Blue Action. Collaborative Research Centre TRR 181 Energy Transfers in Atmosphere and Ocean funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) - Project number 274762653. Thanks to ICDC, CEN, University of Hamburg for data support.",
"doi": "doi:10.1002/2017JC012803",
"paper": "Jochumsen, K., M. Moritz, N. Nunes, D. Quadfasel, K. M. Larsen, B. Hansen, H. Valdimarsson and S. Jonsson (2017): Revised transport estimates of the Denmark Strait Overflow, Journal of Geophysical Research J. Geophys. Res., 122, 3434-3450, doi:10.1002/2017JC012803.",
}
DSO_FILE_METADATA = {
"DSO_transport_hourly_1996_2021.nc": {
"data_product": "Overflow time-series through Denmark Strait",
},
}
@apply_defaults(DSO_DEFAULT_SOURCE, DSO_DEFAULT_FILES)
def read_dso(
source: str,
file_list: Union[str, list[str]],
transport_only: bool = True,
data_dir: Union[str, Path, None] = None,
redownload: bool = False,
track_added_attrs: bool = False,
) -> Union[list[xr.Dataset], tuple[list[xr.Dataset], list]]:
"""Load the Denmark Strait Overflow (DSO) datasets from a URL or local file path into xarray Datasets.
Parameters
----------
source : str, optional
Remote URL or local path to the data source directory.
Defaults to DSO_DEFAULT_SOURCE.
file_list : str or list of str, optional
Filename or list of filenames to process.
Defaults to DSO_DEFAULT_FILES.
transport_only : bool, optional
If True, restrict to transport files only.
data_dir : str, Path or None, optional
Optional local data directory.
redownload : bool, optional
If True, force redownload of the data.
track_added_attrs : bool, optional
If True, track which attributes were added during metadata enrichment.

Returns
-------
list of xr.Dataset
List of loaded xarray datasets with basic inline and file-specific metadata.
If ``track_added_attrs`` is True, a tuple of ``(datasets, added_attrs_per_dataset)``
is returned instead.

Notes
-----
The original DSO_transport_hourly_1996_2021.nc file contains a corrupted DEPTH
coordinate value (9.97e+36). This function automatically detects and corrects
this by setting the DEPTH to NaN and documenting the correction in the dataset's
history attribute.

Raises
------
ValueError
If no source is provided for a file and no default URL mapping is found.
FileNotFoundError
If a file cannot be downloaded or does not exist locally, or if no valid
DSO NetCDF files are found in ``file_list``.
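
Examples
--------
A minimal usage sketch; it assumes the defaults applied by ``apply_defaults``
and that the source file can be downloaded or is already cached in the local
data directory.

>>> datasets = read_dso()  # uses DSO_DEFAULT_SOURCE and DSO_DEFAULT_FILES
>>> ds = datasets[0]
>>> list(ds.data_vars)  # inspect the available transport variables
>>> ds.attrs.get("history", "")  # records the DEPTH correction when applied
>>> datasets, attr_changes = read_dso(track_added_attrs=True)  # returns a tuple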
"""
log.info("Starting to read DSO dataset")
# Load YAML metadata with fallback
global_metadata, yaml_file_metadata = ReaderUtils.load_array_metadata_with_fallback(
DATASOURCE_ID, DSO_METADATA
)
# Ensure file_list has a default
if file_list is None:
file_list = DSO_DEFAULT_FILES
if transport_only:
file_list = DSO_TRANSPORT_FILES
if isinstance(file_list, str):
file_list = [file_list]
# Determine the local storage path
local_data_dir = Path(data_dir) if data_dir else utilities.get_default_data_dir()
local_data_dir.mkdir(parents=True, exist_ok=True)
# Print information about files being loaded
ReaderUtils.print_loading_info(file_list, DATASOURCE_ID, DSO_FILE_METADATA)
datasets = []
added_attrs_per_dataset = [] if track_added_attrs else None
for file in file_list:
if not file.lower().endswith(".nc"):
log_warning("Skipping non-NetCDF file: %s", file)
continue
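# Build a remote download URL only when `source` is itself a URL;
# local sources are handled by resolve_file_path below.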
download_url = (
f"{source.rstrip('/')}/{file}" if utilities.is_valid_url(source) else None
)
file_path = utilities.resolve_file_path(
file_name=file,
source=source,
download_url=download_url,
local_data_dir=local_data_dir,
redownload=redownload,
)
# Use ReaderUtils for consistent dataset loading
ds = ReaderUtils.safe_load_dataset(file_path)
# Fix corrupted DEPTH value in DSO dataset
# The original data contains a corrupted depth value (~9.97e36)
# Mark as missing rather than inserting estimated value
if "DEPTH" in ds.coords:
depth_val = float(ds.DEPTH.values[0])
if depth_val > 1000000: # Clearly corrupted value
log_info("Marking corrupted DEPTH value %.2e as NaN", depth_val)
# Set depth to NaN to indicate missing/corrupted data
# Create new DataArray with NaN value to avoid read-only array issues
new_depth_values = np.full_like(ds["DEPTH"].values, np.nan)
ds["DEPTH"] = ds["DEPTH"].copy(data=new_depth_values)
# Update DEPTH attributes to reflect missing data
ds["DEPTH"].attrs.update(
{
"long_name": "Depth below surface of the water",
"standard_name": "depth",
"units": "m", # TODO: This is clunky. We need a better way to update the list of preferred units so we don't have to do it in 5 different places.
"QC_indicator": "bad data",
"comment": "Original depth value was corrupted (9.97e+36), set to NaN",
}
)
# Document the fix in the history attribute
current_time = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%dT%H:%M:%SZ")
existing_history = ds.attrs.get("history", "")
corruption_note = f"{current_time} AMOCatlas: Corrupted DEPTH value in DSO_transport_hourly_1996_2021.nc marked as NaN (was 9.97e+36)"
if existing_history:
ds.attrs["history"] = f"{existing_history}; {corruption_note}"
else:
ds.attrs["history"] = corruption_note
# Attach metadata with optional tracking
if track_added_attrs:
ds, attr_changes = ReaderUtils.attach_metadata_with_tracking(
ds,
file,
file_path,
global_metadata,
yaml_file_metadata,
DSO_FILE_METADATA,
DATASOURCE_ID,
track_added_attrs=True,
)
added_attrs_per_dataset.append(attr_changes)
else:
ds = ReaderUtils.attach_metadata_with_tracking(
ds,
file,
file_path,
global_metadata,
yaml_file_metadata,
DSO_FILE_METADATA,
DATASOURCE_ID,
track_added_attrs=False,
)
datasets.append(ds)
if not datasets:
log_error("No valid DSO NetCDF files found in %s", file_list)
raise FileNotFoundError(f"No valid DSO NetCDF files found in {file_list}")
log_info("Successfully loaded %d DSO dataset(s)", len(datasets))
if track_added_attrs:
return datasets, added_attrs_per_dataset
else:
return datasets