0

I am currently working on a priority workflow use case where I have to implement the producer consumer logic. The use case is - Different sets of jobs which are classified into 3 types that go into the blocking queue, and there are 3 threads or a thread group for consuming.

Jobs in the Queue

a1, a2, a3...an, b1,b2,b3...bn c1,c2,c3...cn, d1..., e1.....

Consumer Thread

CT1, CT2, CT3

My problem is how can I co-ordinate this Consumer Thread or Group so that:

CT1 process a1-an jobs

CT2 process b1-bn jobs

CT3 process c1-cn jobs

. . . . and more threads for a new set of jobs.

Please provide any pointers for the approach.

wolfcastle
  • 5,850
  • 3
  • 33
  • 46
Himansu
  • 21
  • 4

2 Answers2

1

You could also distribute the jobs to different queues and let the different threads or threadpools look in the different queues for jobs.

Jobs in queue a: a1, a2, ..., an

Jobs in queue b: b1, b2, ..., bn

Jobs in queue c: c1, c2, ..., cn

...

Threads, which are executing jobs: CT1, CT2, CT3

CT1 is executing jobs from queue a, CT2 executes jobs from queue b, CT3 executes jobs from queue c.

Soana
  • 711
  • 7
  • 15
  • Thanks for reply Soana, but as per the requirement unfortunately the Queue has to be one and "No of Thread" are configurable. – Himansu May 15 '14 at 08:38
  • In that case my answer won't do, it also describes N queues. I asked a similar question before: see http://stackoverflow.com/q/21491215/1341546 - unfortunately, no answers. – tariksbl May 15 '14 at 12:49
  • Sorry, I couldn't see that from your question – Soana May 15 '14 at 13:39
0

If I understand the problem, you want a worker thread or pool bound to a blockingqueue, and this thread-queue pair to be created on demand as new work types are added.

If this is the case you could

  • Define a WorkHandler class that pairs a blockingqueue with a thread pool, configuring the thread pool as a consumer of the queue
  • Use guava LoadingCache or java 8 ConcurrentHashMap (using .computeIfAbsent()) to get or lazy-create a WorkHandler for a given work type
  • To process each new work item, lookup its work type in the cache, and add the item to the WorkHandler it provides.
tariksbl
  • 1,069
  • 6
  • 20