Source code for amocatlas.read_move

from pathlib import Path
from typing import Union

import xarray as xr
import numpy as np
import pandas as pd

from amocatlas import logger, utilities
from amocatlas.utilities import apply_defaults

log = logger.log  # ✅ use the global logger

# Default source and file list
MOVE_DEFAULT_SOURCE = (
    "https://dods.ndbc.noaa.gov/thredds/fileServer/oceansites/DATA_GRIDDED/MOVE/"
)
MOVE_DEFAULT_FILES = [
    "OS_MOVE_20000206-20221014_DPR_VOLUMETRANSPORT.nc",
    "OS_MOVE_20000101-20221021_GRD_CURRENTS-AT-SITES-MOVE3-MOVE4.nc",
    "OS_MOVE_20000101-20221018_GRD_TEMPERATURE-SALINITY-AT-SITES-MOVE1-MOVE3.nc",
]
MOVE_TRANSPORT_FILES = ["OS_MOVE_20000206-20221014_DPR_VOLUMETRANSPORT.nc"]

# Global metadata for MOVE
MOVE_METADATA = {
    "description": "MOVE transport estimates dataset from UCSD mooring project",
    "project": "Meridional Overturning Variability Experiment (MOVE)",
    "weblink": "https://dods.ndbc.noaa.gov/thredds/fileServer/oceansites/DATA_GRIDDED/MOVE/",
    "comment": "Dataset accessed and processed via http://github.com/AMOCcommunity/amocatlas",
    # DOI can be added here when available
    "acknowledgement": "The MOVE project is made possible with funding from the NOAA Climate Program Office. Initial funding came from the German Bundesministerium fuer Bildung und Forschung.",
}

# File-specific metadata placeholder
MOVE_FILE_METADATA = {
    "OS_MOVE_20000206-20221014_DPR_VOLUMETRANSPORT.nc": {
        "data_product": "MOVE transport time series",
        # Add specific acknowledgments here if needed in future
    },
}
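
# A sketch of how a future file-specific entry might be registered (the
# "data_product" wording below is hypothetical, not an official product name):
#
#     MOVE_FILE_METADATA["OS_MOVE_20000101-20221021_GRD_CURRENTS-AT-SITES-MOVE3-MOVE4.nc"] = {
#         "data_product": "MOVE gridded currents at sites MOVE3 and MOVE4",
#     }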


@apply_defaults(MOVE_DEFAULT_SOURCE, MOVE_DEFAULT_FILES)
def read_move(
    source: str,
    file_list: str | list[str],
    transport_only: bool = True,
    data_dir: Union[str, Path, None] = None,
    redownload: bool = False,
) -> list[xr.Dataset]:
    """Load the MOVE transport dataset from a URL or local file path into xarray Datasets.

    Parameters
    ----------
    source : str, optional
        URL or local path to the NetCDF file(s). Defaults to the MOVE data repository URL.
    file_list : str or list of str, optional
        Filename or list of filenames to process. Defaults to MOVE_DEFAULT_FILES.
    transport_only : bool, optional
        If True, restrict to transport files only.
    data_dir : str, Path or None, optional
        Optional local data directory.
    redownload : bool, optional
        If True, force redownload of the data.

    Returns
    -------
    list of xr.Dataset
        List of loaded xarray datasets with basic inline and file-specific metadata.

    Raises
    ------
    ValueError
        If the source is neither a valid URL nor a directory path.
    FileNotFoundError
        If the file cannot be downloaded or does not exist locally.
    """
    log.info("Starting to read MOVE dataset")

    if transport_only:
        file_list = MOVE_TRANSPORT_FILES
    if isinstance(file_list, str):
        file_list = [file_list]

    local_data_dir = Path(data_dir) if data_dir else utilities.get_default_data_dir()
    local_data_dir.mkdir(parents=True, exist_ok=True)

    datasets = []

    for file in file_list:
        if not file.lower().endswith(".nc"):
            log.warning("Skipping non-NetCDF file: %s", file)
            continue

        download_url = (
            f"{source.rstrip('/')}/{file}" if utilities._is_valid_url(source) else None
        )

        file_path = utilities.resolve_file_path(
            file_name=file,
            source=source,
            download_url=download_url,
            local_data_dir=local_data_dir,
            redownload=redownload,
        )

        # Open dataset
        try:
            log.info("Opening MOVE dataset: %s", file_path)
            ds = xr.open_dataset(file_path, decode_times=False)
        except Exception as e:
            log.error("Failed to open NetCDF file: %s: %s", file_path, e)
            raise FileNotFoundError(f"Failed to open NetCDF file: {file_path}: {e}")

        # Clean up time variable
        if "TIME" in ds.variables:
            time_raw = ds["TIME"].values
            valid = (time_raw > 0) & (time_raw < 30000)
            n_invalid = (~valid).sum()
            if n_invalid > 0:
                log.info(
                    f"Found {n_invalid} invalid time values in {file_path}; replacing with NaN."
                )
            clean_time = xr.where(valid, time_raw, np.nan)
            base = np.datetime64("1950-01-01")
            time_converted = base + clean_time * np.timedelta64(1, "D")

            # Replace the time in the dataset
            ds["TIME"] = ("TIME", time_converted)
            ds["TIME"].attrs.update(
                {
                    "units": "days since 1950-01-01",
                }
            )
            log.debug(f"Converted time using base 1950-01-01 for {file_path}")
        else:
            log.warning(f"No TIME variable found in {file_path}")

        # Filter out NaT time values and corresponding dataset entries
        time_pd = pd.to_datetime(ds["TIME"].values)
        valid_time_mask = ~pd.isna(time_pd)
        if (~valid_time_mask).any():
            n_removed = (~valid_time_mask).sum()
            log.info(
                f"Removing {n_removed} entries with invalid NaT time values from {file_path}"
            )
            ds = ds.isel(TIME=valid_time_mask)

        # Attach metadata
        file_metadata = MOVE_FILE_METADATA.get(file, {})
        log.info("Attaching metadata to dataset from file: %s", file)
        utilities.safe_update_attrs(
            ds,
            {
                "source_file": file,
                "source_path": str(file_path),
                **MOVE_METADATA,
                **file_metadata,
            },
        )

        datasets.append(ds)

    if not datasets:
        log.error("No valid NetCDF files found in %s", file_list)
        raise FileNotFoundError(f"No valid NetCDF files found in {file_list}")

    log.info("Successfully loaded %d MOVE dataset(s)", len(datasets))
    return datasets
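

# Minimal usage sketch: assumes the apply_defaults decorator fills in the
# default source URL and file list when they are omitted, and that the default
# data directory from utilities.get_default_data_dir() is writable.
if __name__ == "__main__":
    # Load only the volume transport product (the default behaviour).
    move_datasets = read_move(transport_only=True)
    transport_ds = move_datasets[0]
    # TIME has been converted to datetime64 relative to 1950-01-01 above.
    print(transport_ds.attrs["source_file"], transport_ds.sizes)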