3

I have N longs which are IDs. For every ID I need to execute a Runnable (ie. I don't care about a return value) and wait until all of them are finished. Each Runnable can take from a few seconds to a few minutes and it's safe to run about 100 threads in parallel.

In our current solution, we use Executors.newFixedThreadPool(), call submit() for each ID and then call get() on each returned Future.

The code works well and it's very simple in that I don't have to deal with threads, complicated waiting logic, etc. It has a downside: memory footprint.

All the still enqueued Runnable's consume memory (much more than 8 bytes than would be needed by a long: these are my Java classes with some internal state), and all the N Future instances consume memory too (these are Java classes with state as well, which I only use for waiting but I don't need the actual results). I looked at a heap dump and I would estimate that a little over 1 GiB of memory is taken up for N=10 million. 10 million longs in an array would only consume 76 MiB.

Is there a way to solve this problem with only holding the IDs in memory, preferably without resorting to low-level concurrent programming?

fejesjoco
  • 11,763
  • 3
  • 35
  • 65
  • You'd have to generate only e.g. 10k Runnables and submit them to the executor, and once half of them are finished generate another 5k, and so on. – ciamej Feb 04 '19 at 10:11
  • Would there be a way to submit just a `Runnable` wrapping your `Long` argument, and have your `Runnable` convert that long to your custom java class with internal state ? That would lower the footprint of the runnable queues, and lower that of the `Future`s, by having none of them hold on to your "internal state". In a word : defer the creation of heavy-weight state to inside the runnable. – GPI Feb 04 '19 at 10:49
  • @GPI that would help somewhat but the overhead of an empty object is 16 bytes so that's still a lot. – fejesjoco Feb 04 '19 at 11:32
  • "I have N longs" - in what form? In a file, or in a server? – Alexei Kaigorodov Feb 04 '19 at 14:41

4 Answers4

1

Yes: you could have a shared queue of longs. You submit n Runnables to the executor, where n is the number of threads in the executor, at the end of the run method, you get the next long from the queue, and resubmit a new Runnable.

Maurice Perry
  • 9,261
  • 2
  • 12
  • 24
1

Instead of creating million of Runnables, create specific thread pool which takes longs as tasks. Instead of waiting tasks to finish with Future.get(), use CountdownLatch.

That thread pool could be implemented like this:

int N = 1000000;// number of tasks;
int T = 100; // number of threads;
CountdownLatch latch = new CountdownLatch(N);
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>();

for (int k=0; k<N; k++) {
   queue.put(createNumber(k));
}
for (int k=0; k<T; k++) {
  new WorkingThread().start();
}
CountdownLatch.await();

class WorkingThread extends Thread {
  public void run() {
      while (latch.getCount() != 0) {
           processNumber(queue.take());
           latch.countDown();
      }
  }
}
Alexei Kaigorodov
  • 13,189
  • 1
  • 21
  • 38
  • 1
    I think you have a race condition on the acquisition of work units. Scenario : N=1, T=1000 and processNumber takes 1 minute. 1000 threads are going to see that the `latch.getCount()` is 1 but there will only ever be 1 task to get, 999 threads will then wait indifinitely inside queue.take(). Although I do think that consummer/producer is a good solution to the issue. – GPI Feb 04 '19 at 11:00
  • 1
    We could replace it with checking if the queue is empty and then quit. I think I'm going with this solution, with the change that I will still use an executor for these threads to get error handling for them. – fejesjoco Feb 04 '19 at 11:29
  • @fejesjoco in the comments of the question, you stated that having the 10M ids at "16 bytes per empty object" in the heap are "still a lot" of overhead. In this solution, you still have these 10M objects in the waiting queue... – GPI Feb 04 '19 at 14:33
  • @fejesjoco what kind of error handling do you want? End the whole process on the first task, count the error number, print error messages, or retry failed tasks? – Alexei Kaigorodov Feb 04 '19 at 14:44
1

What about using ExecutorCompletionService? Something like the following (which may contain bugs, I didn't test it):

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.function.LongFunction;

public class Foo {

  private final ExecutorCompletionService<Void> completionService;
  private final LongFunction<Runnable> taskCreator;
  private final long maxRunning; // max tasks running or queued

  public Foo(Executor executor, LongFunction<Runnable> taskCreator, long maxRunning) {
    this.completionService = new ExecutorCompletionService<>(executor);
    this.taskCreator = taskCreator;
    this.maxRunning = maxRunning;
  }

  public synchronized void processIds(long[] ids) throws InterruptedException {
    int completed = 0;

    int running = 0;
    for (long id : ids) {
      if (running < maxRunning) {
        completionService.submit(taskCreator.apply(id), null);
        running++;
      } else {
        completionService.take();
        running--;
        completed++;
      }
    }

    while (completed < ids.length) {
      completionService.take();
      completed++;
    }

  }

}

Another version of the above could use Semaphore and CountDownLatch, rather than a CompletionService.

public static void processIds(long[] ids, Executor executor,
                              int max, LongFunction<Runnable> taskSup) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(ids.length);
  Semaphore semaphore = new Semaphore(max);

  for (long id : ids) {
    semaphore.acquire();

    Runnable task = taskSup.apply(id);
    executor.execute(() -> {
      try {
        task.run();
      } finally {
        semaphore.release();
        latch.countDown();
      }
    });

  }

  latch.await();
}
Slaw
  • 37,820
  • 8
  • 53
  • 80
  • This is very similar to the plain ExecutorService solution except the class collects a buffer of Future's instead of us. So the counting logic alone could be adopted without using ExecutorCompletionService. But both parts of this solution are interesting. – fejesjoco Feb 04 '19 at 12:14
  • Edited answer to add a slightly modified option. It's basically the same thing but removes the need for a blocking queue (i.e. `ExecutorCompletionService`). Not sure if you consider `Semaphore` and `CountDownLatch` "low level", but I don't think it's overly complicated. – Slaw Feb 04 '19 at 19:54
1

This is the kind of thing I'd usually do with a Producer/Consummer pattern, and a BlockingQueue coordonnating the two, or, using Akka actors if I have the at hand on the project.

But I figured I'd suggest something quiete a bit different, relying on Java's Stream behavior.

The intuition is that the lazy execution of streams will be used to throttle the creation of work units, futures, and their results.

public static void main(String[] args) {
    // So we have a list of ids, I stream it
    // (note : if we have an iterator, you could group it by a batch of, say 100,
    // and then flat map each batch)
    LongStream ids = LongStream.range(0, 10_000_000L);
    // This is were the actual tasks will be dispatched
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // For each id to compute, create a runnable, which I call "WorkUnit"
    Optional<Exception> error = ids.mapToObj(WorkUnit::new)
             // create a parralel stream
             // this allows the stream engine to launch the next instructions concurrently
            .parallel()
            // We dispatch ("parallely") the work units to a thread and have them execute
            .map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
            // And then we wait for the unit of work to complete
            .map(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    // we do care about exceptions
                    return e;
                } finally {
                    System.out.println("Done with a work unit ");
                }
                // we do not care for the result
                return null;
            })
            // Keep exceptions on the stream
            .filter(Objects::nonNull)
            // Stop as soon as one is found
            .findFirst();


    executor.shutdown();
    System.out.println(error.isPresent());
}

To be honest, I'm not quiete sure the behaviour is guaranteed by the specification, but from my experience it works. Each one of the parallel "chunck" grabs a few ids, the feed it to the pipeline (map to a work unit, dispatch to the thread pool, wait for a result, filter for exceptions), which means an equilibrium is reached fairly quickly balancing the number of active work units to that of the executor.

If the number of parallel "chuncks" is to be fine tuned, one should follow up here : Custom thread pool in Java 8 parallel stream

GPI
  • 9,088
  • 2
  • 31
  • 38
  • Streams to the rescue! This seems like the simplest solution. There's only one problem with throughput. I create the executor with 100 threads, but I'm still being limited by the common ForkJoinPool used by streams. With 12 CPUs it only reaches a parallelism of 64 threads. Overriding the default pool size to 100 fixes the issue. See https://stackoverflow.com/a/29272776/552139. – fejesjoco Feb 04 '19 at 22:39