I have a requirement for a task to be executed asynchronously while discarding any further requests until the task is finished.

Synchronizing the method just queues up the callers instead of skipping them. I initially thought of using a single-thread executor (Executors.newSingleThreadExecutor()), but that queues up tasks as well. I then looked at ThreadPoolExecutor, but it takes the tasks to be executed from its work queue, so with a bounded queue there will be one task executing and at least one task queued (the others can be discarded using ThreadPoolExecutor.DiscardPolicy).
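
To make the "one running, one queued" behaviour concrete, here is a minimal sketch. The class name QueueOfOneDemo, the countExecuted helper, and the latch that holds the first task are mine, purely for illustration. It assumes a pool of exactly one thread with the smallest bounded queue that ArrayBlockingQueue allows (capacity 1).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueOfOneDemo {
    /** Submits three tasks while the first is still blocked; returns how many eventually ran. */
    static int countExecuted() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),      // capacity 1 is the minimum allowed
                new ThreadPoolExecutor.DiscardPolicy());  // silently drop whatever does not fit

        final CountDownLatch gate = new CountDownLatch(1);
        final AtomicInteger executed = new AtomicInteger();
        Runnable task = new Runnable() {
            public void run() {
                executed.incrementAndGet();
                try {
                    gate.await(); // keep the worker busy until all submissions are done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        for (int i = 0; i < 3; i++) {
            executor.execute(task); // 1st goes to the worker, 2nd is queued, 3rd is discarded
        }

        gate.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("executed " + countExecuted()); // prints "executed 2"
    }
}
```

The second task still runs once the first finishes, which is exactly the extra queued execution I want to avoid.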

The only thing I can think of is to use a Semaphore to guard entry to the queue. I've come up with the following example to show what I'm trying to achieve. Is there a simpler way? Have I missed something obvious?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.submit(new Callable<Void>() {
            public Void call() throws InterruptedException {
                try {
                    System.out.println("start " + index);
                    Thread.sleep(1000); // pretend to do work
                    System.out.println("stop " + index);
                    return null;
                } finally {
                    processEntry.release(); // let the next request through
                }
            }
        });
    }
}

Sample output

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

Taking axtavt's answer and transforming the above example gives the following simpler solution.

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.submit(new Callable<Void>() {
            public Void call() throws InterruptedException {
                System.out.println("start " + index);
                Thread.sleep(1000); // pretend to do work
                System.out.println("stop " + index);
                return null;
            }
        });
    }
}
Michael Rutherfurd

2 Answers

It looks like an executor backed by a SynchronousQueue, combined with the desired rejection policy, does what you want:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(),
    new ThreadPoolExecutor.DiscardPolicy());
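
One caveat worth sketching: with DiscardPolicy the caller is never told that a task was dropped, and a Future returned by submit() for a dropped task never completes. If the caller needs to know, one option (an assumption on my part, not something the question asks for) is to use AbortPolicy and catch RejectedExecutionException. The class TryRunDemo and its tryRun and shutdown helpers are hypothetical names for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TryRunDemo {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),        // no capacity: hand-off only
            new ThreadPoolExecutor.AbortPolicy());   // throw instead of dropping silently

    /** Hypothetical helper: true if the task was accepted, false if the worker is busy (or shut down). */
    boolean tryRun(Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        TryRunDemo demo = new TryRunDemo();
        final CountDownLatch gate = new CountDownLatch(1);
        Runnable blocked = new Runnable() {
            public void run() {
                try {
                    gate.await(); // simulate a long-running task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        System.out.println(demo.tryRun(blocked)); // true: the pool was idle
        System.out.println(demo.tryRun(blocked)); // false: the single worker is busy

        gate.countDown();
        demo.shutdown();
    }
}
```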
axtavt

If there is no queue, there is no need for an executor, I'd say. Using a semaphore alone seems enough. I'm using the code below to avoid running the same code when it is already running. Just make sure all callers share a single Semaphore instance, for example by holding it in a static final field (or a static volatile one if the reference is ever reassigned), so that every thread sees the same semaphore.

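Semaphore semaphore = this.getSemaphore(); // must return the same shared instance on every call
if (semaphore.tryAcquire()) {
    try {
        process();
    } catch (Exception e) {
        // Handle or log the failure here; an empty catch block hides errors.
    } finally {
        semaphore.release(); // always free the permit, even if process() throws
    }
} else {
    logger.info(">>>>> Job already running, skipping go");
}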
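
For a self-contained version of the same idea, here is a rough sketch. The class name SkipIfRunning, the RUNNING field, and the runIfIdle helper are hypothetical, and holding the semaphore in a static final field is my choice for the sketch. Note that the job runs on the calling thread, so this only skips overlapping requests; it does not make the work asynchronous.

```java
import java.util.concurrent.Semaphore;

public class SkipIfRunning {
    // One permit shared by every caller of this class.
    private static final Semaphore RUNNING = new Semaphore(1);

    /** Runs the job on the calling thread unless a run is already in progress. */
    static boolean runIfIdle(Runnable job) {
        if (!RUNNING.tryAcquire()) {
            return false; // already running: skip this request
        }
        try {
            job.run();
            return true;
        } finally {
            RUNNING.release(); // free the permit even if the job throws
        }
    }

    public static void main(String[] args) {
        final Runnable noop = new Runnable() {
            public void run() { }
        };
        final boolean[] nested = new boolean[1];

        boolean outer = runIfIdle(new Runnable() {
            public void run() {
                // A request that arrives while the job is running is skipped.
                nested[0] = runIfIdle(noop);
            }
        });

        System.out.println(outer + " " + nested[0]); // prints "true false"
        System.out.println(runIfIdle(noop));         // prints "true": the permit was released
    }
}
```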
ajitatif
The task needs to run in a separate thread so that the main thread isn't blocked or otherwise impacted (the "mainline" is actually a Swing app). – Michael Rutherfurd Feb 10 '11 at 08:57