1

I am trying to write a spark streaming application using Spark Python API.

The application should read text files from local directory and send it to Kafka cluster.

When submitting the python script to spark engine, nothing sent to kafka at all.

I tried to print the events instead of send it to Kafka and found that there is nothing read.

Here is the code of the script.

#!/usr/lib/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys 
import time
reload(sys)
sys.setdefaultencoding('utf8')


producer = KafkaProducer(bootstrap_servers="kafka-b01.css.org:9092,kafka-b02.css.org:9092,kafka-b03.css.org:9092,kafka-b04.css.org:9092,kafka-b05.css.org:9092")


def send_to_kafka(rdd):
    tweets = rdd.collect()
    print ("--------------------------")
    print (tweets)
    print "--------------------------"
    #for tweet in tweets:
    #    producer.send('test_historical_job', value=bytes(tweet))


if __name__ == "__main__":

    conf = SparkConf().setAppName("TestSparkFromPython")

    sc = SparkContext(conf=conf)

    ssc = StreamingContext(sc, 1)

    tweetsDstream = ssc.textFileStream("/tmp/historical/")

    tweetsDstream.foreachRDD(lambda rdd: send_to_kafka(rdd))
    ssc.start()
    ssc.awaitTermination()

I am submitting the script using this command

./spark-submit --master spark://spark-master:7077 /apps/historical_streamer.py

The output of the print statement is an empty list.

--------------------------
[]
--------------------------

EDIT

based on this question I changed the path of the data directory from "/tmp/historical/" to "file:///tmp/historical/".

I tried to run the job first and then move files to the directory but unfortunately it did not work also.

Community
  • 1
  • 1
Fanooos
  • 2,718
  • 5
  • 31
  • 55
  • Having the same issue in windows, newly atomically moved files are not identified by Spark Stream. But the same code works and picks local files well in Linux. Puzzled! – Sudheer Palyam Feb 21 '17 at 17:46
  • tried freshly creating a new file in stream directly (in windows) and Spark Stream picked it properly. So i guess the problem with copy/move where the file modified timestamp still refers to older time. – Sudheer Palyam Feb 21 '17 at 17:52

2 Answers2

1

File stream based sources like fileStream or textFileStream expect data files to be:

be created in the dataDirectory by atomically moving or renaming them into the data directory.

If there are no new files in a given window there is nothing to proces so pre-existing files (it seems to be the case here) won't be read on won't show on the output.

10465355
  • 4,481
  • 2
  • 20
  • 44
zero323
  • 322,348
  • 103
  • 959
  • 935
0

Your function:

def send_to_kafka(rdd):
tweets = rdd.collect()
print ("--------------------------")
print (tweets)
print "--------------------------"
#for tweet in tweets:
#    producer.send('test_historical_job', value=bytes(tweet))

will collect all the rdd, but it won't print the content of the rdd. To do so, you should use the routine:

tweets.foreach(println)

that will, for every element in the RDD, give as output the elements. As explained in the Spark Documentation

Hope this will help

salvob
  • 1,300
  • 3
  • 21
  • 41
  • Thnx Nome for the answer but this did not help. Actually, my understanding to the docs you referred to is against what you are said. I am using one machine, the file contains only two lines, and I am trying to print it. What I understood from the docs, that using collect method with a big data will cause out of memory exception on the driver machine because this method is executed on the driver machine not on a cluster. – Fanooos Apr 17 '16 at 14:42