I have a simple method that reads from mongo collection of people as stream, and for each person doing some model change and inserting to a new collection:
def processPeople()(implicit m: Materializer): Future[Done] = {
val peopleSource: Source[Person, Future[State]] = collection.find(json())
.cursor[Person]()
.documentSource()
.throttle(50, 1.second)
peopleSource.runForeach(person => insertPerson(person))
}
def insertPerson(person: Person): Future[String] = {
peopleCollection.insert(person) map { _ =>
person.id
} recover {
case WriteResult.Code(11000) =>
println(WriteResult.lastError(_))
logger.error(s"fail to insert duplicate person ${person.id}")
throw DuplicateInsertion("duplicate person")
case _ =>
logger.error(s"fail to insert person ${person.id}")
throw new RuntimeException
}
}
the collection is about 370K documents, and after 200k from some reason I got:
reactivemongo.core.errors.DetailedDatabaseException: DatabaseException['Cursor not found, cursor id: 72642197351' (code = 43)]
and in the mongo console I saw:
I COMMAND [conn45] killcursors: found 0 of 1
does anyone understand why is that happening?