
I was running some tests with parallel processing and wrote a program that, given a matrix of integers, recalculates each position's value based on its neighbours.

I needed a copy of the matrix so that the values wouldn't be overwritten, and I used a CyclicBarrier to merge the results once the partial problems were solved:

CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() {
    public void run() {
        ParallelProcess.mergeResult();
    }
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, n_cols); // init

Each task is assigned a portion of the matrix: I'm splitting it by rows into equal pieces. But the division might not be exact, which leaves a small chunk of trailing rows that wouldn't be submitted to the thread pool.

Example: with 16 rows and n_tasks = 4 there is no problem; all 4 chunks are submitted to the pool. But with 18 rows, the first 16 are submitted and the last two are not.
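A minimal sketch of that split, assuming each task gets nRows / nTasks rows (integer division) and that nRows >= nTasks. RowChunks and chunks are hypothetical names, not part of the original program; the sketch only shows where the leftover chunk comes from.

```java
import java.util.ArrayList;
import java.util.List;

public class RowChunks {
    // Splits rows [0, nRows) into nTasks chunks of nRows / nTasks rows,
    // plus one leftover chunk when the division is not exact.
    static List<int[]> chunks(int nRows, int nTasks) {
        int size = nRows / nTasks;                  // 18 / 4 = 4 rows per task
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < nTasks; i++) {
            ranges.add(new int[] {i * size, (i + 1) * size}); // [lower, upper)
        }
        int covered = nTasks * size;                // 4 * 4 = 16 rows covered
        if (covered < nRows) {
            ranges.add(new int[] {covered, nRows}); // leftover rows, e.g. [16, 18)
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : chunks(18, 4)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

For 18 rows and 4 tasks, main prints 0..4, 4..8, 8..12, 12..16 and then the leftover 16..18; for 16 rows the leftover branch is skipped and only 4 ranges are produced.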

So I force a submission when this happens. Well, not a real submission: I'm using a fixed thread pool created with ExecutorService e = Executors.newFixedThreadPool(n_tasks). Since every thread in the pool is busy and blocked on the barrier (mybarrier.await() is called in the run method), the extra task could never get a pool thread, so I just start it with Thread.start().

To the point: because the CyclicBarrier has to account for a possible leftover chunk, the number of parties must be n_tasks + 1.

But when there is no leftover chunk, the barrier is one party short and never trips.

My solution:

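if (lower_limit != n_rows) { // rows [lower_limit, n_rows) are left over
    Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
    t.start(); // this thread supplies the extra (n_tasks + 1)th party
    t.join();
}
else {
    cyclic_barrier.await(); // no leftover: the calling thread fills the extra party itself
}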

I feel like I'm cheating by calling cyclic_barrier.await() from the calling thread just to force the barrier to trip.

Is there another way to approach this problem so that I don't have to do this?

dabadaba
  • What about this: in the worker that arrives last at the barrier (check the value returned by `barrier.await()`), you can check whether there are items left to process. If so, that worker can submit a new task to handle the leftover items. You may still need some additional sync inside `mergeResult()` beyond `barrier.await()`, just like you show with `t.join()`. – Victor Sorokin Jun 23 '14 at 18:10
  • @VictorSorokin I don't understand how checking the return value of `await()` would help me to check if there is a left portion. – dabadaba Jun 23 '14 at 18:20
  • Aren't values of `rows` and `n_threads` known beforehand (before you start submitting tasks)? – Victor Sorokin Jun 23 '14 at 18:21
  • Yes of course they are. – dabadaba Jun 23 '14 at 18:22
  • Then why can't you add code to your worker's `run()` that checks whether `await()` returned 0 (last to arrive, hence the barrier is released) and, if so, checks whether all items were processed? – Victor Sorokin Jun 23 '14 at 18:24
  • Oh yeah, I get what you're saying now. But, exactly as you said, I would need some sort of synchronization in `mergeResult` because a portion of the solution would still be unresolved. It doesn't seem right to me to call a merging method before all partial solutions have actually been calculated. – dabadaba Jun 23 '14 at 18:26
  • Yes, additional sync will be needed. OK then, that's why I suggested it in a comment :) – Victor Sorokin Jun 23 '14 at 18:28
  • I don't get the point. You can simply check beforehand how many subtasks you'll need, can't you? (E.g. 18 / 4 is 4 with a remainder of 2, so 5 subtasks are required) – isnot2bad Jun 23 '14 at 22:14
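Following isnot2bad's suggestion, here is a minimal sketch that counts the subtasks up front so the barrier's party count is always exact. Assumptions: ExactParties, taskCount, run and the AtomicInteger "results" are hypothetical stand-ins for the matrix work, and the barrier action stands in for ParallelProcess.mergeResult(). The calling thread is counted as one extra party so it can wait for the merge, which replaces the else-branch await() trick. The pool is sized to the subtask count, because with fewer pool threads than blocking parties a queued subtask could never start while the others wait in await().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ExactParties {
    // Full chunks plus one more if there is a remainder: 18 rows, 4 tasks -> 5 subtasks.
    static int taskCount(int nRows, int nTasks) {
        return nTasks + (nRows % nTasks != 0 ? 1 : 0);
    }

    // Returns the merged result; the barrier action stands in for mergeResult().
    static int run(int nRows, int nTasks) throws InterruptedException, BrokenBarrierException {
        int count = taskCount(nRows, nTasks);
        int size = nRows / nTasks;
        AtomicInteger rowsProcessed = new AtomicInteger(); // hypothetical partial results
        AtomicInteger merged = new AtomicInteger(-1);      // hypothetical merged result
        // count workers + the calling thread; the action runs once, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(count + 1,
                () -> merged.set(rowsProcessed.get()));
        // one pool thread per subtask, so no subtask sits in the queue while others block in await()
        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            for (int i = 0; i < count; i++) {
                int lower = i * size;
                int upper = (i == count - 1) ? nRows : lower + size; // last chunk takes the remainder
                pool.submit(() -> {
                    rowsProcessed.addAndGet(upper - lower); // "process" rows [lower, upper)
                    barrier.await();
                    return null;
                });
            }
            barrier.await(); // returns only after the barrier action has run
        } finally {
            pool.shutdown();
        }
        return merged.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(18, 4)); // 18
    }
}
```

Because CyclicBarrier guarantees that the barrier action completes before any party is released, the calling thread sees the merged value as soon as its own await() returns, with no separate join.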

1 Answer

Though this doesn't answer your question about CyclicBarrier, may I recommend using a Phaser? It lets you change the number of registered parties dynamically, and it also lets you run mergeResult when a phase is tripped.

So, register before you submit each async calculation. Then, inside that calculation, have the thread arrive on the phaser. When all registered parties have arrived, the phaser advances the phase and invokes the overridden onAdvance method.

The submission:

ParallelProcess process = new ParallelProcess(lower_limit, n_rows);
phaser.register();
executor.submit(process);

The processor:

public void run(){
   //do stuff
   phaser.arrive();
}

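The phaser:

Phaser phaser = new Phaser(){
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        ParallelProcess.mergeResult(); // runs once, when every registered party has arrived
        return true;                   // true terminates the phaser after this phase
    }
};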
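One caveat with the code above, per the Phaser contract: with new Phaser() and one register() per task, a fast task can arrive before the next task registers. The phase then advances with only the parties registered so far, and because onAdvance returns true the phaser terminates; later register() calls on a terminated phaser have no effect. A common pattern is to register the submitting thread as a party too and have it arrive only after all tasks are submitted. This is a minimal, self-contained sketch under that assumption; PhaserMerge, run and the AtomicInteger "results" are hypothetical stand-ins for ParallelProcess and its matrix work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserMerge {
    // Returns the merged result; onAdvance stands in for ParallelProcess.mergeResult().
    static int run(int nRows, int nTasks) {
        AtomicInteger rowsProcessed = new AtomicInteger(); // hypothetical partial results
        AtomicInteger merged = new AtomicInteger(-1);      // hypothetical merged result
        // Party 1 is the submitting thread: it keeps the phase open until every chunk is registered.
        Phaser phaser = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                merged.set(rowsProcessed.get()); // merge once every registered party has arrived
                return true;                     // single round: terminate the phaser
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(nTasks);
        int size = Math.max(1, nRows / nTasks);
        try {
            // full chunks of `size` rows, then whatever is left over (18 rows, 4 tasks -> 16..18)
            for (int lower = 0; lower < nRows; lower += size) {
                int from = lower;
                int to = Math.min(lower + size, nRows);
                phaser.register();                      // register before submitting
                executor.submit(() -> {
                    rowsProcessed.addAndGet(to - from); // "process" rows [from, to)
                    phaser.arrive();                    // non-blocking, frees the pool thread
                });
            }
            phaser.arriveAndAwaitAdvance(); // the submitter arrives, then waits until onAdvance has run
        } finally {
            executor.shutdown();
        }
        return merged.get();
    }

    public static void main(String[] args) {
        System.out.println(run(18, 4)); // 18
    }
}
```

Since arrive() does not block, a pool of n_tasks threads is enough even when a leftover chunk adds a fifth task: it simply waits in the queue until a thread is free, which avoids the extra Thread.start() from the question.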
John Vint