
We have a use case in Spark where we want to load historical data from our database into Spark, keep appending new streaming data, and then run analysis on the whole up-to-date dataset.

As far as I know, neither Spark SQL nor Spark Streaming can combine historical data with streaming data. Then I found Structured Streaming in Spark 2.0, which seems to be built for this problem, but after some experimenting I still cannot figure it out. Here is my code:

SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);


// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 11111)
        .load();

// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
        .as(Encoders.STRING())
        .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
    @Override
    public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
    ...
    }
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data

ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();

I get the error "org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;" when I union() the two datasets.

Could anyone please help me? Am I going in the wrong direction?

zero323
Xiao Tan
  • Structured Streaming in Spark 2.0 is in alpha, so a lot of things aren't supported yet. I wonder whether you could use stateful streaming instead. In stateful streaming you can bootstrap your state with the historical data, and the streaming data is then appended in whatever fashion you like. See this [Databricks blog post](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html) for details. – Glennie Helles Sindholt Oct 03 '16 at 08:50
  • @GlennieHellesSindholt Hi Glennie, thank you for your suggestion. I think mapWithState() is best used to replace or update the current state (key-value pairs) with the new streaming data. In my use case, my RDD is not key-value paired and there is no need to update the old data. Would mapWithState() be overkill here? – Xiao Tan Oct 03 '16 at 13:36
  • I agree that `mapWithState` is not the obvious choice, if you don't have any sort of aggregation, but then if you don't need the historical data, why do you want it in your stream? – Glennie Helles Sindholt Oct 04 '16 at 09:06
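The bootstrapping idea from the comments above can be modelled in plain Java. This is only a conceptual sketch, not Spark code: the class name `BootstrappedTicks`, the use of bare price values, and the `mean()` analysis are all hypothetical and chosen for illustration. It shows state being seeded once from the historical load, each micro-batch being appended without touching older records, and an analysis running over the combined data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model only: no Spark classes are used here.
// BootstrappedTicks and its price values are hypothetical.
class BootstrappedTicks {
    // All records seen so far: historical ones first, then streamed ones.
    private final List<Double> prices = new ArrayList<>();

    // Seed the state once with the historical load.
    BootstrappedTicks(List<Double> historical) {
        prices.addAll(historical);
    }

    // Called once per micro-batch: append new records, never rewrite old ones.
    void append(List<Double> microBatch) {
        prices.addAll(microBatch);
    }

    int size() {
        return prices.size();
    }

    // Example analysis over the whole up-to-date dataset.
    double mean() {
        double sum = 0.0;
        for (double p : prices) {
            sum += p;
        }
        return prices.isEmpty() ? 0.0 : sum / prices.size();
    }

    public static void main(String[] args) {
        BootstrappedTicks state = new BootstrappedTicks(Arrays.asList(10.0, 20.0));
        state.append(Arrays.asList(30.0));
        System.out.println(state.size() + " records, mean " + state.mean());
    }
}
```

In Spark Streaming itself the state is keyed, so records without a natural key would need some key assigned before `mapWithState` could hold them; that concern from the comments still applies.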

1 Answer


I can't speak for whether the MongoDB Spark connector supports this type of functionality, and there doesn't seem to be much about it on Google. However, there are other databases in the Spark database ecosystem that do. I covered most of that ecosystem in another answer. I can't say precisely which databases easily allow the type of functionality you're looking for, though I know SnappyData and MemSQL are on that list. However, you may need your data in relational form for both.

plamb