
I have a slow, CPU-intensive operation, doWork(int x), that is called with different values of its single integer parameter, as follows:

static String doWork(int x) {
  // do work that depends on x, takes ~60 seconds
  ...
}

public static void main(String[] args) {
  for (int i = 1; i <= 100; i++) {
    System.out.println(doWork(i));
  }
}

As each doWork() call completes, its result is output to the console. I'd like to parallelize this - all of the doWork() calls are independent and don't mutate any shared state. Now, I could do it the old way, messing around with ExecutorService and Future.get() and so on, but I'd like to do it more cleanly with streams¹.
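For reference, the "old way" I mean is roughly this shape (just a sketch; the pool size here is an arbitrary choice):

public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        final int x = i;
        futures.add(pool.submit(() -> doWork(x)));   // submit all the jobs up front
    }
    for (Future<String> f : futures) {
        System.out.println(f.get());                 // blocks until that result is ready, keeping output in order
    }
    pool.shutdown();
}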

So something like this seems like it could almost work:

public static void main(String[] args) {
    IntStream.rangeClosed(1, 100).parallel()
        .forEach(i -> System.out.println(doWork(i)));
}

... but the problem is that I want to preserve the output order on the console (the line for doWork(1) should come first, and so on). I can't use forEachOrdered() because that serializes the whole operation: only a single thread would be used. The root of the problem is that forEachOrdered provides too strong a guarantee: that the consumer method is called sequentially on one element at a time. I want the consumers to be called in parallel, but the output to be in-order.

So I should probably look at a map -> collect type idiom instead, where I collect the output from every doWork() call into a string and print it once:

public static void main(String[] args) {
    System.out.println(IntStream.rangeClosed(1, 100).parallel()
        .mapToObj(Main::doWork).collect(Collectors.joining("\n")));
}

Almost! The collect() method keeps encounter order, so my elements are ordered. The problem now is that there is no incremental output - the whole job has to finish before any output occurs. I really want to preserve the behavior where the updates dribble out onto the console.

I guess I want some kind of ordered consumption terminal operation, that doesn't force the whole pipeline to be ordered. Basically it would collect results internally like a normal collector, but when the current "leftmost" element gets collected, it would pass it through to the consumer - so the consumer sees a stream of ordered elements, but everything is still happening in parallel.
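To make that concrete, here's a rough hand-rolled illustration of the behavior I mean (purely a sketch of my own, not an existing API): buffer out-of-order results and flush whenever a complete prefix is available.

ConcurrentMap<Integer, String> buffer = new ConcurrentHashMap<>();
AtomicInteger next = new AtomicInteger(1);           // index of the next element to print

IntStream.rangeClosed(1, 100).parallel().forEach(i -> {
    buffer.put(i, doWork(i));                        // results arrive in whatever order they finish
    synchronized (buffer) {                          // flush any prefix that is now complete
        String s;
        while ((s = buffer.remove(next.get())) != null) {
            System.out.println(s);
            next.incrementAndGet();
        }
    }
});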

Is there anything out there like that? It doesn't seem possible to build it on the existing Collector interface, since that interface doesn't give you a way to determine what the order of the elements is.


¹ ...and perhaps even more efficiently, since fork/join is used under the covers, so maybe I get to make use of some of the heuristics built into that framework?

BeeOnRope

2 Answers


You're pretty close. Just combine the map and forEachOrdered solutions:

IntStream.rangeClosed(1, 100)
         .parallel()
         .mapToObj(Main::doWork)
         .forEachOrdered(System.out::println);
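To see that the upstream map still runs in parallel, here's a quick experiment (a sketch - Thread.sleep stands in for doWork, and the "start" lines go to stderr):

IntStream.rangeClosed(1, 8)
         .parallel()
         .mapToObj(i -> {
             System.err.println("start " + i + " on " + Thread.currentThread().getName());
             try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); }
             return "result " + i;
         })
         .forEachOrdered(System.out::println);

The "start" lines interleave across worker threads, while stdout still prints result 1 through result 8 strictly in order.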
shmosel
  • Hah, awesome. It works. I had thought of it but somehow convinced myself that it doesn't work because `forEachOrdered` makes the whole operation serial - but it doesn't: it only needs to call the consumer (`println`) serially, while the other parts can go in parallel. Duh. – BeeOnRope Dec 16 '16 at 00:23
  • FWIW, I didn't actually get a good result in my case, because the way fork/join chooses its elements is pretty random (e.g., starting with the 70th element) rather than preferring earlier elements (which probably makes sense because of how the reductions work or something), so you end up waiting forever for the first output (because the 1st element is like the 80th one processed). So I guess `ExecutorService` and `Future.get()` it is then :). This is still the obvious, correct answer though. – BeeOnRope Dec 16 '16 at 00:32
  • @BeeOnRope: it’s not random, it’s splitting the range in the middle, then splitting the resulting ranges again in their middles, until enough chunks are created to have work for every CPU/core. Combining this with `forEachOrdered` does indeed reduce the potential benefit. The alternative would be buffering, which your initial `collect`-based solution already does. – Holger Dec 16 '16 at 09:54
  • The reason why the work is split that way, instead of preferring earlier elements or even assigning each element to a different thread, is discussed [here](http://stackoverflow.com/q/32526921/2711488). – Holger Dec 16 '16 at 10:09
  • @Holger - right, well I mean "random appearing", since the result of this splitting, combined with the other heuristics in fork/join, plus the runtime state (e.g., how many of the common pool threads are already in use) makes the ordering unpredictable. Certainly there also doesn't seem to be any bias towards keeping stuff in order where it doesn't hurt the parallelism (i.e., it seems the divided lists are tackled in kind of a back-to-front manner). – BeeOnRope Dec 16 '16 at 18:38
  • @Holger - about buffering, yes - if buffering (the entire output) was OK, many solutions are straightforward. The problem is each task takes minutes and there are 100s of them so I really want to see the partial results as they proceed (often I use the partial results before a run even finishes, and then terminate the run). – BeeOnRope Dec 16 '16 at 18:39

FWIW, this is what I ended up with, since the answer by shmosel is correct but didn't provide a particularly useful "almost FIFO" order because of how parallel streams work:

IntStream.rangeClosed(1, 100)
    .mapToObj(i -> CompletableFuture.supplyAsync(() -> doWork(i))) // submit each job, in order, to the common pool
    .collect(Collectors.toList())                                  // force all the submissions to happen up front
    .forEach(f -> System.out.println(f.join()));                   // print in order, blocking on each future in turn

Basically it uses the stream to submit, in order, all the work to the default executor (the same one used by parallel streams). Then it collect()s the resulting futures - that's needed to force all the job submissions to actually occur, since streams are lazy - and then it iterates (the forEach call) over the resulting Future objects, getting the results one by one.

This implementation does the work in a more or less FIFO manner ("more or less" because, if you have 3 threads in the pool, about 3 jobs will be running at once, but they are generally the first three outstanding ones). With 3 threads and jobs that take about the same amount of time, you'll see the output appear, in order, in spurts of 3 results at a time.
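If you want to control the degree of parallelism instead of relying on the common pool, the two-argument CompletableFuture.supplyAsync overload accepts an explicit Executor. A sketch (the pool size of 3 just matches the example above):

ExecutorService pool = Executors.newFixedThreadPool(3);  // explicit pool instead of the common pool
List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, 100)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> doWork(i), pool))
        .collect(Collectors.toList());
futures.forEach(f -> System.out.println(f.join()));      // still prints in submission order
pool.shutdown();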

BeeOnRope
  • There is no need for a 3rd party library function like `Futures.getUnchecked()`. Just use `join()` instead of `get()`. – Holger Dec 16 '16 at 09:55
  • @Holger: huh, cool - I hadn't noticed that method. It's exactly what I was looking for. – BeeOnRope Dec 16 '16 at 18:31