0

On the wave of this SO question and with many hints from another one, I'm trying to implement an AsyncTask variant with tasks that can be prioritized.

In my CustomAsyncTask class I have:

public abstract class CustomAsyncTask<Params, Progress, Result> {

    private static int CORE_POOL_SIZE = 1;
    private static int MAXIMUM_POOL_SIZE = 1;

    private static final int KEEP_ALIVE = 1;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "CustomAsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<DownloadTask> pPoolWorkQueue =
            new PriorityBlockingQueue<DownloadTask>(10, new DownloadTasksComparator());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static Executor PRIORITY_THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, (PriorityBlockingQueue) pPoolWorkQueue, sThreadFactory);

   //...
}

The comparator:

public class DownloadTasksComparator implements Comparator<DownloadTask> {

    @Override
    public int compare(DownloadTask arg0, DownloadTask arg1) {
        int res;

        if (arg0 == null && arg1 == null) {
            res = 0;
        } else if (arg0 == null) {
            res = -1;
        } else if (arg1 == null) {
            res = 1;
        }

        res = arg0.getPriority() - arg1.getPriority();

        return res;
    }
}

In the DownloadTask class extending CustomAsyncTask I have a priority Integer field and a getPriority() method.

I'm calling the tasks execution as:

DownloadTask dt = new DownloadTask(..., PRIORITY_NORMAL, ...);
dt.executeOnExecutor(CustomAsyncTask.PRIORITY_THREAD_POOL_EXECUTOR);

This works: if the pool sizes are 1, the downloads get executed one by one; if pool size is 2, etc.

note: priority Integers have arbitrary values:

public static final int PRIORITY_HIGH = 10;
public static final int PRIORITY_NORMAL = 1;

But if I call the tasks as:

DownloadTask dt = new DownloadTask(..., PRIORITY_HIGH, ...);
dt.executeOnExecutor(CustomAsyncTask.PRIORITY_THREAD_POOL_EXECUTOR);

I have a java.lang.ClassCastException: my.pkg.name.CustomAsyncTask$3 cannot be cast to my.pkg.name.DownloadTask

and then

at my.pkg.name.DownloadTasksComparator.compare(DownloadTasksComparator.java:1)
at java.util.concurrent.PriorityBlockingQueue.siftUpUsingComparator(PriorityBlockingQueue.java:334)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:447)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1295)
at my.pkg.name.CustomAsyncTask.executeOnExecutor(CustomAsyncTask.java:494)
at my.pkg.name.GetDownloadTaskListener$1.finishDownload(GetDownloadTaskListener.java:180)
at my.pkg.name.DownloadTask.onPostExecute(DownloadTask.java:330)
at my.pkg.name.DownloadTask.onPostExecute(DownloadTask.java:1)
at my.pkg.name.CustomAsyncTask.finish(CustomAsyncTask.java:536)
at my.pkg.name.CustomAsyncTask.access$0(CustomAsyncTask.java:532)
at my.pkg.name.CustomAsyncTask$InternalHandler.handleMessage(CustomAsyncTask.java:549)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:137)
at android.app.ActivityThread.main(ActivityThread.java:4745)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:511)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:786)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:553)
at dalvik.system.NativeStart.main(Native Method)

all from AndroidRuntime

I really don't have a clue...

EDIT: At this point, I've wrapped a small Eclipse project that implements things exactly the same way of the bigger application and suffers from the same issue. It borrows CustomAsyncTaskComparator and CustomAsyncTask verbatim. No visual feedback is given. The app's progress is just some LogCat output. But it gives the idea. When more than two tasks are enqueued, the app FCs.

https://www.dropbox.com/s/lrg4kscgw3f1xwr/ConcurrentTest.tar.gz

Community
  • 1
  • 1
dentex
  • 3,223
  • 4
  • 28
  • 47

3 Answers3

2

CustomAsyncTask, like AsyncTask, uses a static queue. Everything that goes on that queue, at present, will run through your DownloadTasksComparator. However, DownloadTasksComparator only works with DownloadTask. So long as you only use DownloadTask, and not other subclasses of CustomAsyncTask, you will be fine. However, apparently you have some other anonymous inner class extending CustomAsyncTask, and that's not a DownloadTask.

Make CustomAsyncTask be abstract, with a getPriority() method as an abstract method. Rename DownloadTasksComparator to CustomAsyncTaskComparator and have it compare CustomAsyncTask instances. Then, your other subclasses of CustomAsyncTask would need to implement their own getPriority() methods, to enable them to be sorted along with the DownloadTask instances in your work queue.

CommonsWare
  • 986,068
  • 189
  • 2,389
  • 2,491
  • Hello. It's not the case as you suggest in 1st paragraph, because `DownloadTask` class it's the only one extending the abstract class `CustomAsyncTask`. Anyway, I followed your instructions in 2nd paragraph, and now, as before, as soon as the comparator is called into the scene, I get the weird `java.lang.ClassCastException: my.pkg.name.CustomAsyncTask$3 cannot be cast to my.pkg.name.CustomAsyncTask` :-O the only difference seems to be that `$3` – dentex Jun 28 '14 at 09:59
  • @dentex: "because DownloadTask class it's the only one extending the abstract class CustomAsyncTask" -- not according to the stack trace. The `PriorityBlockingQueue` is comparing a `DownloadTask` to something else using `DownloadTasksComparator`. Now, it could be that you are putting other stuff on `pPoolWorkQueue` not shown here, but if `CustomAsyncTask` is otherwise a straight-up port of `AsyncTask`, then `my.pkg.name.CustomAsyncTask$3` is a non-static inner class of `CustomAsyncTask` that extends `CustomAsyncTask`. – CommonsWare Jun 28 '14 at 10:46
  • May this be happening because `DownloadTask` is defined in its own class: `public class DownloadTask extends CustomAsyncTask { ... }` with inside the public method `public DownloadTask(Context context, int priority, ...) { ... }` ? This is where I have `this.mPriority = priority` in order for the method `getPriority()` returning `mPriority` to work. Then I call the `DownloadTask dt = ...` as I show into the Question. I mean, do I have to make the `DownloadTask` class an inner class? – dentex Jun 28 '14 at 12:02
  • @dentex: Wait a moment... `java.lang.ClassCastException: my.pkg.name.CustomAsyncTask$3 cannot be cast to my.pkg.name.CustomAsyncTask` indicates that the `$3` inner class is *not* a `CustomAsyncTask`. Are you accidentally putting something else into `pPoolWorkQueue`? – CommonsWare Jun 28 '14 at 12:17
  • Not that I'm aware of... It's a bit frustrating. So let me dig a bit more into threading in general, because this is starting to be a little "blind-trials-and-blind-errors" like. One last thing: could the cast `(PriorityBlockingQueue) pPoolWorkQueue` inside the `Executor` be the problem? Because it would accept `BlockingQueue`. It doesn't compile without this cast, as seen on http://stackoverflow.com/a/7792813/1865860. – dentex Jun 28 '14 at 12:47
  • @dentex: "One last thing: could the cast (PriorityBlockingQueue) pPoolWorkQueue inside the Executor be the problem?" -- I wouldn't think so. – CommonsWare Jun 28 '14 at 12:53
  • As I mention in my answer, the inner class is actually an instance of `FutureTask` that `AsyncTask` uses internally for it's asynchronous operation, as it does not implement `Runnable` itself. – corsair992 Jun 28 '14 at 13:18
  • I've posted a link to a test project (see Question). Would be great if you could take a look. – dentex Jun 28 '14 at 17:05
1

As you may have noticed while looking through the AsyncTask implementation, it internally uses a FutureTask to handle the background task, and that is what gets handed on to the Executor and potentially queued on it's work queue. Since you are already deriving your own implementation, you could replace the FutureTask with a custom derivative that holds a reference to the AsyncTask, to be accessed from your Comparator implementation.

Also, instead of replacing the default static Executor of your custom AsyncTask derivative, you should instead use the executeOnExecutor() method in your subclasses so that it can be used in a generic manner.

corsair992
  • 3,050
  • 22
  • 33
  • Hello and thanks, but I'm sorry I don't know how to proceed in regards of `replace the FutureTask with a custom derivative that holds a reference to the AsyncTask`. 2nd paragraph: I tried supplying a fresh defined `Executor` instead of making a custom AsyncTask class, but the result it's the same: a class cast exception. – dentex Jun 28 '14 at 10:17
  • @dentex: I meant that you should implement a custom class extending `FutureTask` that will hold a reference to the `AsyncTask` that will be using it, thus providing a way for the `AsyncTask` and it's properties to be accessed from the `Comparator` implementations that will be passed the `FutureTask`. – corsair992 Jun 28 '14 at 13:20
  • I've posted a link to a test project (see Question). Would be great if you could take a look. – dentex Jun 28 '14 at 17:06
  • @dentex: Did you try implementing my suggestion? – corsair992 Jun 28 '14 at 17:24
  • No, unfortunately it's out of my league... (as the time being) – dentex Jun 28 '14 at 17:27
  • @dentex: Ok, [here](https://www.dropbox.com/s/mty066xqz09is2b/ConcurrentTestPatch.patch)'s a working patch that implements my solution. – corsair992 Jun 28 '14 at 19:54
  • Thank you very much... This is beautiful. Thanks for giving me this opportunity. Next app's version credits will have a Contributor! :) – dentex Jun 28 '14 at 20:19
  • LoL!!! every Comparator's `arg0` must be swapped with `arg1`. Eheheh! Great. I Finally had the chance to notice this. It was giving the opposite comparison result. :) – dentex Jun 28 '14 at 20:25
0

The problem is the PriorityBlockingQueue<CustomAsyncTask> you use to construct the ThreadPoolExecutor. ThreadPoolExecutor should accept BlockingQueue<Runnable>. ThreadPoolExecutor uses this queue to queue the runnables (runnables are submitted by calling execute(Runnable r)).

In AsyncTask's case, it calls Executor.execute(Runnable r) with a FutureTask object, so it's this FutureTask object being queued by the executor, not AsyncTask itself. So when the comparator tries to cast runnable to DownloadTask, the exception is thrown. I guess my.pkg.name.CustomAsyncTask$3 may be an inner runnable class.

Here is part of the source code of AsyncTask:

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;

    onPreExecute();

    mWorker.mParams = params;
    exec.execute(mFuture);

    return this;
}

If you're trying to run AsyncTask with priority using this PriorityBlockingQueue approach, you should subclass the runnable object passed to the execute method, making it with a priority. You are using a custom asynctask, so you have full control of the variables.

To those trying to run the original AsyncTask with priority, i don't think there are much can be done. AsyncTask "execute" a private member variable mFuture, your hands are tied.

I am currently working on a similar problem, but in my case the "priority" is the timestamp, and i can set it in the runnable implementation, so it's kind of unrestricted by AsyncTask.

This is my code, just in case it might help someone. Sorry for my poor English!

/* Executor used by AsyncTask. Use it with executeOnExecutor()*/
class PriorityExecutor extends ThreadPoolExecutor{

    //workQueue is a instance of PriorityBlockingQueue<Runnable>
    public PriorityExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory);

    }

    //wrap the runnable passed in.
    @Override
    public void execute(Runnable command) {
        super.execute(new PriorityRunnableWrapper(command));
    }
}

//a wrapper class
class PriorityRunnableWrapper implements Runnable ,Comparable<PriorityRunnableWrapper>{

    long addedTime;//the "priority"
    Runnable r;

    public PriorityRunnableWrapper(Runnable r){
        this.r = r;
        addedTime = System.nanoTime();//in my case the timestamp is the priority
    }

    @Override
    public void run() {
        r.run();
    }

    @Override
    public int compareTo(PriorityRunnableWrapper another) {
        if(addedTime == another.addedTime)return 0;
        return addedTime - another.addedTime > 0 ? -1 : 1;
    }

}
handhand
  • 736
  • 7
  • 12