I have a parser, and after gathering the data for a row, I want to fire an async function and let it process the row while the main thread continues on and gets the next row.

I've seen this post: "How do I execute two tasks simultaneously and wait for the results in Groovy?", but I'm not sure it is the best solution for my situation.

What I want to do is, after all the rows are read, wait for all the async functions to finish before I go on. One concern with using a collection of Promises is that the list could be large (100,000+).

Also, I want to report status as we go. Finally, I'm not sure I want to automatically wait for a timeout (as with a get()), because the file could be huge. However, I do want to allow the user to kill the process for various reasons.

So what I've done for now is record the number of rows parsed (as they occur via rowsRead), then use a callback from the Promise to record another row being finished processing, like this:

def promise = processRow(row)
promise.whenBound {
    rowsProcessed.incrementAndGet()
}

Where rowsProcessed is an AtomicInteger.

Then in the code invoked at the end of the sheet, after all parsing is done and I'm waiting for the processing to finish, I'm doing this:

boolean test = true
while (test) {
    Thread.sleep(1000)  // No need to pound the CPU with this check
    println "read: ${sheet.rowsRead}, processed: ${sheet.rowsProcessed.get()}"
    if (sheet.rowsProcessed.get() == sheet.rowsRead) {
        test = false
    }
}

The nice thing is, I don't have an explosion of Promise objects here, just a simple count to check. But I'm not sure sleeping every so often is as efficient as calling get() on each Promise object.

So, my questions are:

  1. If I used the collection of Promises instead, would a get() react and return if the thread executing the while loop above was interrupted with Thread.interrupt()?
  2. Would using the collection of Promises and calling get() on each be more efficient than trying to sleep and check every so often?
  3. Is there another, better approach that I haven't considered?

Thanks!

user1373467

2 Answers

  1. A call to allPromises*.get() will throw an InterruptedException if the waiting (main) thread gets interrupted.
  2. Yes, the promises have been created anyway, so grouping them in a list should not impose additional memory requirements, in my opinion.
  3. The suggested solutions with a CountDownLatch or a Phaser are IMO much more suitable than using busy waiting.
Vaclav Pech
  • Thanks. Could you clarify a bit? I get what you are saying about busy-waiting and my original solution, but are you also saying that using a Phaser is better than doing a promises*.get()? I liked the CountDownLatch idea b/c it's simple, the Phaser looks powerful with the idea of setting barriers and such, but my need is quite a bit simpler. Both answers here have been useful and informative, thanks! – user1373467 Nov 18 '12 at 02:51
  • I tend to think both solutions, latch or promises*.join(), are equally good. – Vaclav Pech Nov 18 '12 at 07:43
  • I ended up using promises*.get() b/c of the problem with exception handling. I wasn't aware of join(), it looks like that's not on the Promise interface but probably is on the underlying DataflowVariable. Will it act like get() in terms of propagating any underlying exceptions? Is it safe to both register a whenBound closure on a Promise and later use get() or join() (i.e., will the whenBound closure always get called)? Thanks! – user1373467 Nov 18 '12 at 23:41
  • Just as you say, join() is currently not defined on Promises, but perhaps it should be. I may introduce such a change soon. join() guarantees the thread will continue once the dataflow variable has been bound, and it doesn't re-throw potential exceptions. This is where it differs from get(). – Vaclav Pech Nov 19 '12 at 08:03
  • Since DataflowVariables guarantee that all readers will get to see the value you can freely combine get(), join() and whenBound(). – Vaclav Pech Nov 19 '12 at 08:05
  • Sounds like I want get() then, for now, so I know about any underlying exception that was thrown. Thanks! – user1373467 Nov 19 '12 at 13:46
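
The behaviour discussed in this thread (a blocked get() wakes up on interruption, and get() surfaces a failure from the underlying task) can be sketched in plain Java. This is only an illustration under stated assumptions: `CompletableFuture` stands in for the GPars Promise, and `processRow`, `awaitAll`, and `getIsInterruptible` are hypothetical names, not GPars API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class AwaitAllRows {

    // Hypothetical stand-in for processRow(row): negative rows fail.
    static CompletableFuture<Integer> processRow(int row) {
        return CompletableFuture.supplyAsync(() -> {
            if (row < 0) {
                throw new IllegalArgumentException("bad row " + row);
            }
            return row * 2;
        });
    }

    // Blocks on each future in turn and returns how many rows failed.
    // An interrupt of the calling thread escapes as InterruptedException.
    static int awaitAll(List<CompletableFuture<Integer>> futures)
            throws InterruptedException {
        int failures = 0;
        for (CompletableFuture<Integer> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                failures++;  // e.getCause() holds the task's own exception
            }
        }
        return failures;
    }

    // Returns true if a thread blocked in get() is woken by interrupt().
    static boolean getIsInterruptible() throws InterruptedException {
        CompletableFuture<Integer> neverBound = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                neverBound.get();  // would block forever without the interrupt
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            } catch (ExecutionException e) {
                // Not reachable here: the future never completes.
            }
        });
        waiter.start();
        started.await();
        waiter.interrupt();
        waiter.join(5000);
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int row : new int[] {1, -1, 3}) {
            futures.add(processRow(row));
        }
        System.out.println("failed rows: " + awaitAll(futures));
        System.out.println("get() interruptible: " + getIsInterruptible());
    }
}
```

Note that Java's `CompletableFuture.join()` is a different method from the GPars join() described in the comments: the Java one rethrows failures wrapped in an unchecked `CompletionException`.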

An alternative to an AtomicInteger is to use a CountDownLatch. It avoids both the sleep and the large collection of Promise objects. You could use it like this:

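import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// The latch needs the total row count when it is created
latch = new CountDownLatch(sheet.rowsRead)
...
def promise = processRow(row)
promise.whenBound {
    latch.countDown()  // one more row has finished processing
}
...
// Wake up once a second to report progress until the count reaches zero
while (!latch.await(1, TimeUnit.SECONDS)) {
    println "read: ${sheet.rowsRead}, processed: ${sheet.rowsRead - latch.count}"
}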
ataylor
  • Thanks for the idea. That almost works. The only problem is, I don't know the number of rowsRead up front, so that count continues to get incremented while the processRow functions on previous rows are executing. I only know when I'm done when I hit my callback for the end of the sheet. – user1373467 Nov 15 '12 at 21:33
  • If you're using Java 7, I think you can use a [`Phaser`](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Phaser.html) in the same way as a `CountDownLatch`, with the ability to register more tasks or rows as they become available. – ataylor Nov 15 '12 at 21:42
  • Thanks for the tip! I'll give that a look. I need to get the ol' rep above 15 so I can vote up the answer. – user1373467 Nov 15 '12 at 22:06
  • I really appreciate the suggestion. Because of exception handling, the promises*.get() works slightly better for the moment. But your answer was useful, so I did give it a vote. Thanks! – user1373467 Nov 19 '12 at 13:47
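
The Phaser suggestion from the comments, for the case where the row count is not known up front, might look roughly like the Java sketch below. It is an illustration, not the code the asker ended up using: `PhaserRowTracker`, `submit`, and `awaitAll` are hypothetical names, and `CompletableFuture.runAsync` stands in for the asynchronous `processRow` call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class PhaserRowTracker {
    // The parser thread is the single party registered at construction.
    private final Phaser phaser = new Phaser(1);
    private final AtomicInteger rowsRead = new AtomicInteger();
    private final AtomicInteger rowsProcessed = new AtomicInteger();

    // Called by the parser for each row as it is read.
    void submit(Runnable rowWork) {
        rowsRead.incrementAndGet();
        phaser.register();  // one more outstanding row
        CompletableFuture.runAsync(rowWork).whenComplete((ok, err) -> {
            rowsProcessed.incrementAndGet();
            phaser.arriveAndDeregister();  // row done, whether it succeeded or failed
        });
    }

    // Called once at the end of the sheet. Blocks until every submitted row
    // has finished, printing a status line each time the poll interval elapses.
    int awaitAll(long pollMillis) throws InterruptedException {
        int phase = phaser.arrive();  // the parser itself has nothing left to do
        while (true) {
            try {
                phaser.awaitAdvanceInterruptibly(phase, pollMillis, TimeUnit.MILLISECONDS);
                return rowsProcessed.get();
            } catch (TimeoutException e) {
                System.out.println("read: " + rowsRead.get()
                        + ", processed: " + rowsProcessed.get());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PhaserRowTracker tracker = new PhaserRowTracker();
        for (int i = 0; i < 20; i++) {
            tracker.submit(() -> { });
        }
        System.out.println("processed: " + tracker.awaitAll(1000));
    }
}
```

The wait is interruptible, so a user-initiated kill can be delivered as `Thread.interrupt()` on the waiting thread. One caveat for very large files: a single `Phaser` supports at most 65535 registered parties at a time, so with 100,000+ rows in flight this sketch would need either a bound on outstanding rows or tiered phasers.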