7

I have request and multiple threads, which are looking for result in different ways and each thread should get some result at some point.. I need to take the result from the first finished thread, return this result and kill all of the remaining threads. I also have timeout when I'll return some default result..

I can think of two solutions, none of which seems "correct" to me..

1) Loop through the tasks, ask if they are finished, sleep for a while and return the first finished task it founds..

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.log4j.LogManager;

public class App {    
    public static final ExecutorService executors = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        ArrayList<Future<String>> taskList = new ArrayList<>();
        taskList.add(executors.submit(new Task1()));
        taskList.add(executors.submit(new Task2()));

        String result = null;
        long start = System.currentTimeMillis();
        long timeout = 1000;

        while ((start + timeout) < System.currentTimeMillis()) {
            try {
                for (Future<String> future : taskList) {
                    if (future.isDone()) {
                        result = future.get();
                        break;
                    }
                }

                if (result != null) {
                    break;
                }

                Thread.sleep(10);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        if (result == null) {
            result = "default..";
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // find result in one way
        return new String("result..");
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        // find result in some other way
        return new String("result..");
    }
}

2) Monitor each task with another thread by calling future.get(timeout, TimeUnit.MILLISECONDS); and the first finished thread would then call future.cancel(true); for all other threads...

The first solution seems to me like wasting processor time, and the second seems to me like wasting threads..

Finally, the Question is: Is there any better solution?

Thank you in advance

Edit: Thank you all for answers, I have solved this using "John H" 's answer:

There is a built in function that does this: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAny%28java.util.Collection,%20long,%20java.util.concurrent.TimeUnit%29

This will invoke all the tasks you give it, and wait for the first one to return an answer up to a time limit. If none of them return a result in time you can catch the TimeoutException and return the default value. Otherwise you can use the first value it returns and it will take care of cancelling the rest of the tasks.

Jaroslav
  • 867
  • 12
  • 19

3 Answers3

5

There is a built in function that does this: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAny%28java.util.Collection,%20long,%20java.util.concurrent.TimeUnit%29

This will invoke all the tasks you give it, and wait for the first one to return an answer up to a time limit. If none of them return a result in time you can catch the TimeoutException and return the default value. Otherwise you can use the first value it returns and it will take care of cancelling the rest of the tasks.

John H
  • 702
  • 3
  • 7
1

How about a shared CountDownLatch?

Your main thread will wait at cdl.await(30, TimeUnit.SECONDS); and the worker threads will call cdl.countDown(); when they're done.

Another option is to use a shared Exchanger, which would allow you to retrieve the result easily. Although there might be a small chance that with exchanger two worker threads would exchange the result between them, but a simple workaround would be to check that a worker thread retrieves "dummyString", therefore knowing that it was the main thread that got the result.

Main thread:

myExchanger.exchange("dummyString", 30, TimeUnit.SECONDS);.

Worker threads:

while(!myExchanger.exchange(result).equals("dummyString"));

Kayaman
  • 72,141
  • 5
  • 83
  • 121
0

You could consider a SynchronousQueue.

Main thread start all of the threads, giving each a reference to the queue, then just does a take on the queue (which will block until a worker thread does a put). First thread that discovers a result posts to the queue, releasing the main thread.

Main thread then iterates over all workers cancelling them.

A default can be achieved with a thread that just waits for the timeout and puts the default value.

This would also work with Runnables.

Sample code - seems to work but cancel does not.

public static void main(String[] args) {
  try {
    ArrayList<Future<String>> taskList = new ArrayList<>();
    BlockingQueue q = new SynchronousQueue();
    taskList.add(executors.submit(new Task1(q)));
    taskList.add(executors.submit(new Task2(q)));

    Object took = q.take();
    for (Future<String> task : taskList) {
      if (!task.isDone()) {
        task.cancel(true);
      }
    }
    System.out.println("Got " + took);
  } catch (InterruptedException ex) {
    Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex);
  }
}

It seems cancel is insufficiient. See here for alternatives.

Community
  • 1
  • 1
OldCurmudgeon
  • 64,482
  • 16
  • 119
  • 213