7

We have a large text file in which each line requires intensive processing. The design is to have a class that reads the file and delegates the processing of each line to a thread via a thread pool. The file reader should block before reading the next line whenever there is no free thread in the pool to do the processing, so I need a blocking thread pool.

In the current implementation, the ThreadPoolExecutor.submit() and ThreadPoolExecutor.execute() methods throw a RejectedExecutionException once the configured number of threads are busy and the queue is full, as shown in the code snippet below.

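import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingTp {

    public static void main(String[] args) {
        // bounded queue: at most 3 tasks can wait for a free thread
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
        // 1 core thread, at most 3 threads, idle extra threads end after 30 seconds
        ThreadPoolExecutor executorService =
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int Jobs = 10;
        System.out.println("Starting application with " + Jobs + " jobs");
        for (int i = 1; i <= Jobs; i++) {
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("job added " + (i));
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
        }
        // let the accepted jobs finish, then allow the JVM to exit
        executorService.shutdown();
    }
}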

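class WorkerThread implements Runnable {
    private final int job;

    public WorkerThread(int job) {
        this.job = job;
    }

    @Override
    public void run() {
        try {
            // simulate one second of work for this job
            Thread.sleep(1000);
        } catch (InterruptedException excep) {
            // restore the interrupt flag rather than swallowing it
            Thread.currentThread().interrupt();
        }
    }
}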

The output of the above program is:

Starting application with 10 jobs
job added 1
job added 2
job added 3
job added 4
job added 5
job added 6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException

Can someone shed some light on how I can implement a blocking thread pool?

Gray
T-Bag

5 Answers

13

Can someone shed some light on how I can implement a blocking thread pool?

You need to set a rejected execution handler on your executor service. When the pool and its queue are full, the handler makes the submitting thread block until there is space in the blocking queue.

BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
ThreadPoolExecutor executorService =
     new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block until there's room
            executor.getQueue().put(r);
            // check afterwards and throw if pool shutdown
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    }
});

So instead of the ThreadPoolExecutor throwing a RejectedExecutionException, it will call the rejection handler, which will in turn try to put the job back on the queue. This blocks the caller until the queue has room.
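To see the blocking behavior end to end, here is a minimal sketch that reuses the pool settings from the question (1 core thread, 3 max threads, queue of 3) with the same queue-put idea in the handler. The class name `BlockingHandlerDemo`, the method `runJobs`, and the 50 ms sleep are illustrative assumptions, not part of the original code, and the shutdown check is left out to keep the sketch short.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingHandlerDemo {

    /** Submits {@code jobs} short tasks and returns how many of them ran to completion. */
    static int runJobs(int jobs) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(3);
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
        // Block the submitting thread until the queue has room instead of throwing.
        pool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Producer interrupted", e);
            }
        });

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            // Once all threads are busy and the queue is full, this call waits.
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // hypothetical stand-in for processing one line
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            });
        }

        pool.shutdown(); // tasks that were already accepted still run
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed " + runJobs(10) + " jobs");
    }
}
```

With this handler the producer loop no longer sees RejectedExecutionException for a full queue. It simply runs at the pace the workers can sustain.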

hitchhiker
Gray
  • 3
    Inserting directly into the work queue like this is not safe, because it bypasses the logic that ThreadPoolExecutor uses to manage the state of its pool of workers. For instance, it does not correctly check whether the pool has been shut down, which means calls to execute() may hang instead of correctly rejecting the task. – David Aug 29 '18 at 19:44
  • Good point @David. Do you know of a better way of accomplishing a blocking pool? – Gray Aug 30 '18 at 14:31
  • What's the point/goal of `Thread.currentThread().interrupt()` here? – IsaacLevon Jan 23 '20 at 10:47
  • 1
    It's a FAQ @yaseco. Whenever you write code that catches `InterruptedException`, you should immediately re-interrupt the thread. The throwing of the exception clears the thread's interrupt flag and typically you want to make sure the caller knows that the thread is interrupted. See: https://stackoverflow.com/a/3976377/179850 and https://dzone.com/articles/how-to-handle-the-interruptedexception – Gray Jan 23 '20 at 18:09
  • Added a check if the executor has been shutdown. – Gray Feb 22 '22 at 22:45
  • @Gray the authors of the interrupt mechanism weren't stupid - the flag is down because it is supposed to be. They didn't make an API where you need to write a rather unwieldy line _or_ your code is broken. An interrupt means whatever you want it to mean - they do not happen unless you program them to do so. For example, they can mean: If a thread is interrupted during the wait-for-room, reject it. Which you are doing. So re-raising the flag would be bad. – rzwitserloot Sep 14 '22 at 20:45
  • Sorry @rzwitserloot, did I say they were stupid? I know how interrupts work. Not sure about your other comments. Are you saying that my answer is unwieldy? How would you do it? Check out my multithreading score btw in case there are any questions. – Gray Sep 21 '22 at 21:46
  • Appeal to SO score instead of objective fact? Geez. That score measures quantity, not quality, by and large. How would I do it? Same way as you do it, no doubt. Just, without `Thread.currentThread().interrupt();`. Just get rid of that line. – rzwitserloot Sep 21 '22 at 22:00
0

Let's have a look at your code again:

for (int i = 1; i <= Jobs; i++)
  try {
    tpExe.submit(new WorkerThread(i));
    System.out.println("job added " + (i));
  } catch (RejectedExecutionException e) {
    System.err.println("RejectedExecutionException");
  }

So when you try to submit and the pool is busy, that exception is thrown. If you want to work around that, it could look like:

public void yourSubmit(Runnable whatever) throws InterruptedException {
  boolean submitted = false;
  while (! submitted ) {
    try {
      tpExe.submit(whatever);
      submitted = true;
    } catch (RejectedExecutionException re) {
      // all threads busy ... so wait some time before retrying
      Thread.sleep(1000);
    }
  }
}

In other words: use that exception as a "marker" that submits are currently not possible.
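A self-contained sketch of this retry idea follows, assuming the same pool settings as the question. The names `RetrySubmitDemo`, `submitWithRetry`, and `runJobs` and the 20 ms back-off are hypothetical. One addition that is not in the answer: the loop rethrows when the pool has been shut down, since retrying could never succeed in that case.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySubmitDemo {

    /** Retries until the pool accepts the task; gives up if the pool is shut down. */
    static void submitWithRetry(ThreadPoolExecutor pool, Runnable task)
            throws InterruptedException {
        while (true) {
            try {
                pool.execute(task);
                return;
            } catch (RejectedExecutionException e) {
                if (pool.isShutdown()) {
                    throw e; // a shut-down pool will never accept the task
                }
                Thread.sleep(20); // all threads busy and queue full: back off, then retry
            }
        }
    }

    /** Submits {@code jobs} short tasks and returns how many of them ran to completion. */
    static int runJobs(int jobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < jobs; i++) {
                submitWithRetry(pool, () -> {
                    try {
                        Thread.sleep(50); // hypothetical stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed " + runJobs(10) + " jobs");
    }
}
```

The trade-off of polling is latency: the producer may sleep a little longer than necessary after a slot frees up, so the back-off interval is worth tuning to the typical task duration.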

GhostCat
0

You can use a semaphore to control the resource. The reader reads a line and creates an asynchronous task only after acquiring a permit from the semaphore. If every thread is busy, the reader thread waits until a thread is available.

public class MyExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public MyExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}
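As a rough illustration of how the semaphore caps the number of tasks in flight, the sketch below inlines the same acquire/release pattern and records the highest number of tasks running at once. The names `SemaphoreBoundDemo` and `peakInFlight`, the fixed-size pool, and the 20 ms sleep are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreBoundDemo {

    /** Runs {@code jobs} tasks with at most {@code bound} in flight; returns the peak observed. */
    static int peakInFlight(int jobs, int bound) {
        // A fixed pool has an unbounded queue, so execute() does not reject here;
        // the semaphore alone throttles the submitting thread.
        ExecutorService pool = Executors.newFixedThreadPool(bound);
        Semaphore permits = new Semaphore(bound);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < jobs; i++) {
                permits.acquire(); // the reader blocks here while all permits are taken
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // hypothetical stand-in for processing one line
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // frees a slot for the next line
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak tasks in flight: " + peakInFlight(12, 3));
    }
}
```

When the underlying executor has a bounded queue, a bound of pool size plus queue capacity is a reasonable choice, because every task holding a permit then either runs or fits in the queue.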
gati sahu
0

Here is a RejectedExecutionHandler that supports the desired behavior. Unlike other implementations, it does not interact with the queue directly so it should be compatible with all Executor implementations and will not deadlock.

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;

import static com.github.cowwoc.requirements.DefaultRequirements.assertThat;
import static com.github.cowwoc.requirements.DefaultRequirements.requireThat;

/**
 * Applies a different rejection policy depending on the thread that requested execution.
 */
public final class ThreadDependantRejectionHandler implements RejectedExecutionHandler
{
    private final ThreadLocal<Integer> numberOfRejections = ThreadLocal.withInitial(() -> 0);
    private final BiFunction<Thread, Executor, Action> threadToAction;

    /**
     * @param threadToAction indicates what action a thread should take when execution is rejected
     * @throws NullPointerException if {@code threadToAction} is null
     */
    public ThreadDependantRejectionHandler(BiFunction<Thread, Executor, Action> threadToAction)
    {
        requireThat(threadToAction, "threadToAction").isNotNull();
        this.threadToAction = threadToAction;
    }

    @SuppressWarnings("BusyWait")
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
        if (executor.isShutdown())
            return;
        Thread currentThread = Thread.currentThread();
        Action action = threadToAction.apply(currentThread, executor);
        if (action == Action.RUN)
        {
            r.run();
            return;
        }
        if (action == Action.REJECT)
        {
            throw new RejectedExecutionException("The thread pool queue is full and the current thread is not " +
                "allowed to block or run the task");
        }

        assertThat(action, "action").isEqualTo(Action.BLOCK);
        int numberOfRejections = this.numberOfRejections.get();
        ++numberOfRejections;
        this.numberOfRejections.set(numberOfRejections);
        if (numberOfRejections > 1)
            return;
        try
        {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            while (!executor.isShutdown())
            {
                try
                {
                    Thread.sleep(random.nextInt(10, 1001));
                }
                catch (InterruptedException e)
                {
                    throw new WrappingException(e);
                }
                executor.submit(r);
                numberOfRejections = this.numberOfRejections.get();
                if (numberOfRejections == 1)
                {
                    // Task was accepted, or executor has shut down
                    return;
                }
                // Task was rejected, reset the counter and try again.
                numberOfRejections = 1;
                this.numberOfRejections.set(numberOfRejections);
            }
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor + " because " +
                "the executor has been shut down");
        }
        finally
        {
            this.numberOfRejections.set(0);
        }
    }

    public enum Action
    {
        /**
         * The thread should run the task directly instead of waiting for the executor.
         */
        RUN,
        /**
         * The thread should block until the executor is ready to run the task.
         */
        BLOCK,
        /**
         * The thread should reject execution of the task.
         */
        REJECT
    }
}
Gili
-1

This works for me.

class handler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // blocks the submitting thread until the queue has room
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
T-Bag