6

I wrote a function that generates a labyrinth based on randomness. Most of the time, this function is very fast. But every once in a while, due to bad luck with the random numbers, it takes a couple of seconds.

I would like to start this function multiple times in parallel and let the fastest invocation "win".

Does the Scala standard library (or the Java standard library) provide a fitting tool for this job?

fredoverflow
  • It might be easier (and more efficient) to find out why the program sometimes takes a couple of seconds and fix it... – user253751 Sep 27 '14 at 11:35
  • You may want [ExecutorCompletionService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html), but I agree with @immibis – hsluo Sep 27 '14 at 11:55
  • @immibis It seems labyrinth generation (with no dead ends) is just a hard problem, and every solution I have found/can think of requires heavy backtracking. – fredoverflow Sep 27 '14 at 12:00

3 Answers

5

You can use Future:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val futures = for (_ <- 1 to 4) yield Future { /* computation */ }
val resultFuture = Future.firstCompletedOf(futures)

If you want to block (I presume you do), you can use Await.result:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val result = Await.result(resultFuture, Duration.Inf)
  • Is there an easy way to cancel the pending calculations when the first calculation finishes? – fredoverflow Sep 27 '14 at 23:28
  • @FredOverflow see this question: http://stackoverflow.com/questions/16009837/how-to-cancel-future-in-scala –  Sep 27 '14 at 23:34
2

A Java 8 solution with CompletableFuture:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FirstDoneWithCompletableFutureEx {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        int jobs = 10;
        CompletableFuture<?>[] futures = new CompletableFuture[jobs];
        for (int i = 0; i < jobs; i++) {
            futures[i] = CompletableFuture.supplyAsync(() -> {
                //computation    
                return new Object();
            });
        }

        //first job done
        Object firstDone = CompletableFuture.anyOf(futures).get();
    }
}
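
Note that `CompletableFuture.cancel` does not interrupt a computation that is already running, so the losing jobs above keep going until they finish. A minimal sketch of one way to stop them early, assuming the computation can poll a shared flag between steps (the class `FirstDoneWithStopFlag`, the `compute` method and the flag are hypothetical, with `Thread.sleep` standing in for real work):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

class FirstDoneWithStopFlag {

    // Hypothetical stand-in for the real computation: it works in small steps
    // and returns null as soon as the shared stop flag has been set.
    static Integer compute(int id, AtomicBoolean stop) {
        int steps = ThreadLocalRandom.current().nextInt(1, 20);
        for (int i = 0; i < steps; i++) {
            if (stop.get()) {
                return null; // another job already won, abort
            }
            try {
                Thread.sleep(5); // pretend to do one unit of work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return id;
    }

    static int firstDone(int jobs) {
        AtomicBoolean stop = new AtomicBoolean(false);
        CompletableFuture<?>[] futures = new CompletableFuture<?>[jobs];
        for (int i = 0; i < jobs; i++) {
            final int id = i;
            futures[i] = CompletableFuture.supplyAsync(() -> compute(id, stop));
        }
        try {
            // The flag is only set after this returns, so the winner is never null.
            return (Integer) CompletableFuture.anyOf(futures).join();
        } finally {
            stop.set(true); // tell the remaining jobs to give up
        }
    }

    public static void main(String[] args) {
        System.out.println("winner: " + firstDone(4));
    }
}
```

The flag has to be checked by the computation itself, so this only helps if the algorithm has a natural place to poll it, for example once per backtracking step.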

A Java 5, 6 or 7 solution with CompletionService:

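import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FirstDoneWithCompletionServiceEx {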

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        int jobs = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(jobs);
        // explicit type argument: the diamond operator <> only exists since Java 7
        CompletionService<Object> completionService = new ExecutorCompletionService<Object>(executorService);

        for (int i = 0; i < jobs; i++)
            completionService.submit(
                    new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            //computation
                            return new Object();
                        }
                    }
            );

        //get first job done
        Object firstDone = completionService.take().get();

        executorService.shutdownNow();
    }
}
dcernahoschi
0

7 years later, I stumbled upon ExecutorService.invokeAny, which does exactly what I want:

Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do (before the given timeout elapses).

Upon normal or exceptional return, tasks that have not completed are cancelled.

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

Implementation sketch:

class LabyrinthGenerator implements Callable<Labyrinth> {
    private static final int N = 16;
    private static final ExecutorService pool = Executors.newFixedThreadPool(N);

    public static Labyrinth generate() {
        while (true) {
            List<LabyrinthGenerator> generators = Stream.generate(LabyrinthGenerator::new)
                                                        .limit(N)
                                                        .collect(Collectors.toList());
            try {
                return pool.invokeAny(generators, 1, TimeUnit.SECONDS);
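            } catch (ExecutionException | TimeoutException ex) {
                // all attempts failed or timed out, try again
            } catch (InterruptedException ex) {
                // the calling thread itself was interrupted: restore the flag and stop retrying
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }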
        }
    }

    @Override
    public Labyrinth call() throws Exception {
        // ...
        if (Thread.currentThread().isInterrupted()) {
            // some other generator found a result, abort
        }
        // ...
    }
}
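
For anyone who wants to try `invokeAny` without a real labyrinth generator, here is a small runnable sketch. The class `InvokeAnyDemo` and its `attempt` tasks are made up for illustration: each task just sleeps for a random time and returns its id, so the sleep plays the role of a lucky or unlucky run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class InvokeAnyDemo {

    // Hypothetical task: sleeps for a random time to imitate an unlucky run,
    // then returns its own id. Thread.sleep throws InterruptedException when
    // the task is cancelled, which ends the losing attempts early.
    static Callable<Integer> attempt(int id) {
        return () -> {
            Thread.sleep(ThreadLocalRandom.current().nextInt(10, 500));
            return id;
        };
    }

    static int race(int attempts) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(attempts);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < attempts; i++) {
                tasks.add(attempt(i));
            }
            // Blocks until one task succeeds, then cancels the rest.
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("winner: " + race(8));
    }
}
```

A real generator that never blocks would not notice the cancellation on its own, which is why the sketch above checks `Thread.currentThread().isInterrupted()` inside `call()`.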
fredoverflow