6

Does Java support any queue object or mechanism to handle batch treatment?

ex: we have a queue(or any desired queue object), some producer push item into the queue one by one, my target is when we have 10 items or more than 10 items in this queue, we can trigger some handler to treat it in one batch.

or it is not triggered automatically, we need to find a way to loop the queue gracefully at the handler side.

do we have typical and high performance object or lib to handle this?

thanks, Emre

Tarun Gupta
  • 1,232
  • 2
  • 13
  • 26
Emre He
  • 497
  • 11
  • 23

5 Answers5

2

Batch processing in Queue could be achievable with wait/notify, something like you would block thread call against the resource upto it is available or not.

public class MyQueue implements Queue<Object>{
        public synchronized List<Object> peek() {
        if(this.list.size()>=10)
                  this.list.wait();
        return Collections.subList(0,10);
    }
        @Override
    public boolean add(Object e) {
        this.list.add(e);
                if(this.list.size()>=10)
                  this.list.notifyAll(); 
        return false;
    }
}

it is not triggered automatically

In that case you can call wait with specified time out.

Subhrajyoti Majumder
  • 40,646
  • 13
  • 77
  • 103
1

You can use BlockingQueue.drainTo() to automatically obtain batches of tasks to be performed. This is suitable to over 100K task per second.

If you need higher performance queuing you can use the more complex Disruptor or Java Chronicle which can queue into the millions of tasks per second, both supporting auto-batching.

Peter Lawrey
  • 525,659
  • 79
  • 751
  • 1,130
  • so you mean, we need to implement it at method side, right? we add a loop to run BlockingQueue.drainTo() to get the the list with queue items, then invoke processor to handle it. – Emre He Oct 08 '12 at 09:39
  • You could do that. Both producers and consumers have methods ;) – Peter Lawrey Oct 08 '12 at 09:42
  • I've build a library to handle batching and flushing on fixed timeout: https://github.com/fulmicotone/io.fulmicotone.fqueue – eold Oct 04 '20 at 22:04
1

Here's a quick attempt at processing objects in batches, using a background thread to collect and process objects pushed onto a queue by other threads:

public abstract class Batcher<E> implements Runnable {

    public static interface BatchProcessor<E> {
        public void processBatch(List<E> batch);
    }

    private final BlockingQueue<E> queue;
    private final BatchProcessor<E> processor;

    private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<E> batch = new ArrayList<E>();
                for (int i = 0; i < 10; i++) {
                    batch.add(queue.take());
                }
                processor.processBatch(batch);
            }
        } catch (InterruptedException e) {
            return;
        }
    }

}

To use this, you create a BlockingQueue and put objects on it, create an instance of an implementation of BatchProcessor to process the batches, then create an instance of Batcher to pump objects from the former to the latter.

Tom Anderson
  • 46,189
  • 17
  • 92
  • 133
0

Have a look at the API documentation of interface java.util.Queue, which has several implementations.

There's also a standard API, Java Message Service (JMS) to deal with queueing systems for exchanging messages between different processes.

Jesper
  • 202,709
  • 46
  • 318
  • 350
0

I think CountDownLatch is what you need, or possibly CyclicBarrier. That would allow you to setup a synchronization point that will trigger consumers after a certain number of operations have occurred, and you can use a standard queue as the container object.

riffraff
  • 2,429
  • 1
  • 23
  • 32