
I am collecting Twitter stream data using this Python code: https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py

After that, I run this code to create a streaming context and store the data in MongoDB:

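from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pymongo
import urllib3.contrib.pyopenssl

def main():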

  conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
  sc = SparkContext(conf=conf)
  ssc = StreamingContext(sc, 10)
  ssc.checkpoint("checkpoint")   
  kstream = KafkaUtils.createDirectStream(
      ssc, topics=['topic1'],
      kafkaParams={"metadata.broker.list": 'localhost:9092'})
  tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
  #................insert in MonGODB.........................
  db.mynewcollection.insert_one(tweets)
  ssc.start()
  ssc.awaitTerminationOrTimeout(100)
  ssc.stop(stopGraceFully = True)

if __name__=="__main__":
  urllib3.contrib.pyopenssl.inject_into_urllib3()
  connection = pymongo.MongoClient('....',...)
  db = connection['twitter1']
  db.authenticate('..','...')
  main()

But I got this error:

TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping

I also tried using `foreachRDD` with a function named `Save`:

tweets.foreachRDD(Save)

and moved the insert into that function:

def Save(rdd):
    if not rdd.isEmpty():
        db.mynewcollection.insert_one(rdd)

but that does not work either:

TypeError: can't pickle _thread.lock objects

How can I store the streaming data in MongoDB?

ebt_dev

1 Answer

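  • The first error occurs because you pass a distributed object (the `tweets` DStream) to db.mynewcollection.insert_one, which accepts only a single dict-like document.

  • The second error occurs because you initialize the database connection on the driver. To run Save on the executors, Spark has to serialize (pickle) everything the function references, and connection objects, in general, cannot be serialized.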
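Both failure modes can be reproduced without Spark or MongoDB. This is a minimal sketch under stated assumptions: `is_document` only approximates the check behind the first TypeError (it tests for `MutableMapping`, one of the types that message names), a plain list stands in for a batch of tweets, and `FakeClient` is a hypothetical stand-in for a connection object that, like a real client, holds a thread lock.

```python
import pickle
import threading
from collections.abc import MutableMapping


def is_document(obj) -> bool:
    """Rough approximation of the type check behind the insert_one TypeError."""
    return isinstance(obj, MutableMapping)


class FakeClient:
    """Hypothetical stand-in for a connection object; it holds a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()


def can_pickle(obj) -> bool:
    """Spark must pickle what a function references to ship it to executors."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:  # raised for objects containing a _thread.lock
        return False


print(is_document({"text": "hi"}))           # True: a dict is a valid document
print(is_document([b"tweet 1", b"tweet 2"]))  # False: a collection of records is not
print(can_pickle({"text": "hi"}))             # True
print(can_pickle(FakeClient()))               # False: the lock cannot be pickled
```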

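While there are a number of Spark / MongoDB connectors you should take a look at (Getting Spark, Python, and MongoDB to work together), a generic pattern is to use foreachPartition and create the connection inside it, so that each partition opens its own connection on the executor instead of receiving one from the driver. Define a helper: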

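def insert_partition(xs):
    docs = list(xs)  # the partition arrives as an iterator
    if not docs:  # insert_many fails on an empty batch ("No operations to execute")
        return
    connection = pymongo.MongoClient('....',...)
    db = connection['twitter1']
    db.authenticate('..','...')
    db.mynewcollection.insert_many(docs)
    connection.close()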

and then:

def to_dict(s):
    return ... # Convert input to a format acceptable by `insert_many`, for example with json.loads

tweets.map(to_dict) \
    .foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))
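To make the per-partition control flow concrete, here is a Spark-free simulation of the pattern above. It is only a sketch: `foreach_partition` imitates `RDD.foreachPartition` by calling a function once per partition with an iterator, and `FakeCollection` / `open_collection` are hypothetical in-memory stand-ins for a MongoDB collection and for building a `MongoClient` on the executor. It shows that empty partitions (common in short streaming batches) are skipped before any connection is opened.

```python
from typing import Iterable, List


class FakeCollection:
    """Hypothetical in-memory stand-in for a pymongo collection."""

    def __init__(self):
        self.docs: List[dict] = []
        self.insert_calls = 0

    def insert_many(self, docs):
        docs = list(docs)
        if not docs:
            # Stands in for pymongo refusing an empty bulk insert.
            raise ValueError("No operations to execute")
        self.insert_calls += 1
        self.docs.extend(docs)


connections_opened = 0
collection = FakeCollection()


def open_collection() -> FakeCollection:
    """Hypothetical factory; real code would create a MongoClient here."""
    global connections_opened
    connections_opened += 1
    return collection


def insert_partition(xs: Iterable[dict]) -> None:
    docs = list(xs)  # materialize the partition's iterator
    if not docs:     # skip empty partitions instead of failing
        return
    open_collection().insert_many(docs)


def foreach_partition(partitions, fn) -> None:
    """Local imitation of RDD.foreachPartition: call fn once per partition."""
    for part in partitions:
        fn(iter(part))


batch = [[{"text": "a"}, {"text": "b"}], [], [{"text": "c"}]]
foreach_partition(batch, insert_partition)
print(connections_opened, collection.insert_calls, len(collection.docs))  # prints: 2 2 3
```

Opening the connection inside the partition function is the design choice that avoids the pickling error: nothing connection-related is captured from the driver.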
Alper t. Turker
  • I got this error: `db.mynewcollection.insert_many(xs) File "C:\Users\1\Miniconda3\lib\site-packages\pymongo\collection.py", line 742, in insert_many blk.execute(self.write_concern.document, session=session) File "C:\Users\1\Miniconda3\lib\site-packages\pymongo\bulk.py", line 414, in execute raise InvalidOperation('No operations to execute') pymongo.errors.InvalidOperation: No operations to execute` – ebt_dev Apr 08 '18 at 19:10
  • You'll also have to parse the input into a format that MongoDB accepts. – Alper t. Turker Apr 08 '18 at 19:19
  • Could you help with that, please? I tried `tweets = kstream.map(lambda x: json.dumps(x[1]).encode("ascii", "ignore"))`, but it doesn't work. – ebt_dev Apr 08 '18 at 20:22
  • `kstream.map(json.loads)` if input contains valid JSON documents. – Alper t. Turker Apr 08 '18 at 22:52
  • When I tried `def to_dict(s): return s.map(json.loads)`, I got `AttributeError: 'tuple' object has no attribute 'map'`. So I tried `json.loads(json.dumps(s))`, but I got `TypeError:b' People spend ...." the rest of the tweet text"... ' is not JSON serializable`. – ebt_dev Apr 09 '18 at 08:39
  • Your data is not JSON-like. How do you expect to insert it into MongoDB? – Alper t. Turker Apr 09 '18 at 10:29
  • I noticed that in `on_data` I was sending only the tweet text. I modified it to send the entire tweet content, `self.producer.send_messages(b'topic', data.encode('utf-8'))`, and it works now. Thank you. – ebt_dev Apr 10 '18 at 14:59
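With the producer sending the full tweet JSON, `to_dict` can be a thin wrapper around `json.loads`. The following is a minimal sketch, assuming each Kafka message value is one tweet JSON document as bytes or str. Wrapping anything that does not parse to a JSON object in a hypothetical `{"text": ...}` document is an illustrative choice, not part of the answer above; dropping such records would be an equally valid design.

```python
import json


def to_dict(s):
    """Convert one message value into a dict that insert_many accepts.

    Assumes the producer sends the full tweet JSON; other payloads are
    wrapped in a hypothetical {"text": ...} document instead of failing.
    """
    raw = s.decode("utf-8") if isinstance(s, bytes) else s
    try:
        doc = json.loads(raw)
    except json.JSONDecodeError:
        doc = None
    return doc if isinstance(doc, dict) else {"text": raw}


print(to_dict(b'{"id": 1, "text": "People spend"}'))  # {'id': 1, 'text': 'People spend'}
print(to_dict(b"People spend"))                        # {'text': 'People spend'}
```

It plugs into the answer's pipeline unchanged: `tweets.map(to_dict).foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))`.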