
I'm receiving a big file with N entries. For each entry, I'm creating a new thread. I need to wait for all N threads to terminate.

At the beginning I was using Phaser, but its implementation is limited to 65,535 parties. So it blows up, because N could be around 100K.

Then I tried CountDownLatch. This works fine: a very simple concept and a very simple implementation. But I don't know N in advance.

Phaser is my solution, but it has that limit.

Any ideas?

This post is related: Flexible CountDownLatch?
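For context, a typical dynamic-registration use of Phaser for this problem looks roughly like the minimal sketch below. The class and method names are hypothetical, and incrementing a counter stands in for processing one entry. Each worker is registered before it is started and deregisters when it finishes, so the number of registered parties grows with the number of unfinished threads; that is where the 65,535-party limit is hit.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserWaitSketch {

    // Hypothetical helper: starts `n` worker threads and blocks until all of them
    // have finished. Returns how many workers ran, so the result can be checked.
    public static int runAll(int n) {
        Phaser phaser = new Phaser(1); // party #1 is the launching (main) thread
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            // Register BEFORE start(), so the worker is counted even if it finishes
            // immediately. register() throws IllegalStateException once more than
            // 65535 parties would be registered at the same time.
            phaser.register();
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for processing one entry
                } finally {
                    phaser.arriveAndDeregister(); // this worker is done
                }
            }).start();
        }

        // Main arrives last; the phase can only advance once every worker that is
        // still registered has arrived and deregistered.
        phaser.arriveAndAwaitAdvance();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(1_000)); // prints 1000
    }
}
```

Because the main thread stays registered, the party count never drops to zero, so the phaser does not terminate before main has arrived.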

Perimosh

3 Answers


It sounds like the problem you are trying to solve is processing a large number of tasks as fast as possible and waiting for the processing to finish.

The problem with processing a large number of tasks simultaneously is that, above a certain (hardware-dependent) number of concurrent threads, context switching becomes so heavy that it can cripple your machine and slow the processing down. This means you need an upper limit on the number of worker threads running concurrently.

Phaser and CountDownLatch are both synchronisation primitives: their purpose is coordinating threads (for example, making one thread wait for others), not managing parallel execution.

I would use an ExecutorService in this case. It accepts tasks in several forms, including Runnable and Callable.

You can easily create an ExecutorService using the Executors class. I'd recommend a fixed-size thread pool with 20-100 threads, depending on how CPU-intensive your tasks are. The more computation a task requires, the fewer threads can run in parallel without serious performance degradation.

There are multiple ways to wait for all the tasks to finish:

  • Collect all the Future instances returned by the submit method and call get on each of them. This ensures that every task has completed by the time your loop finishes.
  • Shut down the executor service and wait for all the submitted tasks to finish. The downside of this method is that you have to specify a maximum amount of time to wait for the tasks to finish. It's also less elegant: you don't always want to shut the executor down. It depends on whether you're writing a single-shot application or a server that keeps running afterwards; in the case of a server app, you'll definitely have to go with the previous approach.

Finally, here is a code snippet illustrating all this:

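import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// loadFileAndCreateTasks, createRunnable, TaskFromFile and MAX_WAIT_TIME_MILLIS are placeholders
List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);

for (TaskFromFile task : tasks) {
    // createRunnable is not necessary in case your task implements Runnable
    executor.submit(createRunnable(task));
}

// assuming single-shot batch job: stop accepting new tasks, then wait for the queued ones
executor.shutdown();
// awaitTermination throws InterruptedException and returns false if the timeout elapsed
if (!executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
    executor.shutdownNow(); // interrupt whatever is still running
}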
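The snippet above uses the shutdown-and-wait approach. The first approach (collecting the Futures and calling get on each) could look like this minimal sketch, which keeps the pool size of 50 from the answer. The class name, the `runAll` method and the toy task (doubling its entry index) are hypothetical stand-ins for real file entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureWaitSketch {

    // Hypothetical helper: submits `n` tasks to a fixed pool and waits for all of
    // them by calling get() on each Future. Returns the sum of the task results.
    public static long runAll(int n) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(50);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int entry = i; // stand-in for one entry read from the file
                futures.add(executor.submit(() -> entry * 2)); // a Callable<Integer>
            }
            long sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until this task is done; rethrows task failures
            }
            return sum; // every submitted task has completed at this point
        } finally {
            executor.shutdown(); // a server app could keep the pool alive instead
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(100_000)); // prints 9999900000
    }
}
```

Only 50 threads ever exist regardless of N, and nothing in this pattern depends on knowing N before the loop starts.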
Zoltan
  • I already have some of this implemented. I'm using an ExecutorService with a thread pool and semaphores to avoid the CPU blowing up, but I will look into awaitTermination. Thanks! – Perimosh Oct 28 '15 at 20:47

ReusableCountLatch is a CountDownLatch alternative that allows increments as well.

The usage is as follows:

ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch10 = new ReusableCountLatch(10); // creates latch with initial count 10

latch.increment(); // increments counter

latch.decrement(); // decrements counter

latch.waitTillZero(); // blocks until count falls to zero

boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero

int count = latch.getCount(); // gets actual count

To use it, add this Gradle/Maven dependency to your project: 'com.github.matejtymes:javafixes:1.3.1'

More details can be found here: https://github.com/MatejTymes/JavaFixes

Matej Tymes

With AtomicInteger you can easily achieve the same. Initialize it with 1 and increment it for every new thread, before starting that thread. Once done, in both the worker and the producer (the launching thread), call decrementAndGet(). If it returns zero, run your finishing Runnable.
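A minimal sketch of that counter, assuming the finishing step can be a callback rather than a blocking wait. The class and the `launch` method are hypothetical. It follows the rules worked out in the discussion: the counter starts at 1 for the launching thread, each worker is counted before its `start()`, and the launcher releases its own count only after the loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterSketch {

    // Hypothetical helper: starts `n` threads that each run `work`. `onAllDone` runs
    // exactly once, on whichever thread brings the counter to zero.
    public static void launch(int n, Runnable work, Runnable onAllDone) {
        AtomicInteger pending = new AtomicInteger(1); // 1 = the launching thread itself
        for (int i = 0; i < n; i++) {
            pending.incrementAndGet(); // count the worker BEFORE start(), never inside it
            new Thread(() -> {
                try {
                    work.run();
                } finally {
                    if (pending.decrementAndGet() == 0) {
                        onAllDone.run();
                    }
                }
            }).start();
        }
        // The launcher releases its own count only after every worker has been counted,
        // so the counter cannot reach zero while threads are still being started.
        if (pending.decrementAndGet() == 0) {
            onAllDone.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1); // only keeps main alive for this demo
        launch(1_000, processed::incrementAndGet, () -> {
            System.out.println("processed " + processed.get()); // prints "processed 1000"
            finished.countDown();
        });
        finished.await();
    }
}
```

This version never blocks the launcher; if the launcher itself has to wait, the counter needs to be combined with wait/notifyAll, as discussed in the comments.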

Zbynek Vyskovsky - kvr000
  • That's not the case. I don't have workers and consumers. I have N threads, and I need to wait for all N of them to finish. Let's say I initialize the atomic integer to zero, then I fire N threads, and at the beginning of each one there is a +1. If, for some scheduling reason, thread A increments and then decrements consecutively, with no other thread B in between those 2 operations, the counter reaches zero and I will think I'm done when actually I'm not. – Perimosh Oct 28 '15 at 20:44
  • I called the launching thread the producer. You have to increment *before* starting the thread. You still need to start with 1 and decrement from the main thread once you have launched all the threads; otherwise something similar to the above could happen. – Zbynek Vyskovsky - kvr000 Oct 28 '15 at 20:50
  • Decrementing from the main thread could run into the same situation. The MAIN thread sets the counter to 1. Thread A increments it to 2, then decrements it back to 1. Then the MAIN thread decrements it to zero. Threads B, C... may still execute, but the MAIN thread will not wait for them. – Perimosh Oct 28 '15 at 21:07
  • Again, the inc() must be done _before_ thread.start(). Since all increments happen in the MAIN thread (and no other), the counter cannot go to zero until MAIN calls its own dec(). – Zbynek Vyskovsky - kvr000 Oct 28 '15 at 21:11
  • Sorry, I don't understand you. How can you make the MAIN thread wait for the other N threads? – Perimosh Oct 29 '15 at 03:48
  • For the barrier part you can use an Object as a monitor: each thread synchronizes on it once finished and, if the counter becomes zero, calls notifyAll(); the waiting thread calls wait(). – Zbynek Vyskovsky - kvr000 Oct 29 '15 at 06:10
  • Now I get it. If I make sure I start the N threads before calling wait, the atomic will be set to N (or N+1, it doesn't matter). Then at the end of each thread, the atomic will be decremented and read. If it is 0, call notifyAll. notifyAll should be called only once, by the last thread. – Perimosh Oct 29 '15 at 14:19
  • Exactly. However, there is still one issue I wasn't aware of. You have to put the `atomic.decrementAndGet()` inside the synchronized section, or at least check the current value before invoking `wait()`. Otherwise the last thread could call `notifyAll()` before the waiting thread calls `wait()`, and the notification would be lost. – Zbynek Vyskovsky - kvr000 Oct 29 '15 at 14:35
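Putting the comment thread together, a minimal sketch of the blocking version could look like the following. The class and its methods are hypothetical. Since every access to the counter happens while holding the monitor, it uses a plain `int` guarded by the lock instead of an AtomicInteger. The decrement, the `notifyAll()` and the check before `wait()` all happen under the same lock, which rules out the lost-notification race described in the last comment.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MonitorWaitSketch {

    private final Object lock = new Object();
    private int pending = 1; // 1 = the launching thread; only accessed while holding `lock`

    // Called by the launcher BEFORE starting each worker thread.
    private void register() {
        synchronized (lock) {
            pending++;
        }
    }

    // Called by each worker when it finishes, and once by the launcher after it has
    // started every worker. Decrement and notify happen under the waiter's lock.
    private void arrive() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll();
            }
        }
    }

    // Blocks until the count reaches zero. The count is checked while holding the
    // lock, so a notifyAll() issued before this call cannot be missed; the loop
    // also covers spurious wakeups.
    private void awaitZero() throws InterruptedException {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
    }

    // Hypothetical helper: starts `n` workers (the loop could just as well read entries
    // until the file ends, without knowing N up front) and waits for all of them.
    public static int runAll(int n) throws InterruptedException {
        MonitorWaitSketch barrier = new MonitorWaitSketch();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            barrier.register();
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for processing one entry
                } finally {
                    barrier.arrive();
                }
            }).start();
        }
        barrier.arrive(); // the launcher's own share of the count
        barrier.awaitZero();
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(1_000)); // prints 1000
    }
}
```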