0

I have the following design:

There is a Task which extends TimerTask and it is scheduled to run every minute. This Task will try to take items from a central queue (as a single consumer) and write their representation into a file.

Additionally, there are multiple producers which put items into the central queue from time to time.

I am interested that each time the Task is executed (run() method executed) it will extract all the items from the queue if there are items, if there are no items do nothing.

Producers should sleep on the Queue if it is full.

My Solution for this problem is:

Create ExtractTask which extends TimerTask. ExtractTask will contain a BlockingQueue. Each producer will receive a reference to the queue instance by executing method getQueue(). Producers will execute BlockingQueue.put() method. The consumer will execute BlockingQueue.poll() method inside run().

Can you suggest a better design? does my design contain any problematic scenario cases? any synchronization problems this design may encounter?

Ravi Bhatt
  • 3,147
  • 19
  • 21
Michael
  • 2,827
  • 4
  • 30
  • 47

5 Answers5

2

I would:

  • keep the queues separated from the tasks in your design,
  • inject the queues instead of doing a lookup,
  • use a SchedulerService instead of a TimerTask

Other than that you've got it.

If you're willing to risk a dependency on Spring you should look into Spring Integration. All the components you describe are in there. You could also solve the problem using many other frameworks, like Camel or Akka; my main point here is to not maintain this code yourself if you don't absolutely have to.

Disclaimer: I'm somewhat biased about Spring Integration

iwein
  • 25,788
  • 10
  • 70
  • 111
  • +1, I am not following you as regard to the queues injection, you wish to elaborate? I am also biased about Spring Integration, although i am relatively new to Spring, I am familiar how to init the objects , but less with timer functionality, nice examples will be appreciated. – Michael Oct 22 '12 at 15:12
  • 1
    @Michael buy my book, it's full of examples :p. More seriously, though, you could look at https://github.com/SpringSource/spring-integration-samples – iwein Oct 22 '12 at 15:18
  • Thanks, For now it seems to be the best answer and a very good one, i will wait for a while for more answers, to give the chance to some more interesting ideas, then i will accept the best one. – Michael Oct 22 '12 at 15:32
1

Design seems good. There's not much detail here, so hard to be sure. I'd reccomend injecting all dependencies into the timer task.

Also, you could probably acheive this in Apache Camel without much custom code. See https://camel.apache.org/timer.html

David Roussel
  • 5,788
  • 1
  • 30
  • 35
1

Since you have asked about the design, i would suggest few things:

  • I would personally go for Executor service over Timer Task. Have a look here. Using executor makes sure that you can execute the tasks in separate threads in future if requirements change.
  • Try and separate your queue from the task object.
  • generally use DI in the code so that its testable.
  • I would have producers such that they receive the queue in their constructor.
Community
  • 1
  • 1
Ravi Bhatt
  • 3,147
  • 19
  • 21
  • Thanks for the advice, as i have answered and asked some one here who deleted his post, how do you in-force my requirement of running the Task every minute , while implementing the Executor service, (I have only one single repeating task) – Michael Oct 22 '12 at 15:26
  • Scheduled Thread Pool Executor. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html Hope this helps. – Ravi Bhatt Oct 22 '12 at 15:30
1

Based on your design I can think of something like this below. The ConsumerTask can utilize generics but I had a hard time figuring out how to do the same with the Producer thread. Both producer and consumer have a limit to the number of items produced/consumed. From the TimerTask logic is essential to cancel the timer in the run() method of the TimerTask itself for it to stop. Only a POISON PILL approach can be used in this case to shutdown . If you use the Executors.newSingleThreadExecutor() or scheduledThreadPoolExecutor() the shutdown() and shutdownNow() methods can be used to stop either producer or consumer. Although TimerTask is a good example to check out the working of the ConcurrentQueue it wont be used in production systems.

EDIT Add generic capabilities to the Producer thread. The constructor now takes a template class which implements the method to add items to the queue. I have defined an abstract class AddItem which contains and addItem() method which is called whenever the Producer wants to add items to the Queue.

import java.util.Date;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerTask<T> extends TimerTask {
    Timer timer;
    ConcurrentLinkedQueue<T> itemQueue;
    AtomicLong count = new AtomicLong(0);
    final long limit;

    public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) {
        limit = lim;
        timer = new Timer();
        timer.scheduleAtFixedRate(this, new Date(), seconds * 1000);
        itemQueue = itemQ;
    }

    public void run() {
        T item = itemQueue.peek();
        if (item != null) {
            if (count.incrementAndGet() <= limit) {
                System.out.println("Extracting Item : " + itemQueue.poll());
            } else {
                System.out
                        .println("Consumed : " + (count.get() - 1) + " items");
                timer.cancel();
            }

        }
    }

    public static void main(String args[]) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>();
        ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1);

        new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20))
                .start();
        new Thread(ct).start();

    }
}

abstract class AddItem<T> {
    ConcurrentLinkedQueue<T> itemQ;
    T t;

    public AddItem(ConcurrentLinkedQueue<T> itemQ) {
        this.itemQ = itemQ;
    }

    abstract boolean addItem();

    public boolean addItem(T t) {
        return itemQ.add(t);
    }
}

class IntegerAddItem extends AddItem<Integer> {
    public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) {
        super(itemQ);
    }

    AtomicInteger item = new AtomicInteger(0);

    @Override
    boolean addItem() {
        return addItem(item.incrementAndGet());
    }

}

class Producer<T> implements Runnable {
    private final ConcurrentLinkedQueue<T> itemQueue;
    AtomicInteger item = new AtomicInteger(0);
    AtomicLong count = new AtomicLong(0);
    AddItem<T> addMethod;
    final long limit;

    public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod,
            long limit) {
        itemQueue = itemQ;
        this.limit = limit;
        this.addMethod = addMethod;
    }

    public void run() {
        while (count.getAndIncrement() < limit) {
            addMethod.addItem();
            try {
                Thread.sleep(new Random().nextInt(5000));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                Thread.currentThread().interrupt();
            }

        }
    }
}
clinton
  • 612
  • 3
  • 6
0

you said that the consumer will extract all the items when the timer is executed.

you should take care that the operation of extracting all item from the queue is not blocking operation, it is a repetition of poll() blocking method call, this means that at a time of extracting the items producers will be able to add items to the queue.

Akram Berkawy
  • 4,920
  • 2
  • 19
  • 27
  • doesn't poll is synchronized? i suggested poll and not take, since it is not blocking when the queue is empty, and returns null. I suggested put, since it is blocking and the producers will sleep if the queue is full. – Michael Oct 22 '12 at 15:20
  • Depending on the implementation of the queue there will be more or less locking. poll/put is typically the way to go, although you can optimize using drainTo and offer... – iwein Oct 22 '12 at 15:26
  • I would like to use such a method like drainTo, although according to the specification it seems that this method is not safe. – Michael Oct 22 '12 at 15:28
  • poll is synchronized but what i mean if your implementation of the consumer was: while item = queue->poll() if item is null break doWork(item) endwhile such an implementation may cause the loop to be infinite. – Akram Berkawy Oct 22 '12 at 15:31