
I want to implement a variant of the publisher/subscriber pattern in Java and am currently running out of ideas.

There is 1 publisher and N subscribers. The publisher publishes objects, and each subscriber needs to process each of the objects once and only once, in the correct order. The publisher and each subscriber run in their own threads.

In my original implementation, each subscriber has its own blocking queue, and the publisher puts objects into each subscriber's queue. This works OK, but the publisher blocks whenever any of the subscribers' queues is full. This degrades performance, because each subscriber takes a different amount of time to process an object.
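A minimal sketch of this first design, assuming a fixed number of subscribers addressed by index; the names `QueuePerSubscriber`, `publish`, `next` and `wouldBlock` are hypothetical. It shows where the blocking comes from: `BlockingQueue.put` waits while a bounded queue is full, and the publisher visits the queues one after another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of "one bounded blocking queue per subscriber" (hypothetical names).
class QueuePerSubscriber<T> {
    private final List<BlockingQueue<T>> queues = new ArrayList<>();

    QueuePerSubscriber(int subscribers, int capacity) {
        for (int i = 0; i < subscribers; i++) {
            queues.add(new ArrayBlockingQueue<>(capacity));
        }
    }

    // Publisher side: put() waits while a queue is full, so one slow
    // subscriber stalls delivery to every subscriber after it in the list.
    void publish(T item) throws InterruptedException {
        for (BlockingQueue<T> q : queues) {
            q.put(item);
        }
    }

    // Subscriber side: each subscriber thread calls this with its own index;
    // take() waits for an item, and items come out in FIFO (publish) order.
    T next(int subscriber) throws InterruptedException {
        return queues.get(subscriber).take();
    }

    // Non-blocking probe: true if publish() would currently have to wait.
    boolean wouldBlock() {
        for (BlockingQueue<T> q : queues) {
            if (q.remainingCapacity() == 0) {
                return true;
            }
        }
        return false;
    }
}
```

With a capacity of 1 and two subscribers, one `publish` fills both queues, and the next `publish` cannot complete until both subscribers have taken their item.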

In another implementation, the publisher holds the objects in its own queue. Each object has an associated AtomicInteger counter, initialized to the number of subscribers. Each subscriber peeks at the queue and decrements the counter, and the object is removed from the queue when the counter reaches zero.

This way the publisher never blocks, but now the subscribers have to wait for each other: the next object cannot be peeked until every subscriber has processed the current one and it has been removed from the queue.
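The counter-based design can be sketched as follows, assuming an unbounded `ConcurrentLinkedQueue` and subscribers that remember the last entry they processed; `CountedQueue`, `Entry`, `peekNew` and `done` are hypothetical names. The sketch makes the drawback visible: after finishing the head entry, a subscriber gets `null` from `peekNew` until the slowest subscriber has called `done` and the entry has been removed.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the shared queue + AtomicInteger counter design (hypothetical names).
// publish() never blocks because the queue is unbounded, but subscribers can
// only ever work on the head entry, so the fastest one waits for the slowest.
class CountedQueue<T> {
    static final class Entry<E> {
        final E item;
        final AtomicInteger remaining; // subscribers that have not processed item yet

        Entry(E item, int subscribers) {
            this.item = item;
            this.remaining = new AtomicInteger(subscribers);
        }
    }

    private final ConcurrentLinkedQueue<Entry<T>> queue = new ConcurrentLinkedQueue<>();
    private final int subscribers;

    CountedQueue(int subscribers) {
        this.subscribers = subscribers;
    }

    void publish(T item) {
        queue.add(new Entry<>(item, subscribers));
    }

    // Returns the head entry if this subscriber has not processed it yet
    // (lastSeen is the entry it processed last), otherwise null: the caller
    // must wait and retry until the head entry has been removed.
    Entry<T> peekNew(Entry<T> lastSeen) {
        Entry<T> head = queue.peek();
        return (head == null || head == lastSeen) ? null : head;
    }

    // Called by a subscriber after processing e. The last subscriber to finish
    // removes the entry, which is what lets the others see the next one.
    void done(Entry<T> e) {
        if (e.remaining.decrementAndGet() == 0) {
            queue.remove(e);
        }
    }
}
```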

Is there a better way to do this? I assume this is a fairly common pattern.

Wudong

3 Answers


Your "many queues" implementation is the way to go. I don't think you necessarily need to be concerned about one full queue blocking the producer, because the overall time to completion won't be affected. Say you have three consumers: two consume at a rate of 1 item per second, the third at 1 item per five seconds, and the producer produces at a rate of 1 item per two seconds. Eventually the third queue will fill up, so the producer will block on it and will also stop putting items into the first and second queues. There are ways around this, but they won't change the fact that the third consumer will always be the bottleneck.

If you're producing/consuming 100 items, this will take at least 500 seconds because of the third consumer (5 seconds times 100 items). That holds whether the first and second consumers finish after 200 seconds (because you've done something clever to let the producer keep filling their queues even after the third queue is full) or after 500 seconds (because the producer blocked on the third queue).
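The arithmetic behind this argument, for the example rates above (producer 1 item per 2 s, consumers at 1 s, 1 s and 5 s per item, 100 items):

```latex
\begin{aligned}
T_{\text{producer}} &= 100 \times 2\,\text{s} = 200\,\text{s}\\
T_{\text{consumer 3}} &\ge 100 \times 5\,\text{s} = 500\,\text{s}\\
T_{\text{total}} &= \max\left(T_{\text{consumer 1}},\, T_{\text{consumer 2}},\, T_{\text{consumer 3}}\right) \ge 500\,\text{s}
\end{aligned}
```

Consumers 1 and 2 can finish at about 200 s only if the producer never blocks; either way the total stays at or above 500 s.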

Zim-Zam O'Pootertoot
  • It depends on the use case, though: perhaps consumer 3 is allowed to silently give up or drop items if it falls behind, or perhaps the important metric is the average time between publishing and consuming. But I agree that this appears to be the best approach from a purely throughput point of view. – selig May 31 '13 at 16:39
  • Thanks for the answer. However, my problem is that in the long run all the consumers process all the objects in a similar amount of time, but each consumer takes a different amount of time to process any single object. A single consumer shouldn't block all the others' work. – Wudong May 31 '13 at 16:41
  • @Wudong In that case you'll need to use blocking queues with unlimited capacity, i.e. `LinkedBlockingQueue`s constructed without a capacity: there's no way for the producer to keep producing if one of your queues is full. (You could store the excess items somewhere else, but that's just a more complicated version of using blocking queues with unlimited capacity.) – Zim-Zam O'Pootertoot May 31 '13 at 16:45
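A minimal sketch of that suggestion, assuming a fixed set of subscribers addressed by index (`UnboundedFanOut`, `queueFor` and `backlog` are hypothetical names). A `LinkedBlockingQueue` created without a capacity is bounded only by `Integer.MAX_VALUE`, so `offer` always finds room and the publisher never waits for a slow subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: one unbounded LinkedBlockingQueue per subscriber (hypothetical names).
class UnboundedFanOut<T> {
    private final List<BlockingQueue<T>> queues = new ArrayList<>();

    UnboundedFanOut(int subscribers) {
        for (int i = 0; i < subscribers; i++) {
            queues.add(new LinkedBlockingQueue<>()); // capacity Integer.MAX_VALUE
        }
    }

    // offer() never waits; on an unbounded queue it always succeeds, and each
    // subscriber's queue receives every item once, in publish order.
    void publish(T item) {
        for (BlockingQueue<T> q : queues) {
            q.offer(item);
        }
    }

    // The queue a given subscriber thread should take() from.
    BlockingQueue<T> queueFor(int subscriber) {
        return queues.get(subscriber);
    }

    // How far a subscriber has fallen behind the publisher.
    int backlog(int subscriber) {
        return queues.get(subscriber).size();
    }
}
```

The trade-off is memory: a subscriber that falls behind keeps an ever-growing backlog, which `backlog(i)` makes easy to monitor.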

Definitely:

each subscriber has its own blocking queue, and the publisher puts objects into each subscriber's queue.

This is the way to go. You can use a threaded approach to put items into the queues, so that if one queue is full the publisher will not wait.

For example:

s1, s2 and s3 are subscribers, and addToQueue is a method in each subscriber that adds to the corresponding queue. addToQueue waits until the queue is not full, so a call to addToQueue is a blocking call (ideally synchronized code).
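The answer leaves `addToQueue` abstract. A minimal sketch of a subscriber that provides it, assuming a bounded `ArrayBlockingQueue` per subscriber (the constructor, `next` and `pending` are hypothetical). Catching `InterruptedException` inside `addToQueue` lets it be called from a `Runnable`, whose `run()` cannot throw checked exceptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical subscriber with a blocking addToQueue(). BlockingQueue is
// already thread-safe, so no extra synchronized block is needed around put().
class Subscriber {
    private final BlockingQueue<String> queue;

    Subscriber(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Blocks until this subscriber's queue has room for the message.
    void addToQueue(String message) {
        try {
            queue.put(message);
        } catch (InterruptedException e) {
            // Preserve the interrupt status; the message is not enqueued.
            Thread.currentThread().interrupt();
        }
    }

    // Called from the subscriber's own thread; blocks until a message arrives.
    String next() throws InterruptedException {
        return queue.take();
    }

    int pending() {
        return queue.size();
    }
}
```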

Then in the publisher you can do something similar to the code below.

NOTE: the code might not work as-is, but it should give you the idea.

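import java.util.List;

class Publisher {
    // Registered subscribers (assume it's initialised).
    private final List<Subscriber> slist;

    Publisher(List<Subscriber> slist) {
        this.slist = slist;
    }

    public void publish(final String message) {
        for (final Subscriber s : slist) {
            // One short-lived helper thread per subscriber: if that subscriber's
            // queue is full, only the helper blocks, not the publisher.
            // Caveat: helper threads for consecutive messages can race, so a
            // subscriber may receive messages out of publish order.
            Thread t = new Thread(() -> s.addToQueue(message));
            t.start();
        }
    }
}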
rahul maindargi
  • This is an interesting idea, but will it perform? Starting a thread for every message sent seems a bit much. – Wudong Jun 01 '13 at 21:39
  • @Wudong I have never tried it; you can try it and let us know, but I think it should do the trick. Also note that each thread ends as soon as its message has been added to the queue, so as long as the queue is not full there will not be many threads waiting. – rahul maindargi Jun 04 '13 at 08:18
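One way to address the concern about starting a thread per message, sketched here as a swapped-in technique rather than the answer's code: give each subscriber a single long-lived worker from `Executors.newSingleThreadExecutor()`. That executor runs its tasks one at a time off an unbounded queue, so the publisher never blocks, threads are reused, and with a single publisher thread each subscriber handles messages in publish order. `ExecutorFanOut` and the use of `Consumer<T>` as the subscriber callback are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical alternative to one Thread per message: one long-lived,
// single-threaded executor per subscriber.
class ExecutorFanOut<T> implements AutoCloseable {
    private final List<Consumer<T>> handlers; // one callback per subscriber
    private final List<ExecutorService> workers = new ArrayList<>();

    ExecutorFanOut(List<Consumer<T>> handlers) {
        this.handlers = new ArrayList<>(handlers);
        for (int i = 0; i < handlers.size(); i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // Never blocks: each executor queues the task in an unbounded FIFO queue
    // and runs its tasks one at a time, so a subscriber sees messages in the
    // order a single publisher thread submitted them.
    void publish(T message) {
        for (int i = 0; i < handlers.size(); i++) {
            Consumer<T> handler = handlers.get(i);
            workers.get(i).execute(() -> handler.accept(message));
        }
    }

    // Lets already-published messages finish, then stops the worker threads.
    @Override
    public void close() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        for (ExecutorService w : workers) {
            w.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

As with unbounded queues, a slow subscriber's pending work grows in memory instead of blocking the publisher.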

There is 1 publisher and N subscribers. The publisher publishes objects, and each subscriber needs to process each of the objects once and only once, in the correct order. The publisher and each subscriber run in their own threads.

I would change this architecture. I initially considered a queue per subscriber, but I don't like that mechanism. For example, if the first subscriber takes longer to run, all of the jobs will end up in that queue and you will only be doing one thread's worth of work.

Since you have to run the subscribers in order, I'd have a pool of threads, each of which runs a message through all of the subscribers. The calls to the subscribers will need to be reentrant (safe to call from several threads at once), which may not be possible.

So you would have a pool of (let's say) 10 threads, each of which dequeues from the publisher's queue and does something like the following:

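public void run() {
    // publisherQueue: BlockingQueue<Article>; shutdown should be a volatile boolean
    while (!shutdown && !Thread.currentThread().isInterrupted()) {
        Article article;
        try {
            article = publisherQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop this worker
            return;
        }
        for (Subscriber subscriber : subscriberList) {
            subscriber.process(article);
        }
    }
}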
Gray
  • How can I guarantee the order in which each subscriber processes the "articles" in this implementation? If each article is processed by a different thread, it is quite possible that they are processed in a different order from the one in which they were produced. – Wudong Jun 01 '13 at 21:46
  • Wow. I didn't realize from your post that they had to go through all subscribers in the same order. Does the order of subscribers matter at all, or is it just the final order of articles? – Gray Jun 02 '13 at 14:15
  • Only the order of the "articles" matters; the subscribers' order doesn't. – Wudong Jun 03 '13 at 06:58
  • What if I want to add some kind of dependencies among subscribers? Consider a case where I have 3 subscribers A, B and C, but C should consume a message only after A and B are done consuming it. Is there any neat way to achieve this? – isahilarora Aug 27 '17 at 20:21