
I have a streaming Dataset in Spark with a certain schema. When I want to compute a query on it, I call:

StreamingQuery query = querydf
                      .writeStream()
                      .outputMode(OutputMode.Update())
                      .format("console")
                      .start();           

query.awaitTermination();

This way I can see the result of the query in the console on every trigger. How can I write the result DataFrame to MongoDB? For a streaming Dataset that is not directly possible. Should I convert the streaming Dataset into a static Dataset on every trigger and then save it? How can I do that?

VanBaffo
  • Is it possible to use the MongoSpark.save(..) method with structured streaming? – VanBaffo Jun 11 '18 at 12:48
  • It is not. `RDDs` are not exposed with Structured Streaming. – Alper t. Turker Jun 11 '18 at 13:28
  • That's true... I can't find any Java example of a ForeachWriter implementation. Do you have one? – VanBaffo Jun 11 '18 at 13:31
  • Java docs provide a template (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/ForeachWriter.html) – Alper t. Turker Jun 11 '18 at 13:35
  • I've seen that, but I don't know how to configure it properly... how do I insert the rows of the DataFrame into MongoDB? – VanBaffo Jun 11 '18 at 13:41
  • The same way you'd do it in a plain Java program, using the [Java driver](https://mongodb.github.io/mongo-java-driver) – Alper t. Turker Jun 11 '18 at 14:01
  • Yes, but the Row only has the field values, not key-value pairs like a JSON document... so how can I write to MongoDB in the right key-value format inside process? – VanBaffo Jun 11 '18 at 14:34
  • [Spark Row to JSON](https://stackoverflow.com/q/36157810/8371915) – Alper t. Turker Jun 11 '18 at 14:35
  • Sure, but that is for a static DataFrame. I have a streaming DataFrame, and when implementing ForeachWriter I have to implement three functions: open, process and close, where process takes a Row as input... so I cannot work with the entire DataFrame, only at row level. – VanBaffo Jun 11 '18 at 14:50
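
The thread above asks for a Java example of a ForeachWriter. Below is a minimal sketch of what one could look like (untested, with placeholder connection settings): it assumes the MongoDB sync Java driver (MongoClients) is on the classpath and builds a key-value Document from each Row using the field names in the Row's schema.

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.bson.Document;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

public class MongoDbForeachWriter extends ForeachWriter<Row> {

    private final String uri;        // e.g. "mongodb://localhost:27017" (placeholder)
    private final String database;   // placeholder
    private final String collection; // placeholder

    // created in open() on the executor, so not serialized with the writer
    private transient MongoClient client;
    private transient MongoCollection<Document> coll;

    public MongoDbForeachWriter(String uri, String database, String collection) {
        this.uri = uri;
        this.database = database;
        this.collection = collection;
    }

    @Override
    public boolean open(long partitionId, long epochId) {
        client = MongoClients.create(uri);
        coll = client.getDatabase(database).getCollection(collection);
        return true; // process this partition
    }

    @Override
    public void process(Row row) {
        // turn the positional Row into a key-value document using the schema's field names
        Document doc = new Document();
        String[] fields = row.schema().fieldNames();
        for (int i = 0; i < fields.length; i++) {
            doc.append(fields[i], row.get(i));
        }
        coll.insertOne(doc);
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (client != null) {
            client.close();
        }
    }
}

It would then be wired in with querydf.writeStream().foreach(new MongoDbForeachWriter(uri, db, coll)).outputMode(OutputMode.Update()).start(). Note that values such as timestamps or nested structs may need extra conversion before MongoDB accepts them.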

1 Answer


You could create a MongoDbSink:

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

class MongoDbSink(options: Map[String, String]) extends Sink with Logging {

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema
    val rdd = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    // write RDD to MongoDB!!
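    // One way to do that (a sketch, not part of the original answer): insert each
    // row as a BSON Document using the plain MongoDB Java driver. The "uri",
    // "database" and "collection" keys read from `options` are assumptions about
    // how configuration is passed in, and the mongodb-driver-sync client is
    // assumed to be on the classpath.
    val opts = options // local copy so the closure does not capture the Sink itself
    rdd.foreachPartition { rows =>
      import com.mongodb.client.MongoClients
      import org.bson.Document

      val client = MongoClients.create(opts.getOrElse("uri", "mongodb://localhost:27017"))
      val collection = client
        .getDatabase(opts.getOrElse("database", "test"))
        .getCollection(opts.getOrElse("collection", "results"))

      rows.foreach { row =>
        // build a key-value document from the positional Row using the schema's field names
        val doc = new Document()
        schema.fields.zipWithIndex.foreach { case (field, i) =>
          doc.append(field.name, row.get(i).asInstanceOf[AnyRef])
        }
        collection.insertOne(doc)
      }
      client.close()
    }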
  }
}

class MongoDbSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(sqlContext: SQLContext,
                 parameters: Map[String, String],
                 partitionColumns: Seq[String],
                 outputMode: OutputMode): Sink = {
    new MongoDbSink(parameters)
  }

  def shortName(): String = "my-mongo-sink"
}

Then implement the write to MongoDB however you like (one possible approach is sketched in the comments inside addBatch above).

In the .format() of writeStream, specify the fully qualified class name of your MongoDbSinkProvider. (The short name "my-mongo-sink" is only resolved if the provider is also registered in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.)
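
A minimal sketch of the wiring on the query side, mirroring the snippet from the question (the package com.example.streaming is a placeholder for wherever the provider lives, and the option keys are assumptions; the options simply end up in the parameters map handed to createSink):

StreamingQuery query = querydf
                      .writeStream()
                      .outputMode(OutputMode.Update())
                      .format("com.example.streaming.MongoDbSinkProvider") // placeholder package
                      .option("uri", "mongodb://localhost:27017")
                      .option("database", "test")
                      .option("collection", "results")
                      .start();

query.awaitTermination();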

bp2010