4

I am doing a very large calculation (atmospheric absorption) that has a lot of individual narrow peaks that all get added up at the end. For each peak, I have pre-calculated the range over which the value of the peak shape function is above my chosen threshold, and I am then going line by line and adding the peaks to my spectrum. A minimum example is given below:

X = 1:1e7;
K = numel(a); % count the number of peaks I have.
spectrum = zeros(size(X));
for k = 1:K
    grid = X >= rng(1,k) & X <= rng(2,k);
    spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(k),b(k),c(k)]);
end

Here, each peak has some parameters that define the position and shape (a,b,c), and a range over which to do the calculation (rng). This works great, and on my machine it benchmarks at around 220 seconds to do a complete data set. However, I have a 4 core machine and I would eventually like to run this on a cluster, so I'd like to parallelize it and make it scaleable.

Because each loop relies on the results of the previous iteration, I cannot use parfor, so I am taking my first step into learning how to use spmd blocks. My first try looked like this:

X = 1:1e7;
cores = matlabpool('size');
K = numel(a);
spectrum = zeros(size(X),cores);
spmd
    n = labindex:cores:K
    N = numel(n);
    for k = 1:N
        grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
        spectrum(grid,labindex) = spectrum(grid,labindex) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]);
    end
end
finalSpectrum = sum(spectrum,2);

This almost works. The program crashes at the last line because spectrum is of type Composite, and the documentation for 2013a is spotty on how to turn Composite data into a matrix (cell2mat does not work). This also does not scale well because the more cores I have, the larger the matrix is, and that large matrix has to get copied to each worker, which then ignores most of the data. Question 1: how do I turn a Composite data type into a useable array?

The second thing I tried was to use a codistributed array.

spmd
    spectrum = codistributed.zeros(K,cores);
    disp(size(getLocalPart(spectrum)))
end

This tells me that each worker has a single vector of size [K 1], which I believe is what I want, but when I try to then meld the above methods

spmd
    spectrum = codistributed.zeros(K,cores);
    n = labindex:cores:K
    N = numel(n);
    for k = 1:N
        grid = X >= rng(1,n(k)) & X <= rng(2,n(k));
        spectrum(grid) = spectrum(grid) + peakfn(X(grid),a(n(k)),b(n(k)),c(n(k))]);        end
    finalSpectrum = gather(spectrum);
end
finalSpectrum = sum(finalSpectrum,2);

I get Matrix dimensions must agree errors. Since it's in a parallel block, I can't use my normal debugging crutch of stepping through the loop and seeing what the size of each block is at each point to see what's going on. Question 2: what is the proper way to index into and out of a codistributed array in an spmd block?

Amro
  • 123,847
  • 25
  • 243
  • 454
craigim
  • 3,884
  • 1
  • 22
  • 42

1 Answers1

4

Regarding question#1, the Composite variable in the client basically refers to a non-distributed variant array stored on the workers. You can access the array from each worker by {}-indexing using its corresponding labindex (e.g: spectrum{1}, spectrum{2}, ..).

For your code that would be: finalSpectrum = sum(cat(2,spectrum{:}), 2);


Now I tried this problem myself using random data. Below are three implementations to compare (see here to understand the difference between distributed and nondistributed arrays). First we start with the common data:

len = 100;    % spectrum length
K = 10;       % number of peaks
X = 1:len;

% random position and shape parameters
a = rand(1,K); b = rand(1,K); c = rand(1,K);

% random peak ranges (lower/upper thresholds)
ranges = sort(randi([1 len], [2 K]));

% dummy peakfn() function
fcn = @(x,a,b,c) x+a+b+c;

% prepare a pool of MATLAB workers
matlabpool open

1) Serial for-loop:

spectrum = zeros(size(X));
for i=1:size(ranges,2)
    r = ranges(:,i);
    idx = (r(1) <= X & X <= r(2));
    spectrum(idx) = spectrum(idx) + fcn(X(idx), a(i), b(i), c(i));
end
s1 = spectrum;

clear spectrum i r idx

2) SPMD with Composite array

spmd
    spectrum = zeros(1,len);
    ind = labindex:numlabs:K;
    for i=1:numel(ind)
        r = ranges(:,ind(i));
        idx = (r(1) <= X & X <= r(2));
        spectrum(idx) = spectrum(idx) + ...
            feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
    end
end
s2 = sum(vertcat(spectrum{:}));

clear spectrum i r idx ind

3) SPMD with co-distributed array

spmd
    spectrum = zeros(numlabs, len, codistributor('1d',1));
    ind = labindex:numlabs:K;
    for i=1:numel(ind)
        r = ranges(:,ind(i));
        idx = (r(1) <= X & X <= r(2));
        spectrum(labindex,idx) = spectrum(labindex,idx) + ...
            feval(fcn, X(idx), a(ind(i)), b(ind(i)), c(ind(i)));
    end
end
s3 = sum(gather(spectrum));

clear spectrum i r idx ind

All three results should be equal (to within an acceptably small margin of error)

>> max([max(s1-s2), max(s1-s3), max(s2-s3)])
ans =
   2.8422e-14
Amro
  • 123,847
  • 25
  • 243
  • 454
  • This is fantastic, thank you. I'm benchmarking the various approaches now. One question, though. Why use `feval` instead of a direct call to the function? – craigim Jan 15 '14 at 18:04
  • @craigim: It's just a habit, wasn't really necessary here.. You see `parfor` has [some limitations](http://www.mathworks.com/help/distcomp/programming-considerations.html#brdtqhq-1): a code like `f(1)` can be considered ambiguous since it either denotes a function call (`f` is a function passed an input argument `1`) or an array indexing (getting the first element of an array `f`). Using `feval` makes it clear to the parser (and the person reading the code) which one I meant :) – Amro Jan 16 '14 at 03:03
  • I am getting two intermittent errors using the codistributed arrays: `Error using **subsasgn** Assignment has more non-singleton rhs dimensions than non-singleton subscripts` and `Error using **labReceive** A communication mismatch error was encountered: The other lab became idle during labReceive.` It seems to depend on how large `K` is, with large `K` giving the 1st error and smaller `K` giving the 2nd. Sometimes, I hit on a magic `K` that gives no error, but that isn't an option for me. Any thoughts on how to squash these errors or even what they might mean? Neither yields any Google hits. – craigim Feb 07 '14 at 22:00
  • @craigim: I'm not sure, it would help if you can come up with a small reproducible example which fails and throws those errors... For starters, are you sure that the number of ranges `K` is bigger than the number of open workers `numlabs`? Also check that the values in `ranges` are all well behaved – Amro Feb 07 '14 at 23:23
  • The trick is the word "small". I will see what I can come up with, but not until Monday morning. `K` is the number of peaks, and even for my low-`K` test numbers in the thousands, while for high-`K` it can range into the millions. The `Ranges` are all well behaved because in my current code I've been trying to compare the two parallel schemes above and I'm using the exact same input data, with the same code to index the ranges. The composite array method throws no errors and gives the correct results, while the codistributed array method does not. I will figure out a MWE and post it Monday. – craigim Feb 07 '14 at 23:41
  • @craigim: ok ping me with a comment when you update the question, I'll have a look then. In the mean time, you can you try adding assertions in between the lines to pinpoint the exact problem; For example, check that the matrix is distributed as expected: `spmd, size(getLocalPart(spectrum)), end` (they should all be of size `[1,len]`). Also make sure that `peakfn` function returns an output matrix of same size as input. That's because the first error indicates a mismatch in number of dimensions during assignment (e.g: this reproduces the error message `x=zeros(3); x(1,1:2)=rand(1,2,2);`) – Amro Feb 08 '14 at 01:10
  • As for the other error message, it seems like a synchronization issue (again I'm not sure).. Perhaps throw in a [`labBarrier`](http://www.mathworks.com/help/distcomp/labbarrier.html) call to coordinate workers access to distributed arrays. – Amro Feb 08 '14 at 01:30