
I know this question has been answered many times, but I'm struggling to understand how it works.

In my application, the user must be able to select items that are added to a queue (displayed in a ListView backed by an ObservableList<Task>), and each item needs to be processed sequentially by an ExecutorService.

The queue should also be editable (reorder items and remove them from the list).

private void handleItemClicked(MouseEvent event) {
    if (event.getClickCount() == 2) {
        File item = listView.getSelectionModel().getSelectedItem();
        Task<Void> task = createTask(item);
        facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
        Future<?> result = executor.submit(task);
        // where executor is an ExecutorService of which type?

        try {
            result.get();
        } catch (Exception e) {
            // ...
        }
    }
}

I tried executor = Executors.newFixedThreadPool(1), but then I have no control over the queue.
I read about ThreadPoolExecutor and queues, but I'm struggling to understand them as I'm quite new to concurrency.

I need to run handleItemClicked in a background thread so that the UI does not freeze. What is the best way to do that?

In summary: how can I implement a queue of tasks that is editable and processed sequentially by a background thread?

Please help me figure it out

EDIT: Using the SerialTaskQueue class from vanOekel helped. Now I want to bind the list of tasks to my ListView.

ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty); 

Obviously this doesn't work, as it expects an ObservableList. Is there an elegant way to do it?

lenny

1 Answer

The simplest solution I can think of is to maintain the task-list outside of the executor and use a callback to feed the executor the next task if one is available. Unfortunately, this involves synchronization on the task-list and an AtomicBoolean to indicate whether a task is currently executing.

The callback is simply a Runnable that wraps the original task to run and then "calls back" to see if there is another task to execute, and if so, executes it using the (background) executor.

The synchronization is needed to keep the task-list in order and in a known state. The task-list can be modified by two threads at the same time: by the callback running in the executor's (background) thread and by the handleItemClicked method running in the UI (foreground) thread. This means that, without further measures, it is never known for certain whether the task-list is empty at a given moment, for example. Synchronizing on the task-list resolves this.

This still leaves an ambiguous moment when deciding whether a task is ready for execution. This is where the AtomicBoolean comes in: a value that is set is immediately visible to any other thread, and the compareAndSet method ensures that only one thread gets an "OK".
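
To illustrate the compareAndSet guarantee in isolation, here is a minimal sketch that is not part of the solution itself. The class name CompareAndSetDemo and the method countWinners are made up for this example. Several threads race to flip one AtomicBoolean from true to false, and only one of them should succeed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only meant to show compareAndSet semantics.
public class CompareAndSetDemo {

    // Lets `threads` threads race for a single "permit"; returns how many won.
    static int countWinners(int threads) throws InterruptedException {
        AtomicBoolean permit = new AtomicBoolean(true);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads at roughly the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Atomically flips true -> false; succeeds for exactly one caller.
                if (permit.compareAndSet(true, false)) {
                    winners.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return winners.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Winners: " + countWinners(8));
    }
}
```

In the SerialTaskQueue code, the same idea is used in both directions: runNextTask flips the flag from true to false before handing a task to the executor, and the callback flips it back from false to true once the task has finished.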

Combining the synchronization with the AtomicBoolean allows the creation of one method with a "critical section" that can be called by both foreground and background threads at the same time to trigger the execution of a new task, if possible. The code below is designed and set up so that only one such method (runNextTask) is needed. It is good practice to make the "critical section" in concurrent code as simple and explicit as possible (which, in turn, generally leads to an efficient "critical section").

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerialTaskQueue {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // single background thread; the SerialTaskQueue hands it one task at a time.
        SerialTaskQueue tq = new SerialTaskQueue(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome(10L));
            Thread.sleep(5L);
            tq.add(new SleepSome(20L));
            tq.add(new SleepSome(30L));

            Thread.sleep(100L);
            System.out.println("Queue size: " + tq.size()); // should be empty
            tq.add(new SleepSome(10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final List<Runnable> tasks = new LinkedList<Runnable>();
    // atomic boolean used to ensure only 1 task is executed at any given time
    private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
    private final Executor executor;

    public SerialTaskQueue(Executor executor) {
        this.executor = executor;
    }

    public void add(Runnable task) {

        synchronized(tasks) { tasks.add(task); }
        runNextTask();
    }

    private void runNextTask() {
        // critical section that ensures one task is executed.
        synchronized(tasks) {
            if (!tasks.isEmpty()
                    && executeNextTask.compareAndSet(true, false)) {
                executor.execute(wrapTask(tasks.remove(0)));
            }
        }
    }

    private CallbackTask wrapTask(Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override public void run() {
                if (!executeNextTask.compareAndSet(false, true)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask();
            }
        });
    }

    public int size() {
        synchronized(tasks) { return tasks.size(); }
    }

    public Runnable get(int index) {
        synchronized(tasks) { return tasks.get(index); }
    }

    public Runnable remove(int index) {
        synchronized(tasks) { return tasks.remove(index); }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override public void run() {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final long sleepTimeMs;
        public SleepSome(long sleepTimeMs) {
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
    }
}
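
The question also asks for changing the order of queued items, which the class above does not cover. As a rough sketch under the same locking rule (all access synchronized on the list itself), a reorder operation could look like the following. The class QueueReorderSketch and the method move are hypothetical names and not part of the code above. In SerialTaskQueue this would be an instance method that synchronizes on tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, shown standalone so it can be run on its own.
public class QueueReorderSketch {

    // Moves the element at index `from` to index `to` while holding the list's
    // lock, so a thread synchronizing on the same list never sees a half-done move.
    static <T> void move(List<T> tasks, int from, int to) {
        synchronized (tasks) {
            int size = tasks.size();
            if (from < 0 || from >= size || to < 0 || to >= size) {
                return; // stale index: the list may have shrunk since the caller looked
            }
            tasks.add(to, tasks.remove(from));
        }
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>(Arrays.asList("a", "b", "c"));
        move(pending, 2, 0);
        System.out.println(pending); // prints [c, a, b]
        move(pending, 0, 5);         // out of range, ignored
        System.out.println(pending); // prints [c, a, b]
    }
}
```

The range check matters because the background thread may take the first task off the list between the moment the UI reads an index and the moment the move happens. Silently ignoring a stale index is one possible policy; returning a boolean to let the UI refresh is another.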

Update: if groups of tasks need to be executed serially, have a look at the adapted implementation here.

vanOekel
  • Thanks, that helped me. Another question: how can I now bind that `List tasks = new LinkedList();` list to my `ListView`? – lenny Oct 03 '15 at 16:15
  • @lenny42 I'm not familiar with `ObservableList`, but can you do it the other way around? Use something like `tasks = FXCollections.observableArrayList()` (see http://stackoverflow.com/a/26195354/3080094)? – vanOekel Oct 03 '15 at 21:32
  • @lenny42 Or use the [ModifiableObservableListBase](http://docs.oracle.com/javase/8/javafx/api/javafx/collections/ModifiableObservableListBase.html) and use an updated version of the `SerialTaskQueue` as delegate as shown in the javadoc example? – vanOekel Oct 03 '15 at 21:43
  • Using `tasks = FXCollections.observableArrayList()` works just fine, thanks! – lenny Oct 07 '15 at 10:31