I am trying to write a Spark Streaming application using the Spark Python API.
The application should read text files from a local directory and send them to a Kafka cluster.
When I submit the Python script to Spark, nothing is sent to Kafka at all.
I tried printing the events instead of sending them to Kafka and found that nothing is read.
Here is the script:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys
import time

reload(sys)
sys.setdefaultencoding('utf8')

producer = KafkaProducer(bootstrap_servers="kafka-b01.css.org:9092,kafka-b02.css.org:9092,kafka-b03.css.org:9092,kafka-b04.css.org:9092,kafka-b05.css.org:9092")


def send_to_kafka(rdd):
    # Collect the micro-batch to the driver and print it for debugging.
    tweets = rdd.collect()
    print("--------------------------")
    print(tweets)
    print("--------------------------")
    #for tweet in tweets:
    #    producer.send('test_historical_job', value=bytes(tweet))


if __name__ == "__main__":
    conf = SparkConf().setAppName("TestSparkFromPython")
    sc = SparkContext(conf=conf)
    # 1-second batch interval
    ssc = StreamingContext(sc, 1)

    # Watch the directory for new text files and print each batch.
    tweetsDstream = ssc.textFileStream("/tmp/historical/")
    tweetsDstream.foreachRDD(lambda rdd: send_to_kafka(rdd))

    ssc.start()
    ssc.awaitTermination()
I am submitting the script with this command:
./spark-submit --master spark://spark-master:7077 /apps/historical_streamer.py
The output of the print statements is just an empty list:
--------------------------
[]
--------------------------
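I am not sure whether the problem is in the streaming setup or in the path itself. As a sanity check, a plain non-streaming batch read of the same directory would look roughly like this (separate from the streaming job; the app name is arbitrary):

from pyspark import SparkContext

sc = SparkContext(appName="SanityCheck")
# Batch read of the same directory the stream is supposed to watch.
print(sc.textFile("/tmp/historical/").count())
sc.stop()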
EDIT

Based on this question, I changed the path of the data directory from "/tmp/historical/" to "file:///tmp/historical/".
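So the relevant line now reads:

tweetsDstream = ssc.textFileStream("file:///tmp/historical/")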
I also tried starting the job first and then moving files into the directory, but unfortunately that did not work either.
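By "move" I mean something like the following (the file name is just an example):

mv /tmp/tweets_0001.txt /tmp/historical/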