Source code for amocatlas.data_sources.dso

"""DSO (Denmark Strait Overflow) data reader for AMOCatlas.

This module provides functions to read and process data from the Denmark Strait
Overflow observing array. DSO monitors the transport of dense water flowing
southward through Denmark Strait, which is a key component of the Atlantic
Meridional Overturning Circulation.
"""

from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd
import xarray as xr

# Import the modules used
from amocatlas import logger, utilities
from amocatlas.logger import log_error, log_info, log_warning
from amocatlas.utilities import apply_defaults
from amocatlas.reader_utils import ReaderUtils

log = logger.log  # Use the global logger

# Datasource identifier for automatic standardization
DATASOURCE_ID = "dso"

# Denmark Strait Overflow
# Default source and file lists for DSO

DSO_DEFAULT_SOURCE = (
    "https://icdc.cen.uni-hamburg.de/thredds/fileServer/ftpthredds/dso_transport/"
)
DSO_DEFAULT_FILES = ["DSO_transport_hourly_1996_2021.nc"]
DSO_TRANSPORT_FILES = ["DSO_transport_hourly_1996_2021.nc"]
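# Illustrative sketch only (not executed here): inside read_dso() the per-file
# download URL is assembled with the f-string in the loop below, e.g.
#
#     url = f"{DSO_DEFAULT_SOURCE.rstrip('/')}/{DSO_DEFAULT_FILES[0]}"
#     # -> https://icdc.cen.uni-hamburg.de/thredds/fileServer/ftpthredds/dso_transport/DSO_transport_hourly_1996_2021.nc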


DSO_METADATA = {
    "project": "Overflow time-series through Denmark Strait",
    "weblink": "https://www.cen.uni-hamburg.de/en/icdc/data/ocean/denmark-strait-overflow.html",
    "comment": "Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas",
    "acknowledgement": "The timeseries was generated by Institution of Oceanography Hamburg and Hafrannsóknastofnun / Marine and Freshwater Research Institute (Reykjavik, Iceland). They were supported through funding from the NACLIM, EU-FP7, grant agr. n.308299, until 2016, and from RACE II (Förderkennzeichen 03F0729B, until 2018), RACE-Synthese (Förderkennzeichen 03F0825B, until 2020) German Federal Ministry for Education and Research (BMBF). Nordic WOCE, VEINS, MOEN (contract no. EVK2-CT-2002-00141), ASOF-W (contract no. EVK2-CT-2002-00149), NAClim (grant agr. nr. 308299) THOR (grant agr. nr. 212643), AtlantOS, Blue Action. Collaborative Research Centre TRR 181 Energy Transfers in Atmosphere and Ocean funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) - Project number 274762653. Thanks to ICDC, CEN, University of Hamburg for data support.",
    "doi": "doi:10.1002/2017JC012803",
    "paper": "Jochumsen, K., M. Moritz, N. Nunes, D. Quadfasel, K. M. Larsen, B. Hansen, H. Valdimarsson and S. Jonsson (2017): Revised transport estimates of the Denmark Strait Overflow, Journal of Geophysical Research J. Geophys. Res., 122, 3434-3450, doi:10.1002/2017JC012803.",
}

DSO_FILE_METADATA = {
    "DSO_transport_hourly_1996_2021.nc": {
        "data_product": "Overflow time-series through Denmark Strait",
    },
}


@apply_defaults(DSO_DEFAULT_SOURCE, DSO_DEFAULT_FILES)
def read_dso(
    source: str,  # source: Union[str, Path, None]
    file_list: Union[str, list[str]],
    transport_only: bool = True,
    data_dir: Union[str, Path, None] = None,
    redownload: bool = False,
    track_added_attrs: bool = False,
) -> list[xr.Dataset]:
    """Load the Denmark Strait Overflow (DSO) datasets from a URL or local file path into xarray Datasets.

    Parameters
    ----------
    source : str, optional
        URL or local path to the data directory (the remote download URL is
        constructed per file).
    file_list : str or list of str, optional
        Filename or list of filenames to process. Defaults to DSO_DEFAULT_FILES.
    transport_only : bool, optional
        If True, restrict to transport files only.
    data_dir : str, Path or None, optional
        Optional local data directory.
    redownload : bool, optional
        If True, force redownload of the data.
    track_added_attrs : bool, optional
        If True, track which attributes were added during metadata enrichment.

    Returns
    -------
    list of xr.Dataset
        List of loaded xarray datasets with basic inline and file-specific
        metadata. If ``track_added_attrs`` is True, a tuple of
        ``(datasets, added_attrs_per_dataset)`` is returned instead.

    Notes
    -----
    The original DSO_transport_hourly_1996_2021.nc file contains a corrupted
    DEPTH coordinate value (9.97e+36). This function automatically detects and
    corrects this by setting the DEPTH to NaN and documenting the correction
    in the dataset's history attribute.

    Raises
    ------
    ValueError
        If no source is provided for a file and no default URL mapping is found.
    FileNotFoundError
        If the file cannot be downloaded or does not exist locally.
    """
    log.info("Starting to read DSO dataset")

    # Load YAML metadata with fallback
    global_metadata, yaml_file_metadata = ReaderUtils.load_array_metadata_with_fallback(
        DATASOURCE_ID, DSO_METADATA
    )

    # Ensure file_list has a default
    if file_list is None:
        file_list = DSO_DEFAULT_FILES
    if transport_only:
        file_list = DSO_TRANSPORT_FILES
    if isinstance(file_list, str):
        file_list = [file_list]

    # Determine the local storage path
    local_data_dir = Path(data_dir) if data_dir else utilities.get_default_data_dir()
    local_data_dir.mkdir(parents=True, exist_ok=True)

    # Print information about files being loaded
    ReaderUtils.print_loading_info(file_list, DATASOURCE_ID, DSO_FILE_METADATA)

    datasets = []
    added_attrs_per_dataset = [] if track_added_attrs else None

    for file in file_list:
        if not file.lower().endswith(".nc"):
            log_warning("Skipping non-NetCDF file: %s", file)
            continue

        download_url = (
            f"{source.rstrip('/')}/{file}" if utilities.is_valid_url(source) else None
        )
        file_path = utilities.resolve_file_path(
            file_name=file,
            source=source,
            download_url=download_url,
            local_data_dir=local_data_dir,
            redownload=redownload,
        )

        # Use ReaderUtils for consistent dataset loading
        ds = ReaderUtils.safe_load_dataset(file_path)

        # Fix corrupted DEPTH value in DSO dataset.
        # The original data contains a corrupted depth value (~9.97e36);
        # mark it as missing rather than inserting an estimated value.
        if "DEPTH" in ds.coords:
            depth_val = float(ds.DEPTH.values[0])
            if depth_val > 1000000:  # Clearly corrupted value
                log_info("Marking corrupted DEPTH value %.2e as NaN", depth_val)

                # Set depth to NaN to indicate missing/corrupted data.
                # Create a new DataArray with NaN values to avoid read-only array issues.
                new_depth_values = np.full_like(ds["DEPTH"].values, np.nan)
                ds["DEPTH"] = ds["DEPTH"].copy(data=new_depth_values)

                # Update DEPTH attributes to reflect missing data
                ds["DEPTH"].attrs.update(
                    {
                        "long_name": "Depth below surface of the water",
                        "standard_name": "depth",
                        # TODO: This is clunky. We need a better way to update the
                        # list of preferred units so we don't have to do it in 5
                        # different places.
                        "units": "m",
                        "QC_indicator": "bad data",
                        "comment": "Original depth value was corrupted (9.97e+36), set to NaN",
                    }
                )

                # Document the fix in the history attribute
                current_time = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%dT%H:%M:%SZ")
                existing_history = ds.attrs.get("history", "")
                corruption_note = (
                    f"{current_time} AMOCatlas: Corrupted DEPTH value in "
                    "DSO_transport_hourly_1996_2021.nc marked as NaN (was 9.97e+36)"
                )
                if existing_history:
                    ds.attrs["history"] = f"{existing_history}; {corruption_note}"
                else:
                    ds.attrs["history"] = corruption_note

        # Attach metadata with optional tracking
        if track_added_attrs:
            ds, attr_changes = ReaderUtils.attach_metadata_with_tracking(
                ds,
                file,
                file_path,
                global_metadata,
                yaml_file_metadata,
                DSO_FILE_METADATA,
                DATASOURCE_ID,
                track_added_attrs=True,
            )
            added_attrs_per_dataset.append(attr_changes)
        else:
            ds = ReaderUtils.attach_metadata_with_tracking(
                ds,
                file,
                file_path,
                global_metadata,
                yaml_file_metadata,
                DSO_FILE_METADATA,
                DATASOURCE_ID,
                track_added_attrs=False,
            )

        datasets.append(ds)

    if not datasets:
        log_error("No valid DSO NetCDF files found in %s", file_list)
        raise FileNotFoundError(f"No valid DSO NetCDF files found in {file_list}")

    log_info("Successfully loaded %d DSO dataset(s)", len(datasets))

    if track_added_attrs:
        return datasets, added_attrs_per_dataset
    else:
        return datasets
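

# Example usage (a minimal sketch, not part of the module API): load the default
# hourly DSO transport file and inspect the result. Assumes network access to the
# ICDC THREDDS server configured above, or a previously downloaded copy in the
# default data directory.
#
#     from amocatlas.data_sources.dso import (
#         read_dso, DSO_DEFAULT_SOURCE, DSO_DEFAULT_FILES,
#     )
#
#     datasets = read_dso(
#         source=DSO_DEFAULT_SOURCE,
#         file_list=DSO_DEFAULT_FILES,
#         transport_only=True,
#     )
#     ds = datasets[0]
#     print(ds)  # hourly DSO transport time series, 1996-2021
#
#     # With attribute tracking enabled, a (datasets, added_attrs) tuple is returned:
#     datasets, added_attrs = read_dso(
#         source=DSO_DEFAULT_SOURCE,
#         file_list=DSO_DEFAULT_FILES,
#         track_added_attrs=True,
#     )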