0

I want to implement multi-threading for a batch job. It is getting data in pages and for each thread I want to process data in one page.
I have memory limit for queue size, so for example I want to run 80 threads processing data and have 40 pages in queue for processing.
I just want to wait till some thread finishes process and then continue to get new pages of data.
I tried to achieve this goal with this code and it is working fine:

public class ThreadManager extends ThreadPoolExecutor {
    private final int queueSize;
    private final TimeUnit unit;
    private final long sleepTime;

    private ThreadManager(int poolSize, int queueSize, TimeUnit unit, long sleepTime) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize));
        this.queueSize = queueSize;
        this.unit = unit;
        this.sleepTime = sleepTime;
    }

    @Override
    public void execute(@NotNull Runnable runnable) {
        sleepIfQueueMaxedOut();
        super.execute(runnable);
    }

    private void sleepIfQueueMaxedOut() {
        while (getQueue().size() >= queueSize) {
            try {
                unit.sleep(sleepTime);
            } catch (InterruptedException e) { }
        }
    }
}

In this question ThreadPoolExecutor Block When Queue Is Full? some suggestions are about using RejectedExecutionHandler and some are about using Semaphore.
My questions are as follows:

  1. Is my ThreadManager a good practice? Or what is the pros and cons of above approach?
  2. Is there any advantage for using RejectedExecutionHandler instead of my approach?
  3. Is there any advantage for using Semaphore instead of my approach?
  4. One of my mates suggested to use smallrye-mutiny (I am using Quarkus and I already have that dependency), but I don't know anyway to use mutiny for this scenario, I mean threads are started after I get pages of data, there is nothing related to getting data from client to use Uni or Multi. If some of you know how I can leverage mutiny, please guide me.

Update Note:
It seems that beside above link there is other answers to this question like How to implement blocking thread pool executor?, but I want to know what is the pros and cons of other approaches?, like RejectedExecutionHandler or Semaphore. I am changing the title of question, in case it cause misunderstanding.

Khashayar
  • 11
  • 1
  • Is `execute()` only called by a single thread? Otherwise there's no guarantee the queue won't max out again between waking and submitting. – shmosel Sep 14 '22 at 21:21
  • Another obvious downside is you have to guess at the correct sleep interval. Sleep too short and you're wasting CPU cycles; sleep too long and you're wasting time. – shmosel Sep 14 '22 at 21:23
  • @shmosel your comment about correct sleep interval is a good point and make sense. About your first comment, what I know about execute() API in ThreadPoolExecutor is that it is executing on just one thread, unless there is something that I don't know. – Khashayar Sep 14 '22 at 21:33
  • It runs on whichever thread calls it. My point is if *you* are calling it from more than one thread, you can end up with a race condition. – shmosel Sep 14 '22 at 21:35

0 Answers0