
I need to run some tasks concurrently, and each task must run in a new Thread (this is due to some ThreadLocal bleed that I don't have full control over). To do so, I have been using the SimpleAsyncTaskExecutor. The problem is that it has no queue: once the concurrency limit is reached, new tasks cannot simply be submitted and left to wait. What I need is the behavior of SimpleAsyncTaskExecutor, but where tasks can still be submitted after the concurrency limit has been reached and wait in a queue until a slot frees up. This is what I have right now:

SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

taskExecutor.setConcurrencyLimit(maxThreads);

return taskExecutor;

Is there some out-of-the-box solution for this, or do I need to write something custom?

cloudwalker
  • What about `SingleThreadExecutor`? You can keep submitting tasks and they will run sequentially. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor() – Pasupathi Rajamanickam Sep 05 '19 at 18:33
  • Well, I want to be able to run up to some number of tasks simultaneously; I just want them all to run in new threads. So, basically: an unbounded queue that I can submit tasks to, a limit on how many tasks run concurrently, and each task running in a new Thread. – cloudwalker Sep 05 '19 at 18:37
  • What kind of "Thread bleed"? If you can't just submit `Runnable`s to a `fixedThreadPoolExecutorService`, it's not "thread bleed", it's a concurrency issue within your code. – daniu Sep 10 '19 at 06:49

2 Answers


If every task must execute in a new Thread, you are essentially ruling out the task-running side of any thread pool, because pools reuse threads (ThreadLocal behavior inside a thread pool is something you will need to deal with sooner or later).

To get a fresh thread per task, you can write something like this:

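 import java.util.concurrent.Executor;

 // Runs each Runnable in a brand-new Thread and blocks the caller until it finishes.
 class ThreadPerTaskExecutor implements Executor {
     @Override
     public void execute(Runnable r) {
         Thread t = new Thread(r);
         t.start();
         try {
             t.join(); // the calling thread waits here until the new thread is done
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt(); // restore the interrupt flag
         }
     }
 }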

which always executes the Runnable in a new Thread. Note that execute() blocks the caller (via join) until that thread has finished.

For a crude implementation, we can combine it with a fixed thread pool that only acts as the queue and concurrency limiter:

    final Executor executor = new ThreadPerTaskExecutor();
    // The pool provides the queue and caps concurrency at 3; it does not run the real work.
    final ExecutorService service = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 100; i++) {
      service.submit(new Runnable() {
        public void run() {
          try {
            System.out.println("Executed inside Thread pool with concurrency level 3: "
                + Thread.currentThread().toString());
            // Blocks this pool thread until the new thread finishes, so the limit holds.
            executor.execute(new Runnable() {
              public void run() {
                try {
                  Thread.sleep(3000); // Some expensive operation here.
                  System.out.println(
                      "Executed inside new Thread always: " + Thread.currentThread().toString());
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt(); // restore the interrupt flag
                }
              }
            });
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }
    service.shutdown(); // queued tasks still run; the pool threads exit afterwards

With Java 8 and later this can be tidied up with lambdas. I hope this conveys the basic idea.
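As a rough illustration of the lambda version mentioned above, here is a minimal sketch of the same idea: a fixed pool acts only as queue and limiter, and each pool thread starts a fresh thread and waits for it. The class name `FreshThreadTasks`, the method `runAll`, and the thread names are made up for this example; the task body just records which thread ran it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Spring or the JDK.
class FreshThreadTasks {

    /**
     * Runs taskCount tasks, each in a brand-new thread, while a fixed pool of
     * `limit` threads queues the submissions and caps how many run at once.
     * Returns the names of the threads that actually ran the tasks.
     */
    static Set<String> runAll(int taskCount, int limit) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService gate = Executors.newFixedThreadPool(limit);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            gate.submit(() -> {
                // The real work happens in a new thread, never in the pooled one.
                Thread worker = new Thread(
                        () -> workerNames.add(Thread.currentThread().getName()),
                        "task-" + id);
                worker.start();
                try {
                    worker.join(); // keep the pool slot occupied until the work is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        gate.shutdown(); // already queued submissions still run
        try {
            gate.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames;
    }

    public static void main(String[] args) {
        System.out.println(runAll(10, 3).size() + " distinct threads ran the tasks");
    }
}
```

The cost of this design is that every running task ties up two threads: the pooled one that waits and the fresh one that works.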

Mohamed Anees A

Is there some out-of-the-box solution for this, or do I need to write something custom?

As far as I know there is no out-of-the-box solution, so you will need to write your own.

The quickest route is probably to extend SimpleAsyncTaskExecutor. Example:

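import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.Assert;

public class SimpleAsyncQueueTaskExecutor extends SimpleAsyncTaskExecutor {

    // Tasks waiting for a free slot
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
    // Number of tasks currently running
    private final AtomicInteger concurrencyValue = new AtomicInteger(0);

    private void checkAndExecuteFromQueue() {
        while (isThrottleActive() && !queue.isEmpty()) {
            int count = concurrencyValue.get();
            if (count >= getConcurrencyLimit()) {
                return;
            }
            // Reserve a slot atomically so concurrent callers cannot exceed the limit
            if (!concurrencyValue.compareAndSet(count, count + 1)) {
                continue;
            }
            Runnable task = queue.poll();
            if (task == null) {
                // Another thread took the last task: release the slot and re-check
                concurrencyValue.decrementAndGet();
                continue;
            }
            doExecute(new ConcurrencyThrottlingRunnable(task));
        }
    }

    private void afterExecute() {
        // The task was already removed from the queue by poll(); just free its slot
        concurrencyValue.decrementAndGet();

        // Check and execute other tasks
        checkAndExecuteFromQueue();
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            queue.offer(task);
            checkAndExecuteFromQueue();
        } else {
            // No throttling, or immediate start requested: bypass the queue
            doExecute(task);
        }
    }

    private class ConcurrencyThrottlingRunnable implements Runnable {
        private final Runnable target;

        public ConcurrencyThrottlingRunnable(Runnable target) {
            this.target = target;
        }

        @Override
        public void run() {
            try {
                this.target.run();
            }
            finally {
                afterExecute();
            }
        }
    }
}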

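This example just adds a queue and overrides the execute method, so submissions never block: tasks beyond the concurrency limit wait in the queue until a running task finishes. Hope this helps.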
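If it helps to see the queue-plus-counter idea in isolation, the following is a minimal sketch using only the JDK, with no Spring dependency. The names `QueueingThreadPerTaskExecutor`, `QueueingExecutorDemo` and `peakConcurrency` are invented for this illustration, and the sketch assumes tasks do not need timeouts, decoration, or shutdown handling.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only executor: unbounded queue, at most `limit` tasks
// running at once, and a brand-new Thread for every task.
class QueueingThreadPerTaskExecutor implements Executor {
    private final int limit;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger running = new AtomicInteger();

    QueueingThreadPerTaskExecutor(int limit) {
        if (limit < 1) throw new IllegalArgumentException("limit must be >= 1");
        this.limit = limit;
    }

    @Override
    public void execute(Runnable task) {
        pending.offer(task); // never blocks the submitter
        drain();
    }

    private void drain() {
        while (!pending.isEmpty()) {
            int count = running.get();
            if (count >= limit) return;                              // no free slot
            if (!running.compareAndSet(count, count + 1)) continue;  // lost a race, retry
            Runnable next = pending.poll();
            if (next == null) {              // someone else took the task
                running.decrementAndGet();   // give the slot back and re-check
                continue;
            }
            new Thread(() -> {
                try {
                    next.run();
                } finally {
                    running.decrementAndGet();
                    drain();                 // start the next queued task, if any
                }
            }).start();
        }
    }
}

class QueueingExecutorDemo {
    /** Submits taskCount short tasks and returns the most that ever ran at once. */
    static int peakConcurrency(int taskCount, int limit) {
        Executor executor = new QueueingThreadPerTaskExecutor(limit);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
                done.countDown();
            });
        }
        try {
            if (!done.await(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(20, 3));
    }
}
```

Reserving the slot with compareAndSet before polling is what keeps two submitting threads from both starting a task when only one slot is free.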

binhgreat