0

I am using below program and runnign this in Anaconda(Spyder) for creating data pipeline from Kafka to Spark streaming & in python

import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os


##Step 1: Initialize sparkcontext
spark_context = SparkContext(appName="Transformation Application")

###Step 2: Initialize streaming context
ssc = StreamingContext(spark_context, 5)

def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

message = KafkaUtils.createDirectStream(ssc,topics=['testtopic'],kafkaParams={"metadata.broker.list":"localhost:9092","key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer","value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"},fromOffsets=None,messageHandler=None,keyDecoder=utf8_decoder,valueDecoder=utf8_decoder)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount=words.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)
wordcount.pprint()

When I am printing message, words,wordscount i am getting no proper results,I am getting hexadecimal values .

message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>

wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>

in my topic(testtopic) I am produced string - " Hi Hi Hi how are you doing" then wordcount should give count for each word but it is giving some encoded hexadecimal values

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Sarvendra Singh
  • 109
  • 1
  • 1
  • 9
  • What did you expect the line with just `message` to do? You're printing the Python object, not consuming the stream... Also, Spark has its own json functions, you shouldn't need to (try to) import Spring serializers – OneCricketeer Nov 20 '20 at 15:37
  • @OneCricketeer Sorry I am new to spark and kafka both..it means message output is fine. but printing wordcount and words both also giving TransformedDStream object..While wordcount should give counts of all words like 3 for Hi and 1 for each rest word for Topic with string produced as "my topic(testtopic) I am produced string - " Hi Hi Hi how are you doing"... Also If I have to create data pipeline from kafka>Spark Streaming>MYSQL DB then how can I make sure that data is kafka topic is avilable in Spark where can I see that data in Spark. Please help to guide on this. – Sarvendra Singh Nov 21 '20 at 12:35
  • My point is that you're printing an object, which is a Python "problem", unrelated to spark or Kafka... `wordcount.pprint()` is correct if you want to actually see the data – OneCricketeer Nov 21 '20 at 16:31
  • @OneCricketeer Thanks Sir...Just one last question Can we convert message (Kafka direct stream) into spark data frame ? I have to store my streaming records in my sql.Since spark dataframe can be stored to mysql DB so thats why asking – Sarvendra Singh Nov 22 '20 at 20:57
  • You should be using Structured Streaming if you want to do that. Alternatively, Kafka Connect is provided by Kafka and you can use that to write to mysql as well – OneCricketeer Nov 23 '20 at 01:29
  • @OneCricketeer Thanks Sir I am trying to use now Structured Streaming. I am using Spark structured Streaming to read data from Kafka topic. Spark version(Local windows machine): 2.3.4 kafka version: Confluence 6.6.0(kafka 2.6) Jar Version : Jar -spark-sql-kafka-0-10_2.11-2.3.4.jar (placed in jar folder of spark). Getting this Error.Is my jar version is correct. Py4JJavaError: An error occurred while calling o81.load. : java.lang.NoClassDefFoundError: Could not initialize class – Sarvendra Singh Nov 23 '20 at 12:37
  • Py4JJavaError: An error occurred while calling o81.load. : java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaSourceProvider$ at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:360) at – Sarvendra Singh Nov 23 '20 at 12:38
  • Don't manually add jar files. Use `PYSPARK_SUBMIT_ARGS` environment variable. For example https://stackoverflow.com/a/58723724/2308683 – OneCricketeer Nov 23 '20 at 16:15
  • @OneCricketeer Thanks Sir now it worked .Now able to read data from kafka and created spark streaming dataframe.But wjhen tried to write in mysql (df.write.jdbc )it is giving error "AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;"" .is there any example or reference code on writing streaming pyspark dataframe into Mysql or any other DB. – Sarvendra Singh Nov 24 '20 at 07:44
  • writeStream, not write – OneCricketeer Nov 24 '20 at 14:20

0 Answers0