daskpython-xarraynetcdf

xarray and dask: efficiently processing a large netcdf file


I am trying to do a simple calculation on a very large netcdf file, and am struggling to speed it up -- probably because I program primarily in julia and R. I think xarray/dask are the best approach for this issue, but I'm struggling to get it to work.

The netcdf file has three dimensions: lat (4320), lon (8640), and time (792). It is a global dataset of precipitation. I want to calculate the yearly mean of precipitation (the time dimension is in month units, so every 12) at a set of 129,966 lat/lon point locations.

Here is what I have as a rough skeleton:


import xarray as xr
import netCDF4 as nc
import numpy as np
import pandas as pd
from dask import delayed, compute
from dask.distributed import Client
import dask
from tqdm import tqdm

num_workers = 10  # Adjust this to the number of cores you want to use
threads_per_worker = 1  # Adjust as needed

client = Client(n_workers = num_workers, threads_per_worker = threads_per_worker, dashboard_address=':8787')

## this is the netcdf file
ds = xr.open_mfdataset(f'../data/terraclim/TerraClimate_{var}_*.nc', chunks='auto')

lon = ds['lon'].values
lat = ds['lat'].values

# Initialize arrays for indices
lonindx = np.empty(coords.shape[0], dtype=int)
latindx = np.empty(coords.shape[0], dtype=int)

# Loop through each row of coords, a pandas dataframe of lat and lon coordinates to extract data from
for i in tqdm(range(coords.shape[0]), desc="Processing"):
    lonindx[i] = np.argmin(np.abs(lon - coords.at[i, 'LON']))  
    latindx[i] = np.argmin(np.abs(lat - coords.at[i, 'LAT']))  

## indexes the netcdf file (ds) by lat and lon and returns an array of year-means
def compute_means(i):

    vals = ds[var][0:791, latindx[i], lonindx[i]].values
    means = [np.mean(year) for year in np.array_split(vals, 66)]

    return means

results = []
for i in range(coords.shape[0]):  
    res = dask.delayed(compute_means)(i)
    results.append(res)

results = dask.compute(*results)

When I run the above script, however, it crashes due to memory limitations. I think I may be missing a more appropriate way to use xarray's internal compatability with dask, but I am totally lost. Any help would be greatly appreciated! As would solutions in julia or R.


Solution

  • You have two issues that you need to deal with and neither has anything to do with the tools that you are using to process the data.

    1. Memory exhaustion A typical, annual TerraClimate data variable has 8,640 x 4,320 x 12 x 8 = 3,583,180,800 bytes. No matter how many CPU cores you throw at it, this is very likely to surpass the available RAM on any system but the most powerful ones if you start processing multiple files in parallel. TerraClimate, however, produces its data on a yearly basis and those are exactly the files that you have when looking at your open_mfdataset() command. So why not process the files year-by-year? On my home system (MacBook Pro M2, 16GB RAM) processing a single var/year combination (= 1 TerraClimate file) using R takes 43 seconds, meaning you can process all 58 years in less than an hour. Stitch the resulting arrays together with abind() and you are done. See the R code below.
    2. Data layout Typically, netCDF files have an internal data layout that - simply said - goes from the right-most dimension (lon in this case) to the left-most dimension (time). It is then VERY inefficient to profile the left-most dimension over the other dimensions as you do in compute_means() because reading the required data from file means that the file has to be traversed in its entirety to pick up data in 8-byte chunks scattered all over the place. This is your killer in terms of performance, as your I/O goes through the roof. (This may be mitigated by the fact that you try to break up the process in separate threads for each year, but hey! then why go through the MFDataset approach in the first place if the individual files are already organised that way?)

    Process by year, in R

    # install.packages("ncdfCF")
    library(ncdfCF)
    library(abind)
    
    var <- "ppt"
    years <- 1958:2024
    
    # This loops over the years and calculates the annual sum of precipitation
    # for every lat/lon cell. You could also use `mean` for other variables like
    # tmin, tmax.
    # The result is a list of matrices, one for each year.
    yrs <- lapply(years, function (y) {
      fn <- paste0("../data/terraclim/TerraClimate_", var, "_", y, ".nc"))
      ds <- open_ncdf(fn)
      data <- ds[[var]][]
      apply(data, 1:2, sum, na.rm = TRUE)
    })
    
    # Bind the list of matrices into one array, with the third dimension 
    # representing years.
    all_years <- abind(yrs, along = 3)
    
    

    You said you wanted to calculate "the yearly mean of precipitation". The above is the yearly sum of precipitation. If you meant to say "the mean of the yearly sums of precipitation", then you should take the mean like so: ppt_multiannual_mean <- apply(all_years, 1:2, mean, na.rm = TRUE).

    Note that the na.rm = TRUE is essential: 70% of the Earth's surface is ocean, after all.