matlabparallel-processingdistributed-computingspmd

How do I index codistributed arrays in a spmd block


I am doing a very large calculation (atmospheric absorption) that has a lot of individual narrow peaks that all get added up at the end. For each peak, I have pre-calculated the range over which the value of the peak shape function is above my chosen threshold, and I am then going line by line and adding the peaks to my spectrum. A minimum example is given below:

X = 1:1e7;
K = numel(a); % count the number of peaks I have.
spectrum = zeros(size(X));
for k = 1:K
    grid = X >= rng(1,k) & X <= rng(2,k);
    spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(k),b(k),c(k)]);
end

Here, each peak has some parameters that define the position and shape (a,b,c), and a range over which to do the calculation (rng). This works great, and on my machine it benchmarks at around 220 seconds to do a complete data set. However, I have a 4 core machine and I would eventually like to run this on a cluster, so I'd like to parallelize it and make it scaleable.

Because each loop relies on the results of the previous iteration, I cannot use parfor, so I am taking my first step into learning how to use spmd blocks. My first try looked like this:

X = 1:1e7;
cores = matlabpool('size');
K = numel(a);
spectrum = zeros(size(X),cores);
spmd
    n = labindex:cores:K
    N = numel(n);
    for k = 1:N
        grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
        spectrum(grid,labindex) = spectrum(grid,labindex) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]);
    end
end
finalSpectrum = sum(spectrum,2);

This almost works. The program crashes at the last line because spectrum is of type Composite, and the documentation for 2013a is spotty on how to turn Composite data into a matrix (cell2mat does not work). This also does not scale well because the more cores I have, the larger the matrix is, and that large matrix has to get copied to each worker, which then ignores most of the data. Question 1: how do I turn a Composite data type into a useable array?

The second thing I tried was to use a codistributed array.

spmd
    spectrum = codistributed.zeros(K,cores);
    disp(size(getLocalPart(spectrum)))
end

This tells me that each worker has a single vector of size [K 1], which I believe is what I want, but when I try to then meld the above methods

spmd
    spectrum = codistributed.zeros(K,cores);
    n = labindex:cores:K
    N = numel(n);
    for k = 1:N
        grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
        spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]);        end
    finalSpectrum = gather(spectrum);
end
finalSpectrum = sum(finalSpectrum,2);

I get Matrix dimensions must agree errors. Since it's in a parallel block, I can't use my normal debugging crutch of stepping through the loop and seeing what the size of each block is at each point to see what's going on. Question 2: what is the proper way to index into and out of a codistributed array in an spmd block?


Solution

  • Regarding question#1, the Composite variable in the client basically refers to a non-distributed variant array stored on the workers. You can access the array from each worker by {}-indexing using its corresponding labindex (e.g: spectrum{1}, spectrum{2}, ..).

    For your code that would be: finalSpectrum = sum(cat(2,spectrum{:}), 2);


    Now I tried this problem myself using random data. Below are three implementations to compare (see here to understand the difference between distributed and nondistributed arrays). First we start with the common data:

    len = 100;    % spectrum length
    K = 10;       % number of peaks
    X = 1:len;
    
    % random position and shape parameters
    a = rand(1,K); b = rand(1,K); c = rand(1,K);
    
    % random peak ranges (lower/upper thresholds)
    ranges = sort(randi([1 len], [2 K]));
    
    % dummy peakfn() function
    fcn = @(x,a,b,c) x+a+b+c;
    
    % prepare a pool of MATLAB workers
    matlabpool open
    

    1) Serial for-loop:

    spectrum = zeros(size(X));
    for i=1:size(ranges,2)
        r = ranges(:,i);
        idx = (r(1) <= X & X <= r(2));
        spectrum(idx) = spectrum(idx) + fcn(X(idx), a(i), b(i), c(i));
    end
    s1 = spectrum;
    
    clear spectrum i r idx
    

    2) SPMD with Composite array

    spmd
        spectrum = zeros(1,len);
        ind = labindex:numlabs:K;
        for i=1:numel(ind)
            r = ranges(:,ind(i));
            idx = (r(1) <= X & X <= r(2));
            spectrum(idx) = spectrum(idx) + ...
                feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
        end
    end
    s2 = sum(vertcat(spectrum{:}));
    
    clear spectrum i r idx ind
    

    3) SPMD with co-distributed array

    spmd
        spectrum = zeros(numlabs, len, codistributor('1d',1));
        ind = labindex:numlabs:K;
        for i=1:numel(ind)
            r = ranges(:,ind(i));
            idx = (r(1) <= X & X <= r(2));
            spectrum(labindex,idx) = spectrum(labindex,idx) + ...
                feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
        end
    end
    s3 = sum(gather(spectrum));
    
    clear spectrum i r idx ind
    

    All three results should be equal (to within an acceptably small margin of error)

    >> max([max(s1-s2), max(s1-s3), max(s2-s3)])
    ans =
       2.8422e-14