I am building an app using Play Framework 2.5.0 and ReactiveMongo and I am spending a lot of time, stuck, on something that would be very easy to do in most web languages.

That thing is inserting many documents at once. To do so, I must use the ReactiveMongo function bulkInsert.

I found a Google Groups thread with a very simple example; however, it is from 2013 and the signature has since changed

from

def bulkInsert[T](enumerator: Enumerator[T]) 

to

def bulkInsert(documents: Stream[P.Document], ordered: Boolean, writeConcern: WriteConcern)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult]

So I tried to take that example and convert the Enumerator to a Stream (I did not find any way to do so):

val schemasDocs: Seq[JsObject] = {
  jsonSchemas.fields.map {
    case (field, value) => Json.obj(field -> value)
  }
}
val enumerator = Enumerator.enumerate(schemasDocs)
val schemasStream = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator)) // my attempt to turn enumerator into a Stream
val schemasInsert = {
  getCollection("schemas").flatMap(
    _.bulkInsert(schemasStream, true)
  )
}

Now I find myself diving into the Akka, ReactiveMongo and Play APIs just to create a Stream of JsObjects from a Seq of JsObjects.

Then I tried a different approach, the example from the ReactiveMongo website:

val bulkDocs = schemasDocs.map(implicitly[collection.ImplicitlyDocumentProducer](_))
collection.bulkInsert(ordered=true)(bulkDocs: _*)

This gives me an error that is just as hard to debug:

type mismatch; found : Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] required: Seq[x$48.ImplicitlyDocumentProducer]

I would rather avoid Streams and use the second solution, as I don't like having things I don't understand in my code.

cchantep
Daniel
  • Please have a look at the [examples](https://github.com/ReactiveMongo/ReactiveMongo/blob/0.11.x/driver/src/test/scala/BSONCollectionSpec.scala#L72). – cchantep Jul 04 '16 at 19:50
  • I still get the same error: type mismatch; found : Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] required: Seq[x$12.ImplicitlyDocumentProducer]. I really don't get it – Daniel Jul 05 '16 at 08:55
  • Your code is not sufficiently complete to understand it – cchantep Jul 05 '16 at 09:47
  • I also had the "type mismatch" issue with the ImplicitlyDocumentProducer. I fixed it by adding the corresponding object with the implicit Json formatter. `object Person {` `implicit val formatter = Json.format[FuelStation]` `}` – Benoit Feb 13 '17 at 14:05
  • For JSON: `implicit val formatter = Json.format[FuelStation]`, or for BSON: `implicit val writer = Macros.writer[Person]` – Benoit Feb 13 '17 at 14:31

1 Answer

I just found out how to handle bulkInsert. Here is an example:

build.sbt

...
libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.14"
)
...

plugins.sbt

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.12")

CxTransactionsRepository.scala

package cx.repository

import javax.inject.Inject

import cx.model.CxTransactionEntity
import play.modules.reactivemongo.ReactiveMongoApi
import reactivemongo.play.json.collection.JSONCollection

import scala.concurrent.{ExecutionContext, Future}

class CxTransactionsRepository @Inject()(val reactiveMongoApi: ReactiveMongoApi)(implicit ec: ExecutionContext){

  private val cxTransactionsCollectionFuture: Future[JSONCollection] = reactiveMongoApi.database.map(_.collection[JSONCollection]("cxTransactions"))

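  // Inserts all entities in one bulk operation and returns the number of documents written.
  // An implicit JSON writer for CxTransactionEntity must be in scope (e.g. in its companion object).
  def bulkInsert(seq: Seq[CxTransactionEntity]): Future[Int] = {
    for {
      transactions <- cxTransactionsCollectionFuture
      // The producer type is taken from the `transactions` instance itself,
      // so it matches what transactions.bulkInsert expects.
      writeResult <- transactions.bulkInsert(ordered = false)(seq.map(implicitly[transactions.ImplicitlyDocumentProducer](_)): _*)
    } yield {
      writeResult.n
    }
  }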

}
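
As far as I can tell, this compiles where the attempt in the question did not because ImplicitlyDocumentProducer is a member type of the collection, so it is path-dependent. The producers have to be built from the same stable identifier (here `transactions`) that bulkInsert is called on, which is what the "found: JSONCollection#ImplicitlyDocumentProducer, required: x$48.ImplicitlyDocumentProducer" message seems to be complaining about. Below is a minimal, dependency-free sketch of that rule. `Coll`, `Producer` and `PathDependentDemo` are hypothetical names invented for illustration, not ReactiveMongo types.

```scala
// Hypothetical stand-in for a collection class; NOT the ReactiveMongo API.
class Coll(val name: String) {
  // Inner class: every Coll instance has its own distinct Producer type.
  class Producer(val doc: String)

  // Accepts only producers that belong to *this* instance.
  def bulkInsert(docs: Producer*): Int = docs.size
}

object PathDependentDemo {
  def insertAll(docs: Seq[String]): Int = {
    val coll = new Coll("schemas") // a val is a stable identifier
    // Built through `coll`, so the element type is coll.Producer
    val producers = docs.map(d => new coll.Producer(d))
    coll.bulkInsert(producers: _*)
  }

  // The following would NOT compile, because the producer belongs
  // to a different instance than the one bulkInsert is called on:
  //   val a = new Coll("a"); val b = new Coll("b")
  //   a.bulkInsert(new b.Producer("x"))

  def main(args: Array[String]): Unit =
    println(insertAll(Seq("a", "b", "c"))) // prints 3
}
```

In the question, the collection inside `flatMap(_.bulkInsert(...))` is an anonymous parameter, so producers created elsewhere cannot carry its type. Binding the collection to a name first (as in the for-comprehension above) and building the producers from that name avoids the mismatch.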
mgosk