I have a jobs_queue collection in MongoDB. It's a capped collection which I'm polling using a tailable cursor:

val cur =
  jobsQueue
    .find(Json.obj("done" -> Json.obj("$ne" -> true)))
    .options(QueryOpts().tailable.awaitData)
    .cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
  // do some processing and store the results back in the DB
}

This is being called from a regular Scala App, so there's no Akka or Play wrapping at all.

What would be the most appropriate way to make sure the App doesn't exit until I explicitly break out of the Iteratee.foreach? Also, I don't have to use play-iteratees at all if there's a simpler (even if slightly less elegant) way.


P.S. I do ensure the collection is capped:

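val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
// The whole chain is asynchronous, so the result is a Future of the collection.
val jobsQueue: Future[JSONCollection] =
  jobsQueueMaybe.stats()
    .flatMap {
      case stats if !stats.capped =>
        jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
      case _ =>
        Future(jobsQueueMaybe)
    }
    // createCapped itself returns a Future, so recoverWith (not recover) is needed
    // to avoid ending up with a nested, never-awaited Future.
    .recoverWith { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
    .map { _ => jobsQueueMaybe }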

P.P.S.

I would also appreciate any criticism of how I've designed this bit of logic, and of how I could solve this by rethinking my approach and slightly overhauling the implementation.

Erik Kaplun
  • Not a scala or play framework player. But the common method here is "event handlers". You need to track when something is emitted from the `Cursor` object. That's the basic key idea. – Neil Lunn Jan 13 '15 at 13:12
  • well reactivemongo+iteratees should already handle that under the hood, no? – Erik Kaplun Jan 13 '15 at 13:16
  • Maybe so. Just told you that this isn't fully my bag. If it's an "event" emitter then it suits a tailable cursor. But really, this is just the base "Java" driver underneath. So whatever suits its rules will work. No need to overcomplicate. – Neil Lunn Jan 13 '15 at 13:22
  • Possibly irrelevant but are you sure reactivemongo uses the "base java driver"? I thought it was a completely independent implementation. – Erik Kaplun Jan 13 '15 at 13:23
  • Nah. The Scala and ReactiveMongo implementation both depend on the basic Java Driver implementation which is an official driver. Even out of other community efforts. There is next to nothing now that does not depend on a base driver implemented from the core team, and on any language. That's pretty good really. Never got that support level from Oracle. – Neil Lunn Jan 13 '15 at 13:28
  • @NeilLunn: are you sure? https://github.com/ReactiveMongo/ReactiveMongo/blob/master/project/ReactiveMongo.scala — I'm not seeing the java driver mentioned. – Erik Kaplun Jan 15 '15 at 15:48
  • @NeilLunn - Your assertion is false. I work with ReactiveMongo every day and have my own version of it that I maintain. RM uses netty to communicate directly with the mongod daemon. This is how it achieves non-blocking I/O. It does not incorporate the Java driver. – Reid Spencer Jan 18 '15 at 18:44

1 Answer

As a workaround for now, I switched from Iteratee.foreach to Iteratee.foldM, so that each iteration returns a Future. This seems to force ReactiveMongo to keep the computation going until it is interrupted, whereas foreach just seems to exit too early:

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
  // always yield something like Future.successful(acc) or an actual `Future[Unit]`
}

Then I just need to block until the program is told to terminate, which is signalled by something being put into stopSignal, a ConcurrentLinkedQueue:

while (stopSignal.isEmpty) Thread.sleep(1000)

But, while it works well, I'm not particularly in love with that solution.

Maybe my fears are unjustified, but I'd really like a somewhat more authoritative answer as to how I should be solving this.
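
One way to avoid the sleep loop is to block on a Promise instead of polling a queue. This is only a minimal sketch using the Scala standard library, not something ReactiveMongo prescribes: stopSignal is redefined here as a hypothetical Promise[Unit], and the timer merely stands in for whatever would really trigger the shutdown.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object WaitForStop {
  // Hypothetical replacement for the ConcurrentLinkedQueue-based stopSignal:
  // completing this promise tells the main thread that it may exit.
  val stopSignal: Promise[Unit] = Promise[Unit]()

  def main(args: Array[String]): Unit = {
    // Stand-in for the real trigger (an assumption for this sketch):
    // a timer completes the promise after 200 ms.
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.schedule(new Runnable {
      def run(): Unit = { stopSignal.trySuccess(()); () }
    }, 200, TimeUnit.MILLISECONDS)

    // Blocks the main thread, without polling, until stopSignal is completed.
    Await.result(stopSignal.future, Duration.Inf)
    timer.shutdown()
    println("stopped")
  }
}
```

In the real program, the Future produced by `cur.enumerate() |>>> ...` could presumably be awaited in the same way, so the App would stay alive for as long as the fold keeps running.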

Erik Kaplun