I am new to big data and am trying to connect Kafka to Spark. Here is my producer code:
import os
import sys
import pykafka

def get_text():
    ## This block generates my required text.
    # Kafka messages must be bytes, so encode the generated string.
    text_as_bytes = text.encode("utf-8")
    producer.produce(text_as_bytes)

if __name__ == "__main__":
    client = pykafka.KafkaClient("localhost:9092")
    print("topics", client.topics)
    # The producer is a module-level name, so get_text() can use it.
    producer = client.topics[b'imagetext'].get_producer()
    get_text()
This prints my generated text in the console consumer when I run bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
Now I want this text to be consumed by Spark. This is my Jupyter code:
import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

# Must be set before the SparkContext is created so the Kafka jar is on the classpath.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'

conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batch interval
print('ssc =================== {}'.format(ssc))

kstream = KafkaUtils.createDirectStream(ssc, topics=['imagetext'],
                                        kafkaParams={"metadata.broker.list": 'localhost:9092'})
print('contexts =================== {}'.format(kstream))

# Each record is a (key, value) pair; keep only the message value.
lines = kstream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully=True)
But in Jupyter this produces only empty batches:
Time: 2018-02-21 15:03:25
-------------------------------------------
-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------
The text that appears in my console consumer is not printed. Please help; I am unable to figure out the mistake.
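One possible cause, offered as an assumption rather than a confirmed diagnosis: with the Kafka 0.8 direct stream, consumption starts from the latest offset of each partition by default, so messages that were produced before the streaming job started would not show up, whereas the console consumer was run with --from-beginning. A minimal sketch of building the Kafka parameters so that the stream starts from the earliest offset is below. The helper name build_kafka_params is hypothetical and only there for illustration.

```python
def build_kafka_params(brokers, from_beginning=True):
    """Build a kafkaParams dict for KafkaUtils.createDirectStream (Kafka 0.8 API).

    build_kafka_params is a hypothetical helper, not part of Spark or Kafka.
    """
    params = {"metadata.broker.list": brokers}
    if from_beginning:
        # "smallest" asks the 0.8 consumer to start at the earliest available
        # offset; without this setting the stream starts at the latest offset.
        params["auto.offset.reset"] = "smallest"
    return params


kafka_params = build_kafka_params("localhost:9092")
print(kafka_params)
```

The resulting dict would be passed in place of the inline one, for example KafkaUtils.createDirectStream(ssc, topics=['imagetext'], kafkaParams=kafka_params). If the producer is instead kept running while the stream is active, new messages should appear even without this setting.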