I am collecting Twitter stream data using this Python code: https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py
After that, I run the following code to create the streaming context and store the data in MongoDB.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pymongo
import urllib3.contrib.pyopenssl

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # 10-second batches
    ssc.checkpoint("checkpoint")
    kstream = KafkaUtils.createDirectStream(
        ssc, topics=['topic1'],
        kafkaParams={"metadata.broker.list": 'localhost:9092'})
    # each Kafka record is a (key, value) pair; keep only the value
    tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
    # ................ insert into MongoDB .........................
    db.mynewcollection.insert_one(tweets)
    ssc.start()
    ssc.awaitTerminationOrTimeout(100)
    ssc.stop(stopGraceFully=True)

if __name__ == "__main__":
    urllib3.contrib.pyopenssl.inject_into_urllib3()
    connection = pymongo.MongoClient('....', ...)
    db = connection['twitter1']
    db.authenticate('..', '...')
    main()
but I got this error:
TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping
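If I understand the error, 'tweets' is a DStream object, not a document, and pymongo's 'insert_one' only accepts a dict-like mapping. Outside of Spark, inserting a single parsed tweet works fine for me. A minimal check (the stand-in JSON string and the local connection details are just my assumptions):

import json
import pymongo

client = pymongo.MongoClient('localhost', 27017)  # assumption: local MongoDB
db = client['twitter1']
tweet = json.loads('{"text": "hello", "lang": "en"}')  # json.loads returns a dict
db.mynewcollection.insert_one(tweet)  # a dict is accepted

So it seems I need to convert each streamed record into a dict before inserting it.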
I also tried to use 'foreachRDD' and to create a function 'Save':

tweets.foreachRDD(Save)

and moved the insert into that function:
def Save(rdd):
    if not rdd.isEmpty():
        db.mynewcollection.insert_one(rdd)
but this does not work either:
TypeError: can't pickle _thread.lock objects
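My guess is that Spark tries to pickle the 'Save' closure together with the 'db' handle, and a MongoClient holds thread locks, so it cannot be pickled. I think this reproduces the same error outside of Spark (assuming a local MongoDB):

import pickle
import pymongo

client = pymongo.MongoClient('localhost', 27017)  # assumption: local MongoDB
pickle.dumps(client)  # TypeError: can't pickle _thread.lock objects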
Can anyone help me understand how to correctly store the streaming data in MongoDB?
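From what I have read, the usual pattern seems to be to do the insert inside 'foreachRDD' via 'foreachPartition', and to create the MongoClient inside the function that runs on the workers, so that no connection object ever has to be pickled. A minimal sketch of what I have in mind ('save_partition' and 'save_rdd' are my own names; the connection details are from my setup above, and I am assuming each Kafka message value is one tweet's JSON string):

def save_partition(records):
    # imports and client live inside the function, so nothing
    # unpicklable is captured in the closure shipped to the workers
    import json
    import pymongo
    client = pymongo.MongoClient('localhost', 27017)  # assumption: local MongoDB
    db = client['twitter1']
    for record in records:
        db.mynewcollection.insert_one(json.loads(record))  # one dict per tweet
    client.close()

def save_rdd(rdd):
    if not rdd.isEmpty():
        rdd.foreachPartition(save_partition)

tweets.foreachRDD(save_rdd)

Is this the right approach, or is there a better way to do it?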