My code runs a task that makes heavy use of recursion and parallel stream processing to search deep into a tree of possible game moves and decide on the best move. This takes a long time, so to keep the user from waiting too long for the computer to "think" I want to set a timeout of, say, 1000 milliseconds. If the best move is not found within 1000 ms, the computer plays a random move. My problem is that although I call cancel on the Future (with mayInterruptIfRunning set to true), the task is not interrupted and the busy threads keep running in the background. I tried periodically checking isInterrupted() on the current thread and bailing out, but this didn't help. Any ideas?

Below is my code:

public Move bestMove() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<Move> callable = () -> bestEntry(bestMoves()).getKey();
    Future<Move> future = executor.submit(callable);
    try {
        return future.get(1000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        System.exit(0);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (TimeoutException e) {
        future.cancel(true);
        return randomMove();
    }
    return null;
}

private Move randomMove() {
    Random random = new Random();
    List<Move> moves = state.possibleMoves();
    return moves.get(random.nextInt(moves.size()));
}

private <K> Map.Entry<K, Double> bestEntry(Map<K, Double> map) {
    List<Map.Entry<K, Double>> list = new ArrayList<>(map.entrySet());
    // Double.compare avoids truncating fractional differences to 0
    Collections.sort(list, (e1, e2) -> Double.compare(e2.getValue(), e1.getValue()));
    return list.get(0);
}

private <K> Map.Entry<K, Double> worstEntry(Map<K, Double> map) {
    List<Map.Entry<K, Double>> list = new ArrayList<>(map.entrySet());
    // Double.compare avoids truncating fractional differences to 0
    Collections.sort(list, (e1, e2) -> Double.compare(e1.getValue(), e2.getValue()));
    return list.get(0);
}

private Map<Move, Double> bestMoves() {
    // HashMap is not safe for concurrent put() from a parallel stream
    Map<Move, Double> moves = new ConcurrentHashMap<>();
    state.possibleMoves().stream().parallel().forEach(move -> {
        if (!Thread.currentThread().isInterrupted()) {
            Game newState = state.playMove(move);
            Double score = newState.isTerminal() ? newState.utility()
                    : worstEntry(new (newState).worstMoves()).getValue();
            moves.put(move, score);
        }
    });
    return moves;
}

private Map<Move, Double> worstMoves() {
    // HashMap is not safe for concurrent put() from a parallel stream
    Map<Move, Double> moves = new ConcurrentHashMap<>();
    state.possibleMoves().stream().parallel().forEach(move -> {
        if (!Thread.currentThread().isInterrupted()) {
            Game newState = state.playMove(move);
            Double score = newState.isTerminal() ? -newState.utility()
                    : bestEntry(new (newState).bestMoves()).getValue();
            moves.put(move, score);
        }
    });
    return moves;
}

P.S. I also tried without parallel(), but a single thread is still left running.

Thanks in advance.

Stefan Zobel
user3186301

2 Answers

Thank you all for your answers. I think I found a simpler solution. First of all, I think future.cancel(true) didn't work because it probably only sets the interrupted flag on the thread that runs the task (that is, the thread associated with the future). However, because the task itself uses parallel stream processing, it spawns workers on other threads, which never get interrupted, so periodically checking isInterrupted() on them does not help. The "solution" (or maybe more of a workaround) that I found is to keep my own interrupted flag in my algorithm's objects and manually set it to true when the task is cancelled. Because all threads work on the same instance, they all have access to the flag and obey it.
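
A minimal sketch of that workaround, under stated assumptions: Search, slowScore and runWithTimeout are hypothetical names, moves are plain integers, and the scoring step is simulated with Thread.sleep rather than a real game-tree search. The shared flag is an AtomicBoolean so that a write from the calling thread is visible to the stream's worker threads.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SharedCancelFlagDemo {

    /** Hypothetical stand-in for the search object; not the class from the question. */
    static class Search {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        final Map<Integer, Double> scores = new ConcurrentHashMap<>(); // safe for concurrent put()

        void cancel() {
            cancelled.set(true);
        }

        /** Scores every move in parallel; moves not finished before cancellation are left out. */
        Map<Integer, Double> scoreMoves(List<Integer> moves) {
            moves.parallelStream().forEach(move -> {
                Double score = slowScore(move);
                if (score != null) {
                    scores.put(move, score);
                }
            });
            return scores;
        }

        /** Simulates about one second of work, polling the shared flag between steps. */
        private Double slowScore(int move) {
            for (int step = 0; step < 100; step++) {
                if (cancelled.get()) {
                    return null; // the flag is visible on every worker thread
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt status
                    return null;
                }
            }
            return (double) move;
        }
    }

    /** Returns {timedOut, workerStopped, movesScored}; the first two are 1 for true, 0 for false. */
    static long[] runWithTimeout(long timeoutMillis) throws Exception {
        List<Integer> moves = IntStream.range(0, 64).boxed().collect(Collectors.toList());
        Search search = new Search();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Map<Integer, Double>> future = executor.submit(() -> search.scoreMoves(moves));
        boolean timedOut = false;
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedOut = true;
            search.cancel();     // reaches the parallel stream's worker threads
            future.cancel(true); // only interrupts the executor thread running the task
        }
        executor.shutdown();
        // The executor thread blocks in forEach until all stream work has ended,
        // so termination here means the background work really stopped.
        boolean stopped = executor.awaitTermination(5, TimeUnit.SECONDS);
        return new long[] {timedOut ? 1 : 0, stopped ? 1 : 0, search.scores.size()};
    }

    public static void main(String[] args) throws Exception {
        long[] r = runWithTimeout(100);
        System.out.printf("timedOut=%b, workerStopped=%b, movesScored=%d%n",
                r[0] == 1, r[1] == 1, r[2]);
    }
}
```

In a recursive search the same flag would have to be shared by every object created along the way (for example by passing it to each new state's constructor), otherwise deeper levels never see the cancellation.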

user3186301

Future.cancel only sets the thread's interrupted flag, so your code must handle it as follows:

public static void main(String[] args) throws InterruptedException {

    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final Future<Integer> future = executor.submit(() -> count());
    try {
        System.out.println(future.get(1, TimeUnit.SECONDS));
    } catch (Exception e){
        future.cancel(true);
        e.printStackTrace();
    }finally {
        System.out.printf("status=finally, cancelled=%s, done=%s%n", future.isCancelled(), future.isDone());
        executor.shutdown();
    }

}

static int count() throws InterruptedException {
    while (!Thread.interrupted());
    throw new InterruptedException();
}

As you can see, count keeps checking whether the thread has been interrupted. You have to understand that there is actually no guarantee that a running thread can be stopped if it does not cooperate.

UPDATE 2017-11-18 23:22

I wrote a FutureTask wrapper that tries to stop the thread even if the code ignores the interrupt signal. Keep in mind that it is unsafe, because Thread.stop is deprecated (and on Java 20 and later it throws UnsupportedOperationException instead of stopping the thread). It worked in my test, and you can use it if you really need it, but please read the Thread.stop deprecation notes first: for example, if you are using locks, calling stop() can leave shared state inconsistent or cause deadlocks.

Test code

public static void main(String[] args) throws InterruptedException {
    final ExecutorService executor = newFixedSizeExecutor(1);
    final Future<Integer> future = executor.submit(() -> count());
    try {
        System.out.println(future.get(1, TimeUnit.SECONDS));
    } catch (Exception e){
        future.cancel(true);
        e.printStackTrace();
    }
    System.out.printf("status=finally, cancelled=%s, done=%s%n", future.isCancelled(), future.isDone());
    executor.shutdown();
}

static int count() throws InterruptedException {
    while (true);
}

Custom Executor

static ThreadPoolExecutor newFixedSizeExecutor(final int threads) {
    return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()){
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new StoppableFutureTask<>(new FutureTask<>(callable));
        }
    };
}

static class StoppableFutureTask<T> implements RunnableFuture<T> {

    private final FutureTask<T> future;
    private Field runnerField;

    public StoppableFutureTask(FutureTask<T> future) {
        this.future = future;
        try {
            final Class clazz = future.getClass();
            runnerField = clazz.getDeclaredField("runner");
            runnerField.setAccessible(true);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        final boolean cancelled = future.cancel(mayInterruptIfRunning);
        if(cancelled){
            try {
                ((Thread) runnerField.get(future)).stop();
            } catch (Exception e) {
                throw new Error(e);
            }
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        return future.isCancelled();
    }

    @Override
    public boolean isDone() {
        return future.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return future.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeout, unit);
    }

    @Override
    public void run() {
        future.run();
    }
}

Output

java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at com.mageddo.spark.sparkstream_1.Main$StoppableFutureTask.get(Main.java:91)
    at com.mageddo.spark.sparkstream_1.Main.main(Main.java:20)
status=finally, cancelled=true, done=true

Process finished with exit code 0
deFreitas