5

Let's say I have 5 threads that must make a combined total of 1,000,000 function calls for a parallel Monte Carlo method program. I assigned 1,000,000 / 5 function calls to each of the 5 threads. However, after many tests (some ranging up to 1 trillion iterations) I realized that some threads were finishing much faster than others, so instead I would like to assign the workload to these threads dynamically. My first approach involved an AtomicLong variable set to an initial value of, let's say, 1 billion. After each function call, I would decrement the AtomicLong by 1. Before every function call the program would check whether the AtomicLong was greater than 0, like this:

AtomicLong remainingIterations = new AtomicLong(1000000000);
ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 5; i++) {//create 5 threads
   threadPool.submit(new Runnable() {
       public void run() {
          while (remainingIterations.get() > 0) {//do a function call if necessary 
             remainingIterations.decrementAndGet();//decrement # of remaining calls needed
             doOneFunctionCall();//perform a function call
          }
       }
   });
}//more unrelated code is not shown (thread shutdown, etc.)

This approach seemed to be extremely slow. Am I using AtomicLong correctly? Is there a better approach?

Arman

3 Answers

3

am I using AtomicLong correctly?

Not quite. The way you are using it, two threads could each check remainingIterations, each see 1, then each decrement it, putting you at -1 total.
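One way to close that check-then-act gap is to let the decrement itself decide whether an iteration was claimed, instead of calling get() and decrementAndGet() separately. The sketch below is only an illustration: the class name, the runAll helper, and the counting body of doOneFunctionCall() are assumptions standing in for the real simulation code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ClaimByDecrement {
    // Hypothetical stand-in for the real simulation step: it only counts calls.
    static final AtomicLong callsMade = new AtomicLong();

    static void doOneFunctionCall() {
        callsMade.incrementAndGet();
    }

    // Runs exactly totalIterations calls across the given number of threads
    // and returns how many calls were actually made.
    static long runAll(long totalIterations, int threads) {
        callsMade.set(0);
        AtomicLong remaining = new AtomicLong(totalIterations);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                // The check and the decrement are one atomic step: a thread
                // only proceeds if its own decrement left a value >= 0.
                while (remaining.decrementAndGet() >= 0) {
                    doOneFunctionCall();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return callsMade.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(1_000_000, 5));
    }
}
```

The counter ends up slightly below zero (by at most one per thread), which is harmless here because nothing reads it afterwards.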

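As for your slowness issue, it is possible that, if doOneFunctionCall() completes quickly, your app is being bogged down by contention on your AtomicLong. AtomicLong does not take a lock, but every thread performs an atomic update on the same memory location for every single call, and that gets expensive when the work per call is small.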
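If the shared counter is kept, the contention can be reduced by claiming iterations in batches, so each thread touches the AtomicLong once per batch rather than once per call. This is a rough sketch under assumptions: BATCH_SIZE is an arbitrary value to tune, and the class name, runAll helper, and counting doOneFunctionCall() are hypothetical placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class BatchedClaim {
    static final long BATCH_SIZE = 10_000; // assumed value; tune by measurement

    // Hypothetical stand-in for the real simulation step: it only counts calls.
    static final LongAdder callsMade = new LongAdder();

    static void doOneFunctionCall() {
        callsMade.increment();
    }

    static long runAll(long totalIterations, int threads) {
        callsMade.reset();
        AtomicLong remaining = new AtomicLong(totalIterations);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                while (true) {
                    // One atomic update claims up to BATCH_SIZE iterations.
                    long before = remaining.getAndAdd(-BATCH_SIZE);
                    if (before <= 0) {
                        break; // nothing left to claim
                    }
                    // The final batch may be smaller than BATCH_SIZE.
                    long claimed = Math.min(before, BATCH_SIZE);
                    for (long j = 0; j < claimed; j++) {
                        doOneFunctionCall();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return callsMade.sum();
    }
}
```

A larger batch means less contention but coarser load balancing at the end of the run, so the right size depends on how long one call takes.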

The nice thing about an ExecutorService is that it logically decouples the work being done from the threads that are doing it. You can submit more jobs than you have threads, and the ExecutorService will execute them as soon as it is able:

ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000000; i++) {
   threadPool.submit(new Runnable() {
       public void run() {
           doOneFunctionCall();
       }
   });
}

This might be balancing your work a bit too much in the other direction: creating too many short-lived Runnable objects. You can experiment to see what gives you the best balance between distributing the work and performing the work quickly:

ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000; i++) {
   threadPool.submit(new Runnable() {
       public void run() {
           for (int j = 0; j < 1000; j++) {
               doOneFunctionCall();
           }
       }
   });
}
Andrew Rueckert
  • Thank you for clearing up my misconceptions about the ExecutorService class. This was very helpful! – Arman Sep 28 '16 at 00:30
0

Look at ForkJoinPool. What you are attempting is called divide-and-conquer. In F/J you set the number of threads to 5. Each thread has a queue of pending Tasks. You can evenly set the number of Tasks for each thread/queue and when a thread runs out of work it work-steals from another thread's queue. This way you don't need the AtomicLong.

There are many examples of using this class. If you need more info, let me know.
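As a starting point, here is a minimal sketch of how the iteration count could be split recursively with a RecursiveAction. The THRESHOLD value, the class names, the runAll helper, and the counting doOneFunctionCall() are all assumptions for illustration, not part of the original program.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.LongAdder;

public class ForkJoinMonteCarlo {
    static final long THRESHOLD = 10_000; // assumed cutoff; tune by measurement

    // Hypothetical stand-in for the real simulation step: it only counts calls.
    static final LongAdder callsMade = new LongAdder();

    static void doOneFunctionCall() {
        callsMade.increment();
    }

    // A task representing "count" iterations that still have to be run.
    static class IterationRange extends RecursiveAction {
        private final long count;

        IterationRange(long count) {
            this.count = count;
        }

        @Override
        protected void compute() {
            if (count <= THRESHOLD) {
                // Small enough: run the iterations directly on this worker.
                for (long i = 0; i < count; i++) {
                    doOneFunctionCall();
                }
                return;
            }
            // Otherwise split in two; idle workers can steal either half.
            long half = count / 2;
            invokeAll(new IterationRange(half), new IterationRange(count - half));
        }
    }

    static long runAll(long totalIterations, int threads) {
        callsMade.reset();
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            pool.invoke(new IterationRange(totalIterations)); // blocks until done
        } finally {
            pool.shutdown();
        }
        return callsMade.sum();
    }
}
```

Calling runAll(1_000_000, 5) would run the iterations on a pool with a parallelism of 5, with the pool's work-stealing handling the balancing instead of a shared counter.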

edharned
  • I've never seen the ForkJoinPool class before (still learning), if you could point me in a direction with it that would be very helpful. – Arman Sep 27 '16 at 22:16
  • Google java.util.concurrent.ForkJoinPool or just java divide-and-conquer. There are so many examples out there that you should be able to clone one. Even the JavaDoc for java.util.concurrent.RecursiveAction or java.util.concurrent.RecursiveTask is worth a look. – edharned Sep 27 '16 at 22:29
  • Here is the official tutorial: https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html Lots of related links in the sidebar on the left. – markspace Sep 27 '16 at 22:55
0

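An elegant approach to avoid queuing up 1 billion tasks is to use a SynchronousQueue with a ThreadPoolExecutor. A SynchronousQueue holds no tasks, so a submission is only accepted when a thread is free to take it. Note that with the default rejection handler, submit does not block in that situation: it throws RejectedExecutionException once all five threads are busy. Passing ThreadPoolExecutor.CallerRunsPolicy makes the submitting thread run the task itself instead, which throttles submission to the pace of the workers. I didn't test actual performance, though.

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
    0L, TimeUnit.MILLISECONDS,
    queue,
    // when all 5 threads are busy, run the task on the submitting thread
    // instead of throwing RejectedExecutionException
    new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 1000000000; i++) {
    threadPool.submit(new Runnable() {
        public void run() {
            doOneFunctionCall();
        }
    });
}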
MarcC