
When I create a stream from a Kafka topic and print its contents

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

I get an empty result

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

Meanwhile, it works in the console:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

correctly gives me all the lines of text in my Kafka topic:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

What is the proper way to stream data from Kafka topic into Spark streaming app?

Robin Moffatt
Val

3 Answers


Based on your code, we can't print the streaming RDD directly; we should print it using foreachRDD. DStream.foreachRDD is an "output operator" in Spark Streaming. It lets you access the underlying RDDs of the DStream and run actions that do something practical with the data.

What's the meaning of DStream.foreachRDD function?

Note: you can achieve this through Structured Streaming as well. Ref: Pyspark Structured streaming processing

Sample working code: this code tries to read messages from a Kafka topic and print them. You can adapt it to your requirements.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    def handler(message):
        # Collect the records of this batch's RDD and print each value
        records = message.collect()
        for record in records:
            print(record[1])

    def main():
        sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
        ssc = StreamingContext(sc, 10)

        kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'],
                                            {"metadata.broker.list": 'localhost:9092'})
        kvs.foreachRDD(handler)

        ssc.start()
        ssc.awaitTermination()

    if __name__ == "__main__":
        main()
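In the handler above, each element of a Kafka direct-stream RDD is a (key, value) tuple, which is why `record[1]` extracts the message payload. A plain-Python sketch of that extraction, using hypothetical stand-ins for the collected records:

```python
# Hypothetical sample of what message.collect() might return:
# a list of (key, value) tuples from the Kafka topic.
records = [(None, "ham Ok lor..."), (None, "spam REMINDER FROM O2: ...")]

# record[1] is the message value; record[0] would be the (often None) key.
for record in records:
    print(record[1])
```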
OneCricketeer
  • Can you please remove Cassandra code from the answer? It's somewhat distracting. And this is not Structured Streaming like you've said – OneCricketeer Dec 30 '19 at 06:31
  • Thanks for sharing your comments. I have removed the Cassandra code from this answer. As you mentioned, this is not the Structured Streaming method; I tried to give a solution with the DStream method itself, based on the code in the question. Along with the DStream answer, I have given a Structured Streaming Stack Overflow reference link as well. Please let me know if you feel it still needs improvement and I will correct it. Thanks!! – Karthikeyan Rasipalay Durairaj Dec 30 '19 at 13:13

I recommend using Spark Structured Streaming. It's the new-generation streaming engine that comes with the release of Spark 2. You can check it in this link.

For Kafka integration, you can look at the docs at this link.

c.guzel
  • Ok, you should write your df to console or kafka topic to see the data. Can you check [this section](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries) in the docs. – c.guzel Dec 10 '19 at 14:36
  • I tried this # Subscribe to 1 topic df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "sample_topic") \ .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") but I get AnalysisException: 'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;' – Val Dec 10 '19 at 14:36
  • It seems kafka dependencies are not available in spark libs directory. Could you please make sure you've add spark+kafka dependencies. Also, you can check deploying section to run with dependencies in this [link](https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html#deploying) – c.guzel Dec 10 '19 at 14:40
  • I used both: 1) in Jupiter notebook: import os os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 pyspark-shell' and 2) command line: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 test.py - no luck. – Val Dec 11 '19 at 13:52

The reason you are not seeing any data in the streaming output is that Spark Streaming starts reading from the latest offset by default. So if you start your Spark Streaming application first and then write data to Kafka, you will see output in the streaming job. Refer to the documentation here:

By default, it will start consuming from the latest offset of each Kafka partition
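For example, with the streaming application already running, producing new records with the standard Kafka console-producer CLI (using the topic and broker address from the question) should make them show up in the next batch's output:

```shell
# With the Spark Streaming app running, send a few test messages;
# the producer reads one message per line from stdin.
kafka-console-producer --topic sample_topic --broker-list localhost:9092
```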

But you can also read data from any specific offset of your topic. Take a look at the createDirectStream method here. It takes a dict parameter fromOffsets where you can specify the starting offset per partition.

I have tested the code below with Kafka 2.2.0, Spark 2.4.3, and Python 3.7.3:

Start the pyspark shell with the Kafka dependencies:

    pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

Run the code below:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 1)
    topicPartition = TopicAndPartition('test', 0)
    fromOffset = {topicPartition: 0}

    lines = KafkaUtils.createDirectStream(ssc, ['test'],
                                          {"bootstrap.servers": 'localhost:9092'},
                                          fromOffsets=fromOffset)

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

Also, you should consider using Structured Streaming instead of Spark Streaming if your Kafka broker version is 0.10 or higher. Refer to the Structured Streaming documentation here and Structured Streaming with Kafka integration here.

Below is sample code to run with Structured Streaming. Please use the jar version matching your Kafka and Spark versions. I am using Spark 2.4.3 with Scala 2.11 and Kafka 0.10, so I use the jar spark-sql-kafka-0-10_2.11:2.4.3.

Start the pyspark shell:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

Then run:

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .start()
wypul