
I am using a Twitter stream function which gives a DStream. I am required to use Spark's writeStream function (writeStream function link), e.g.:

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

The 'df' needs to be a streaming Dataset/DataFrame. If df is a normal DataFrame, it gives an error saying 'writeStream' can be called only on streaming Dataset/DataFrame.

I have already done: 1. got the stream from Twitter, 2. filtered and mapped it to get a tag for each tweet (Positive, Negative, Neutral).

The last step is to group by tag, count each group, and pass the result to Kafka.
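For illustration, the tagging-and-counting step can be sketched with plain Scala collections; on a DStream the same logic maps onto map + reduceByKey. The `classify` function below is an assumed placeholder for the actual sentiment logic, not code from the question:

```scala
object TagCounts {
  // Hypothetical stand-in for the real sentiment classifier.
  def classify(text: String): String =
    if (text.contains(":)")) "Positive"
    else if (text.contains(":(")) "Negative"
    else "Neutral"

  // Group tweets by tag and count each group; on a DStream this would be
  // stream.map(t => (classify(t), 1L)).reduceByKey(_ + _)
  def tagCounts(tweets: Seq[String]): Map[String, Long] =
    tweets.groupBy(classify).map { case (tag, ts) => (tag, ts.size.toLong) }
}
```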

Do you guys have any idea how to transform a DStream into a streaming Dataset/DataFrame?

Edited: the foreachRDD function does change a DStream into a normal DataFrame, but 'writeStream' can be called only on a streaming Dataset/DataFrame (the writeStream link is provided above):

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;

DD Jin
  • Does this answer your question? [How to convert RDD to DataFrame in Spark Streaming, not just Spark](https://stackoverflow.com/questions/39996549/how-to-convert-rdd-to-dataframe-in-spark-streaming-not-just-spark) – OneCricketeer Nov 13 '19 at 16:31
  • @cricket_007 Thanks for your info. I did it before. I update the question to be more specific. – DD Jin Nov 13 '19 at 17:11
  • Ah, apparently it is not possible... https://stackoverflow.com/questions/49559007/convert-between-streaming-dataset-and-dstream But you can write batches instead https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka – OneCricketeer Nov 13 '19 at 19:25
  • What's _"twitter stream function"_? Can you show the code that does this? – Jacek Laskowski Nov 15 '19 at 10:48

1 Answer


how to transform a Dstream into a streaming Dataset/DataFrame?

A DStream is an abstraction of a series of RDDs.

A streaming Dataset is an "abstraction" of a series of Datasets (I use quotes since the difference between streaming and batch Datasets is a property isStreaming of Dataset).
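That distinction can be seen directly in code. A minimal sketch, assuming a local SparkSession and Spark's built-in rate source (used here only as a convenient streaming test source):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("isStreaming-demo")
  .getOrCreate()

// A regular (batch) Dataset
val batchDF = spark.range(10).toDF

// A streaming Dataset from the built-in rate source
val streamDF = spark.readStream.format("rate").load()

// The only visible difference is the isStreaming flag
println(batchDF.isStreaming)   // false
println(streamDF.isStreaming)  // true
```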

It is possible to convert a DStream to a streaming Dataset to keep the behaviour of the DStream.

I think you don't really want it though.

All you need is to take tweets from the DStream and save them to a Kafka topic (and you think you need Structured Streaming for that). I think you simply need Spark SQL (the underlying engine of Structured Streaming).

A pseudo-code would then be as follows (sorry, it's been a while since I used the old-fashioned Spark Streaming):

val spark: SparkSession = ...
val tweets: DStream[String] = ...
tweets.foreachRDD { rdd =>
  import spark.implicits._
  rdd.toDF("value")          // the Kafka sink expects a string or binary value column
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic1")
    .save()
}
Jacek Laskowski