I have a multi-stage producer/consumer pattern: producer1 -> consumer1/producer2 -> consumer2/producer3. Each producer uses a completion service, but I'm having problems coding the logic for working out when everything has finished.

The problem is that main puts some (x) tasks on producer 1's queue; this in turn causes producer 1 to put y tasks on producer 2, and producer 2 to put z tasks on producer 3. x, y and z are different, so main cannot just look at producer 3's completion queue and take z futures, because main doesn't know z.

So I tried the poison pill idea together with a CountDownLatch initialized to 3. main knows that producer 1 only has x tasks, so it can submit a poison pill at the end. When producer 1 receives the pill, it decrements the latch and sends a poison pill to producer 2; when producer 2 receives one, it decrements the latch and sends one to producer 3; when producer 3 receives it, it decrements the latch. main just calls countdownlatch.await() and cannot continue until all tasks have completed. But this only works if each executor service is limited to one thread, because when the poison pill is received by a producer it only means all preceding tasks have been started, not completed.

So how do I get round this? I think I must be missing an easier solution somewhere along the line.

Paul Taylor
  • Can you give us some code in form of your types? Does every producer generate the same output type? – hellectronic Nov 14 '11 at 17:25
  • @user294896: I'm using "poison pills" quite regularly but I'm not sure I understand what you mean by *"..because when the poison pill is received by a producer it only means all preceding tasks have been started..."* It's the consumer who should receive the poison pill (not the producer) and then act accordingly: for example by decrementing a latch and/or creating a new producer (if I understood your example correctly). But in any case: you should decide what you do when you *consume* a pill. Merely producing a poison pill shouldn't have any effect. Can you clarify a bit? – TacticalCoder Nov 14 '11 at 17:31
  • @user29489 When I say "received by a producer" it's because the class is a producer AND a consumer. So that class receives work and, when it's finished, passes on new work to a different type of producer/consumer. – Paul Taylor Nov 14 '11 at 17:36
  • @user294896: so if I understand correctly you're decrementing a unique latch for all your different *"producer AND consumer"* classes? If so it obviously won't work if you have several threads: you'd at least need 3 pills *for each thread* you want to run in parallel. But anyway I'm not sure I understand what you want to do: do you want all your, say, parallel *"consumer2/producer3"* to start their work simultaneously (i.e. when all *"consumer1/producer2"* are done consuming)? Thing is: you're not forced to always use the same kind of pills: there can be producer pills, consumer pills, etc. – TacticalCoder Nov 14 '11 at 17:47
  • @user294896 you are overly complicating what I'm saying, but it doesn't matter because I have a correct answer anyway, thank you. – Paul Taylor Nov 14 '11 at 20:58

1 Answer

You can call shutdown() and then awaitTermination() on each ExecutorService in turn.

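// Stages in pipeline order: each one feeds the next.
ExecutorService[] services = { executor1, executor2, executor3 };
for (ExecutorService service : services) {
    service.shutdown();                            // accept no new tasks; already submitted ones still run
    service.awaitTermination(1, TimeUnit.MINUTES); // block until this stage has finished (or the timeout expires)
}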
Peter Lawrey
  • So I call shutdown() as soon as the tasks are submitted to producer 1; awaitTermination() will not return until producer 1 has finished, and at that point it is safe to call shutdown() on producer 2 because it will have received all its tasks. I don't need poison pills, a latch or even a completion service. Great, thanks, I'll try it now. – Paul Taylor Nov 14 '11 at 17:34
  • Yes, I should have included an explanation. Shutdown prevents new tasks and the service terminates when all tasks have finished. – Peter Lawrey Nov 14 '11 at 17:37
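
For readers who want to try the idea end to end, here is a minimal sketch of a three-stage pipeline that is drained by shutting the executors down in order. The class name PipelineShutdownDemo, the method runPipeline, the fan-out of 2 and 3 tasks per stage, and the pool size of 4 are all made up for illustration; they are not from the question. It assumes each stage only ever submits work to the next stage, which is what makes the in-order shutdown safe.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineShutdownDemo {

    // Hypothetical pipeline: x stage-1 tasks, each submitting 2 stage-2 tasks,
    // each of which submits 3 stage-3 tasks. Returns how many stage-3 tasks ran.
    static int runPipeline(int x) throws InterruptedException {
        ExecutorService executor1 = Executors.newFixedThreadPool(4);
        ExecutorService executor2 = Executors.newFixedThreadPool(4);
        ExecutorService executor3 = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < x; i++) {
            executor1.execute(() -> {
                for (int j = 0; j < 2; j++) {
                    executor2.execute(() -> {
                        for (int k = 0; k < 3; k++) {
                            executor3.execute(() -> { completed.incrementAndGet(); });
                        }
                    });
                }
            });
        }

        // main has submitted everything it will ever submit to stage 1, so stage 1
        // can be shut down first. Once a stage has terminated, nothing can submit
        // to the next stage any more, so that one can be shut down in turn.
        ExecutorService[] services = { executor1, executor2, executor3 };
        for (ExecutorService service : services) {
            service.shutdown();
            if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("stage did not finish within the timeout");
            }
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(5)); // 5 * 2 * 3 = 30
    }
}
```

Unlike the snippet in the answer, this sketch checks the boolean returned by awaitTermination(), since a false result means the timeout expired before the stage finished. Each pool uses several threads, which is the case the poison-pill approach could not handle.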