I want to inspect the data in a Spark Structured Streaming DataFrame and then apply business logic to it.

So far I have tried to convert the streaming DataFrame to an RDD. Once it is an RDD, I want to apply a function that transforms the data and also creates a new column based on the schema of a specific message.

dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_kafka_server) \
    .option("subscribe", topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")


print "type (df_stream)", type(dsraw)
print "schema (dsraw)"
dsraw.printSchema()  # prints the schema itself and returns None


def show_data_fun(dsraw, epoch_id):
    dsraw.show()

    row_rdd = dsraw.rdd.map(lambda row: literal_eval(dsraw['value']))
    json_data = row_rdd.collect()

    print "From rdd : ", type(json_data)
    print "From rdd : ", json_data[0]
    print "show_data_function_call"


jsonDataQuery = dsraw \
    .writeStream \
    .foreach(show_data_fun)\
    .queryName("df_value")\
    .trigger(continuous='1 second')\
    .start()

Expected output: the first JSON message in the stream is printed.
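The row-level transformation described above can be sketched in plain Python, independent of Spark. This is only a minimal sketch: `parse_value` and `add_field_count` are hypothetical helper names, the derived `field_count` field is an invented stand-in for the real business logic, and the fallback to `literal_eval` assumes the messages may be Python-literal strings rather than strict JSON.

```python
from __future__ import print_function  # keeps print() usable on Python 2 as well

import json
from ast import literal_eval


def parse_value(value):
    """Parse one Kafka message value (a string) into a Python object."""
    try:
        return json.loads(value)
    except ValueError:
        # Assumption: non-JSON payloads use Python-literal syntax,
        # which is what literal_eval in the question suggests.
        return literal_eval(value)


def add_field_count(message):
    """Hypothetical business step: return a copy with one derived field."""
    enriched = dict(message)
    enriched["field_count"] = len(message)
    return enriched


if __name__ == "__main__":
    print(add_field_count(parse_value('{"id": 1, "name": "a"}')))
```

On a non-streaming DataFrame these helpers would be applied per row, for example `df.rdd.map(lambda row: add_field_count(parse_value(row["value"])))`. Note that the lambda has to read from its own `row` argument; `dsraw['value']` is a `Column` object, not the string held in the row, so `literal_eval` cannot parse it.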
Jacek Laskowski
AP-Big Data
  • The headline of the question and the explanation are pretty different. Can you also rewrite the question to clearly state what output you expect? And why do you think it's an issue with ForEach? – Tej Jul 18 '19 at 15:37
  • Hi Tej, I am sorry for the confusion, but can you take a look at why the foreach sink is unable to call the function (show_data_fun)? – AP-Big Data Jul 18 '19 at 17:08
  • How do you execute the code? Do you use `pyspark` or execute it directly? – Jacek Laskowski Jul 18 '19 at 19:24
  • Nowadays, I submit my spark jobs using spark-submit in CDH. ```spark2-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 ``` – AP-Big Data Jul 18 '19 at 21:13
  • @AkashPatel Is this similar to what you are trying to achieve? https://stackoverflow.com/questions/23394286/modify-collection-inside-a-spark-rdd-foreach – A.Dev Jul 18 '19 at 21:38
  • @A.Dev It is similar to this. But my source is a streaming DataFrame, so I cannot access the data directly. First I have to expose the data from the DataFrame as an RDD, and then maybe I can iterate over it. In this case the function "show_data_fun" should be called and should convert the DataFrame to an RDD. But this isn't happening, and the function is not called by the foreach sink on each batch epoch. – AP-Big Data Jul 19 '19 at 13:36
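
One possible direction, offered as a sketch rather than a confirmed fix: a function with the signature `(df, epoch_id)` matches what `foreachBatch` expects, whereas `foreach` calls its function once per row on the executors. The code below assumes Spark 2.4 or later (where `foreachBatch` is available in PySpark), that each message value is valid JSON, and that `dsraw` is the streaming DataFrame from the question. `first_message` and `start_query` are hypothetical helper names. Because `foreachBatch` relies on micro-batch execution, the continuous trigger is swapped for a processing-time trigger here.

```python
from __future__ import print_function  # keeps print() usable on Python 2 as well

import json


def first_message(batch_df):
    """Return the first message of a micro-batch as a Python object, or None if empty."""
    rows = batch_df.take(1)  # inside foreachBatch, batch_df is a regular DataFrame
    if not rows:
        return None
    # Assumption: the "value" column holds a JSON string.
    return json.loads(rows[0]["value"])


def show_data_fun(batch_df, epoch_id):
    # Called on the driver once per micro-batch, so the output goes to the driver log.
    print("epoch", epoch_id, "first message:", first_message(batch_df))


def start_query(dsraw):
    """Start the streaming query; dsraw is the streaming DataFrame from the question."""
    return (dsraw.writeStream
            .foreachBatch(show_data_fun)
            .queryName("df_value")
            .trigger(processingTime="1 second")
            .start())
```

When the script is run with `spark2-submit`, the driver would typically also need `start_query(dsraw).awaitTermination()`, otherwise the process may exit before any batch is processed. Inside `show_data_fun`, `batch_df.rdd` is also available, since the batch DataFrame is not a streaming one.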

0 Answers