I am doing a very large calculation (atmospheric absorption) that has a lot of individual narrow peaks that all get added up at the end. For each peak, I have pre-calculated the range over which the value of the peak shape function is above my chosen threshold, and I am then going line by line and adding the peaks to my spectrum. A minimum example is given below:
X = 1:1e7;
K = numel(a); % count the number of peaks I have.
spectrum = zeros(size(X));
for k = 1:K
grid = X >= rng(1,k) & X <= rng(2,k);
spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(k),b(k),c(k)]);
end
Here, each peak has some parameters that define the position and shape (a
,b
,c
), and a range over which to do the calculation (rng
). This works great, and on my machine it benchmarks at around 220 seconds to do a complete data set. However, I have a 4 core machine and I would eventually like to run this on a cluster, so I'd like to parallelize it and make it scaleable.
Because each loop relies on the results of the previous iteration, I cannot use parfor
, so I am taking my first step into learning how to use spmd
blocks. My first try looked like this:
X = 1:1e7;
cores = matlabpool('size');
K = numel(a);
spectrum = zeros(size(X),cores);
spmd
n = labindex:cores:K
N = numel(n);
for k = 1:N
grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
spectrum(grid,labindex) = spectrum(grid,labindex) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]);
end
end
finalSpectrum = sum(spectrum,2);
This almost works. The program crashes at the last line because spectrum
is of type Composite
, and the documentation for 2013a
is spotty on how to turn Composite
data into a matrix (cell2mat
does not work). This also does not scale well because the more cores I have, the larger the matrix is, and that large matrix has to get copied to each worker, which then ignores most of the data. Question 1: how do I turn a Composite data type into a useable array?
The second thing I tried was to use a codistributed
array.
spmd
spectrum = codistributed.zeros(K,cores);
disp(size(getLocalPart(spectrum)))
end
This tells me that each worker has a single vector of size [K 1], which I believe is what I want, but when I try to then meld the above methods
spmd
spectrum = codistributed.zeros(K,cores);
n = labindex:cores:K
N = numel(n);
for k = 1:N
grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]); end
finalSpectrum = gather(spectrum);
end
finalSpectrum = sum(finalSpectrum,2);
I get Matrix dimensions must agree
errors. Since it's in a parallel block, I can't use my normal debugging crutch of stepping through the loop and seeing what the size of each block is at each point to see what's going on. Question 2: what is the proper way to index into and out of a codistributed array in an spmd
block?
Regarding question#1, the Composite
variable in the client basically refers to a non-distributed variant array stored on the workers. You can access the array from each worker by {}
-indexing using its corresponding labindex (e.g: spectrum{1}
, spectrum{2}
, ..).
For your code that would be: finalSpectrum = sum(cat(2,spectrum{:}), 2);
Now I tried this problem myself using random data. Below are three implementations to compare (see here to understand the difference between distributed and nondistributed arrays). First we start with the common data:
len = 100; % spectrum length
K = 10; % number of peaks
X = 1:len;
% random position and shape parameters
a = rand(1,K); b = rand(1,K); c = rand(1,K);
% random peak ranges (lower/upper thresholds)
ranges = sort(randi([1 len], [2 K]));
% dummy peakfn() function
fcn = @(x,a,b,c) x+a+b+c;
% prepare a pool of MATLAB workers
matlabpool open
spectrum = zeros(size(X));
for i=1:size(ranges,2)
r = ranges(:,i);
idx = (r(1) <= X & X <= r(2));
spectrum(idx) = spectrum(idx) + fcn(X(idx), a(i), b(i), c(i));
end
s1 = spectrum;
clear spectrum i r idx
spmd
spectrum = zeros(1,len);
ind = labindex:numlabs:K;
for i=1:numel(ind)
r = ranges(:,ind(i));
idx = (r(1) <= X & X <= r(2));
spectrum(idx) = spectrum(idx) + ...
feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
end
end
s2 = sum(vertcat(spectrum{:}));
clear spectrum i r idx ind
spmd
spectrum = zeros(numlabs, len, codistributor('1d',1));
ind = labindex:numlabs:K;
for i=1:numel(ind)
r = ranges(:,ind(i));
idx = (r(1) <= X & X <= r(2));
spectrum(labindex,idx) = spectrum(labindex,idx) + ...
feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
end
end
s3 = sum(gather(spectrum));
clear spectrum i r idx ind
All three results should be equal (to within an acceptably small margin of error)
>> max([max(s1-s2), max(s1-s3), max(s2-s3)])
ans =
2.8422e-14