0

To simplify my case, let's assume that I'm implementing a Binary Search using Java's Fork-Join framework. My goal is to find a specific integer value (the target integer) in an array of integers. This can be done by breaking the array by half until it's small enough to perform a serial search. The result of the algorithm needs to be a boolean value indicating whether the target integer was found in the array or not.

A similar problem is explored in Klaus Kreft's presentation in slide 28 onward. However, Kreft's goal is to find the largest number in the array so all entries have to be scanned. In my case, it is not necessary to scan the full array because once the target integer was found, the search can be stopped.

My problem is that once I encounter the target integer many tasks have already been inserted to the thread pool and I need to cancel them since there is no point in continuing the search. I tried to call getPool().terminate() from inside a RecursiveTask but that didn't help much since many tasks are already queued and I even noticed that new onces are queued too even after shutdown was called..

My current solution is to use a static volatile boolean that is initiated as 'false' and to check its value at the beginning of the task. If it's still 'false' then the task begins its works, if it's 'true', the task immediately returns. I can actually use a RecursiveAction for that.

So I think that this solution should work, but I wonder if the framework offers some standard way of handling cases like that - i.e. defining a stop condition to the recursion that consequently cancels all queued tasks.

Note that if I want to stop all running tasks immediately when the target integer was found (by one of the running tasks) I have to check the boolean after each line in these tasks and that can affect performance since the value of that boolean cannot be cached (it's defined as volatile).

So indeed, I think that some standard solution is needed and can be provided in the form of clearing the queue and interuppting the running tasks. But I haven't found such a solution and I wonder if anyone else knows about it or has a better idea.

Thank you for your time, Assaf

EDIT: here is my testing code:

package xxx;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinTest {

    static final int ARRAY_SIZE = 1000;
    static final int THRESHOLD = 10;

    static final int MIN_VALUE = 0;
    static final int MAX_VALUE = 100;

    static Random rand = new Random();


    // a function for retrieving a random int in a specific range
    public static int randInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }

    static volatile boolean result = false;
    static int[] array = new int[ARRAY_SIZE];
    static int target;

    @SuppressWarnings("serial")
    static class MyAction extends RecursiveAction {

        int startIndex, endIndex;

        public MyAction(int startIndex, int endIndex) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        // if the target integer was not found yet: we first check whether 
        // the entries to search are too few. In that case, we perform a 
        // sequential search and update the result if the target was found. 
        // Otherwise, we break the search into two parts and invoke the 
        // search in these two tasks.
        @Override
        protected void compute() {
            if (!result) {
                if (endIndex-startIndex<THRESHOLD) { 
                    // 
                    for (int i=startIndex ; i<endIndex ; i++) {
                        if (array[i]==target) {
                            result = true;
                        }
                    }
                } else {
                    int middleIndex = (startIndex + endIndex) / 2;
                    RecursiveAction action1 = new MyAction(startIndex, middleIndex);
                    RecursiveAction action2 = new MyAction(middleIndex+1, endIndex);
                    invokeAll(Arrays.asList(action1,action2));
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        for (int i=0 ; i<ARRAY_SIZE ; i++) {
            array[i] = randInt(MIN_VALUE, MAX_VALUE);
        }
        target = randInt(MIN_VALUE, MAX_VALUE);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new MyAction(0,ARRAY_SIZE));
        System.out.println(result);
    }

}
Assaf
  • 184
  • 1
  • 11
  • Can you post some code? You could use a specific queue that you can clear, and maybe interrupt the running threads, but looking the code is easier to give you proper suggestions. – Simone Gianni Sep 04 '14 at 12:36
  • I maintain an open-source fork/join framework that provides a parallel sequential search that handles your need for "find first" You can use it as is or use the code as an example of how to do it yourself. The sourceForge link is: http://sourceforge.net/projects/tymeacdse/?source=navbar – edharned Sep 04 '14 at 13:58
  • Thanks @edharned, I'll take a look. Does it depend on Java's fork/join framework? do you also use a volatile boolean / AtomicBoolean to stop the search? – Assaf Sep 04 '14 at 14:25
  • @Assaf No it doesn't use the Java F/J framework. It does F/J properly. The sequential parallel search is one of the 17 built-in-functions. It uses a volatile boolean to stop other threads once "find first" is found. There is also find-any, find-last, find-all. – edharned Sep 04 '14 at 14:34

2 Answers2

0

I think you may be inventing a barrier to the correct solution.

You say that your boolean stop flag must be volatile and so will interfere with the speed of the solution - well, yes and no - accessing a volatile does indeed do cache flushing but have you considered an AtomicBoolean?

I believe the correct solution is to use an AtomicBoolean flag to get all processes to stop. You should check is in as finely grained fashion as is reasonable to get your system to stop quickly.

It would be a mistake to attempt to clear all queues and interrupt all threads - this would lead to a horrible mess.

    static AtomicBoolean finished = new AtomicBoolean();
    ....

        protected void compute() {
            if (!finished.get()) {
                if (endIndex - startIndex < THRESHOLD) {
                    //
                    for (int i = startIndex; i < endIndex && !finished.get(); i++) {
                        if (array[i] == target) {
                            finished.set(true);
                            System.out.print("Found at " + i);
                        }
                    }
                } else {
                    ...
                }
            }
        }
OldCurmudgeon
  • 64,482
  • 16
  • 119
  • 213
  • Thanks, so you suggest to switch to AtomicBoolean and to add a check for its value as part of the loop. I should have added this type of check also to my original code with the volatile. But can you explain why AtomicBoolean is preferable over volatile in this case? performance-wise I thought that they are pretty much the same since both are lock-free and are not cached. – Assaf Sep 04 '14 at 14:23
  • @Assaf - There is little difference - accessing a `volatile` flushes all caches while accessing an `AtomicBoolean` *should* be much less intrusive - it is not always better but it will not be worse. In your scenario there is little difference [Volatile boolean vs AtomicBoolean](http://stackoverflow.com/questions/3786825/volatile-boolean-vs-atomicboolean) - my point is that you should *not* take the other route of clearing queues and interrupting threads. – OldCurmudgeon Sep 04 '14 at 14:40
0

I left a comment above on how to do this by looking at an open source product that does this in many built-in-functions. Let me put some detail here.

If you want to cancel tasks that are beginning or are currently executing, then each task needs to know about every other task. When one task finds what it wants, that task need to inform every other task to stop. You cannot do this with dyadic recursive division (RecursiveTask, etc.) since you create new tasks recursively and the old tasks will never know about the new ones. I’m sure you could pass a reference to a stop-me field to each new task, but it will get very messy and debugging would be “interesting.”

You can do this with Java8 CountedCompleter(). The framework was butchered to support this class so things that should be done by the framework needs doing manually, but it can work.

Each task needs a volatile boolean and a method to set it to true. Each tasks need an array of references to all the other tasks. Create all the tasks up front, each with an empty array of to-be references to the other tasks. Fill in the array of references to every other task. Now submit each task (see the doc for this class, fork() addPendingCount() etc.)

When one tasks finds what it wants, it uses the array of references to the other tasks to set their boolean to true. If there is a race condition with multiple threads, it doesn’t matter since all threads set “true.” You will also need to handle tryComplete(), onCompletion() etc. This class is very muddled. It is used for the Java8 stream processing which is a story in itself.

What you cannot do is purge pending tasks from the deques before they begin. You need to wait until the task starts and check the boolean for true. If the execution is lengthy, then you may also want to check the boolean for true periodically. The overhead of a volatile read is not that bad and there really is no other way.

edharned
  • 1,884
  • 1
  • 19
  • 20
  • Thanks again for the references and for the long explanation. I noticed that Java 8 has some new developments but it's a bit hard to understand when to prefer what. Need to do some more reading. One thing I don't understand in your description is why using an array of flags (which dictates that all tasks have to be created in advance) instead of a single global flag (either volatile or AtomicBoolean as suggested by @OldCurmudgeon above). Both the array and the single flag solutions will void tasks that are about to start, and both can "stop" tasks in the middle if the flag is checked. – Assaf Sep 06 '14 at 00:30
  • @Assaf The exact way you do this is up to you. If you have a single volatile boolean in one object then you need a pointer to that reference: pointer.isTrue(); which is more overhead then just checking your own local variable: if (stop-me) ... When you're only checking in the beginning of the task, who cares. But when you check periodically, then the overhead matters. – edharned Sep 06 '14 at 13:58
  • The searching example at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountedCompleter.html shows an example of creating an AtomicReference/Atomic, then passing this atomic to the subtasks. The subtasks have to call atomic.get() and do nothing if the result is not null. What I don't understand is why do you need CountedCompleter, because a RecursiveAction would do the same thing. – snaran Jul 27 '17 at 20:59
  • @snaran This question was answered three years ago. Since then, Doug has added many examples in the CC class. The example you mention is horrendously complex, not something one would normally want. I don't know how a RecursiveAction could do this easily since there is no single focal point for all the Tasks. The key work is "easily", anyone can make a messy program (as Doug did with the CC example.) But, if you have an example then show us. – edharned Jul 27 '17 at 22:11