I am new to big data and am trying to connect Kafka to Spark. Here is my producer code:

import os
import sys
import pykafka
def get_text():
    ## This block generates my required text.
    # Encode the generated text as UTF-8 bytes, since produce() expects bytes.
    text_as_bytes = text.encode("utf-8")
    producer.produce(text_as_bytes)


if __name__ == "__main__":
    client = pykafka.KafkaClient("localhost:9092")
    print ("topics",client.topics)
    producer = client.topics[b'imagetext'].get_producer()

    get_text() 

The generated text shows up in the console consumer when I run bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning

Now I want to consume this text with Spark. This is my Jupyter code:

import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'



conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)

ssc = StreamingContext(sc,5)
print('ssc =================== {}'.format(ssc))

kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'], 
     kafkaParams = {"metadata.broker.list": 'localhost:9092'})

print('contexts =================== {} {}'.format(sc, ssc))
lines = kstream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)

But this produces the following output in Jupyter:

Time: 2018-02-21 15:03:25
-------------------------------------------

-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------

The batches are empty; the text that appears in my console consumer never shows up. Please help, I am unable to figure out the mistake.

philantrovert
  • Is the data from the Kafka client arriving as a stream? If it sends a single file and nothing after that, no streaming data reaches the Spark stream. – mayank agrawal Feb 21 '18 at 10:34
  • The data is already on console consumer, as of now no new data is coming. – Bidisha Mukherjee Feb 21 '18 at 10:37
  • I think that is the issue. Spark streaming expects data in a streaming fashion. Kafka should continuously send data to spark. – mayank agrawal Feb 21 '18 at 10:41
  • Try putting get_text() in an infinite loop so that it sends data continuously. I use KafkaProducer and put the send in a loop:
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER, acks=ACKS)
    r = requests.get("https://stream.meetup.com/2/rsvps", stream=True)
    for line in r.iter_lines():
        producer.send(TOPIC_KAFKA, line, key=TOPIC_KAFKA, partition=0)
        print(line)
    – mayank agrawal Feb 21 '18 at 10:48
  • Thank you so much. It seems to be working! – Bidisha Mukherjee Feb 21 '18 at 10:51
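The loop suggested in the comments could be sketched roughly as follows. This is only an illustration: send_continuously, its parameters, and the one-second delay are hypothetical names and values, not part of pykafka. The produce argument is assumed to be any callable that accepts bytes, such as the produce method of the pykafka producer from the question.

```python
import time


def send_continuously(produce, messages, delay_seconds=1.0, sleep=time.sleep):
    """Send each text message as UTF-8 bytes, pausing between sends.

    `produce` is any callable that accepts bytes (assumed here to be
    something like the pykafka producer's `produce` method).
    `messages` may be an endless generator, in which case this never returns.
    """
    sent = 0
    for text in messages:
        produce(text.encode("utf-8"))  # the producer expects bytes
        sent += 1
        sleep(delay_seconds)  # spread messages out so they arrive over time
    return sent
```

With the question's producer this might be called as send_continuously(producer.produce, generate_texts()), where generate_texts is a hypothetical generator that yields the text. Start the Spark stream first so that the messages arrive while it is running.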

2 Answers

I found another solution. While putting get_text() in a loop works, it is not the right solution. Your data was not sent to Kafka continuously, so Spark Streaming should not have to receive it that way.

In the kafka-python library, send() returns a future, and calling get(timeout) on it blocks until the broker acknowledges the message or the timeout expires:

producer.send(topic,data).get(timeout=10)

Since you are using pykafka, I am not sure whether it will work. Nevertheless, you can try it once without putting get_text() in a loop.
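A minimal sketch of the blocking send described above, assuming a kafka-python KafkaProducer is passed in. The helper name send_and_wait and its default timeout are hypothetical; only send() and the get(timeout=...) call on its result come from kafka-python.

```python
def send_and_wait(producer, topic, data, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is assumed to be a kafka-python KafkaProducer: its send()
    returns a future, and get(timeout=...) waits for the result and raises
    if no acknowledgement arrives within `timeout` seconds.
    """
    future = producer.send(topic, data)
    return future.get(timeout=timeout)
```

For example, send_and_wait(KafkaProducer(bootstrap_servers="localhost:9092"), "imagetext", b"hello") would return the record metadata once the message has been written, assuming a broker is reachable at that address.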

Giorgos Myrianthous
mayank agrawal

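Change the port in the consumer from 9092 to 2181, which is the ZooKeeper port. The producer has to connect to the Kafka broker on port 9092, while the streamer has to connect to ZooKeeper on port 2181. Note that this holds for the receiver-based KafkaUtils.createStream, which takes a ZooKeeper quorum address; KafkaUtils.createDirectStream, as used in the question, takes the broker list through metadata.broker.list.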
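For the createDirectStream call in the question, the connection settings live in kafkaParams. The sketch below is an assumption about the cause, not a confirmed diagnosis: the 0-8 direct stream starts from the latest offsets by default, so messages published before the stream started would not appear, and setting auto.offset.reset to smallest asks it to start from the earliest available offsets instead. The helper name build_kafka_params is hypothetical.

```python
def build_kafka_params(brokers, from_beginning=True):
    """Build the kafkaParams dict for the Kafka 0.8 direct stream.

    `brokers` is the Kafka broker list (not ZooKeeper), e.g. "localhost:9092".
    With from_beginning=True, the stream is asked to start at the smallest
    available offset rather than only reading newly arriving messages.
    """
    params = {"metadata.broker.list": brokers}
    if from_beginning:
        params["auto.offset.reset"] = "smallest"
    return params


kafka_params = build_kafka_params("localhost:9092")
```

The result would be passed as KafkaUtils.createDirectStream(ssc, topics=['imagetext'], kafkaParams=kafka_params), mirroring the call in the question.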