
I am looking to fetch data from and publish data to Cloudant from a Spark Streaming job. My code is as follows:

from CloudantPublisher import CloudantPublisher
from CloudantFetcher import CloudantFetcher


import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json
import logging

class SampleFramework():

    logger = logging.getLogger(__name__)

    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m):
        return json.loads(m.message)

    @staticmethod
    def processData(rdd):

        if (rdd.isEmpty()):
            SampleFramework.logger.info("RDD is empty")
            return

        # Expand each record with its matching Cloudant document
        expanded_rdd = rdd.mapPartitions(CloudantFetcher.fetch)

        # Publish the expanded records back to Cloudant, one partition at a time
        expanded_rdd.foreachPartition(CloudantPublisher.publish)

    def run(self, ssc):

        self.ssc = ssc

        directKafkaStream = KafkaUtils.createDirectStream(self.ssc, [SUBSCRIBE_QUEUE],
                                                          {"metadata.broker.list": METADATA,
                                                           "bootstrap.servers": BOOTSTRAP_SERVERS},
                                                          messageHandler=SampleFramework.messageHandler)

        directKafkaStream.foreachRDD(SampleFramework.processData)

        ssc.start()
        ssc.awaitTermination()

Other supporting classes -

from CloudantConnector import CloudantConnector
from Config import Config  # assumes Config lives in its own module, like the other helpers

class CloudantFetcher:
    config = Config.createConfig()
    cloudantConnector = CloudantConnector(config)

    @staticmethod
    def fetch(data):
        final_data = []
        for row in data:
            id = row["id"]
            if(not CloudantFetcher.cloudantConnector.isReady()):
                CloudantFetcher.cloudantConnector.open()
            data_json = CloudantFetcher.cloudantConnector.getOne({"id": id})
            row["data"] = data_json
            final_data.append(row)

        CloudantFetcher.cloudantConnector.close()

        return final_data

from CloudantConnector import CloudantConnector
from Config import Config

class CloudantPublisher:

    config = Config.createConfig()
    cloudantConnector = CloudantConnector(config)

    @staticmethod
    def publish(data):

        CloudantPublisher.cloudantConnector.open()

        CloudantPublisher.cloudantConnector.postAll(data)

        CloudantPublisher.cloudantConnector.close()


from cloudant.client import Cloudant
from cloudant.result import Result
from cloudant.result import QueryResult
from cloudant.document import Document
from cloudant.query import Query
from cloudant.database import CloudantDatabase

import json
import logging

class CloudantConnector:

    logger = logging.getLogger(__name__)

    def __init__(self, config):

        self.config = config
        self.client = Cloudant(self.config["cloudant"]["credentials"]["username"],
                               self.config["cloudant"]["credentials"]["password"],
                               url=self.config["cloudant"]["host"]["full"])
        self.initialized = False
        self.db_name = self.config["cloudant"]["host"]["db_name"]

    def open(self):
        try:
            self.client.connect()
            self.logger.info("Connection to Cloudant established.")
            self.initialized = True
        except Exception as e:
            raise Exception("Could not connect to Cloudant! Please verify credentials: %s" % e)

        self.database = CloudantDatabase(self.client, self.db_name)

        if not self.database.exists():
            self.database.create()

    def isReady(self):
        return self.initialized

    def close(self):

        self.client.disconnect()

    def getOne(self, selector):

        query = Query(self.database, selector=selector, limit=1)
        # query.result[0] is the first batch of results; [0] takes its only document
        return query.result[0][0]

    def postAll(self, docs):

        documents = []
        quantum = self.config["cloudant"]["constants"]["bulk_quantum"]
        count = 0
        for doc in docs:
            document = Document(self.database)
            document["id"] = doc["id"]
            document["data"] = doc["data"]
            documents.append(document)
            count += 1
            # Flush a full batch of `quantum` documents in one bulk request
            if count % quantum == 0:
                self.database.bulk_docs(documents)
                documents = []

        # Flush the final, partially filled batch
        if documents:
            self.database.bulk_docs(documents)
        self.logger.debug("Uploaded %d documents to the Cloudant database." % count)

My implementation works, but it is slower than I would expect. The cost seems to come from initializing a Cloudant connection in every partition; I would rather maintain a static pool of connections that each partition can fetch and reuse.

My questions are as follows:

  1. Do I need to create a connection pool with the Cloudant 2.0 API in Python? (It seems that one already exists within the API.) If yes, how should I go about it? The closest implementation I have seen is this link, but it is based on an outdated Cloudant API and does not work with the new one. (A sketch of the pooled client I have in mind is after the snippet below.)

  2. If the answer to the above is yes, how can I make this pool accessible to the workers? I have seen references to creating serializable, lazily instantiated connection-client objects here, which would mean making a lazily instantiated Cloudant connection object in the SampleFramework. How can I do this in Python? Something like the pattern given in the Spark documentation (a Python sketch follows it):

    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    ConnectionPool.returnConnection(connection)
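
For question 1, a minimal sketch of the pooled client, assuming python-cloudant 2.x: the `Cloudant` constructor accepts an `adapter` keyword, so a `requests` `HTTPAdapter` configured with pool settings can be mounted on the client's underlying session. The pool sizes here are hypothetical and would need tuning to the number of concurrent tasks per executor; `USERNAME`, `PASSWORD` and `CLOUDANT_URL` are placeholders for the values in my config.

    from cloudant.client import Cloudant
    from requests.adapters import HTTPAdapter

    # Hypothetical pool sizes: at most 10 distinct hosts cached,
    # up to 100 pooled sockets per host
    pooled_adapter = HTTPAdapter(pool_connections=10, pool_maxsize=100)

    # All requests made through this client share one pooled set of sockets,
    # instead of opening a fresh connection per call
    client = Cloudant(USERNAME, PASSWORD, url=CLOUDANT_URL, adapter=pooled_adapter)
    client.connect()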
    
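For question 2, a minimal sketch of how I imagine the lazy pattern could look in Python, assuming a small helper module (hypothetical name `connection_pool.py`) shipped to the workers, e.g. via `--py-files`. A module-level variable is created on first use inside the worker process, so it is never serialized from the driver, and every partition handled by that executor reuses the same connector:

    # connection_pool.py -- hypothetical helper module
    from CloudantConnector import CloudantConnector
    from Config import Config

    _connector = None  # one per executor process, created lazily

    def get_connector():
        global _connector
        if _connector is None:
            _connector = CloudantConnector(Config.createConfig())
            _connector.open()
        return _connector

The per-partition functions would then call `connection_pool.get_connector()` instead of opening a connection themselves, and deliberately never call `close()` (unlike my current `fetch`/`publish`), so the pooled sockets survive across batches.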

If the above is not possible, how do I speed up my operations? The only alternative I can think of is maintaining a single connection on the driver program, collecting the data from all the workers, and then fetching/uploading it from there. This would decrease the number of times I need to connect to Cloudant, but would take away the distributed fetching/publishing architecture. (A middle ground, batching the lookups per partition, is sketched below.)
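
A minimal sketch of that middle ground, assuming the stream's `id` field is also the Cloudant document `_id` (this assumption is discussed in the comments below): instead of one `getOne` round trip per record, each partition collects its ids and issues a single bulk read via the database's `all_docs` call, staying distributed while cutting the HTTP requests down to one per partition.

    import connection_pool  # the hypothetical helper module sketched above

    class CloudantFetcher:

        @staticmethod
        def fetch(partition):
            rows = list(partition)
            if not rows:
                return []
            connector = connection_pool.get_connector()

            # One bulk request for the whole partition instead of one per record
            ids = [row["id"] for row in rows]
            response = connector.database.all_docs(keys=ids, include_docs=True)

            # Rows for missing keys carry an "error" field and no "doc"/"id"
            docs_by_id = {r["id"]: r.get("doc") for r in response["rows"] if "id" in r}

            for row in rows:
                row["data"] = docs_by_id.get(row["id"])
            return rows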

  • Fyi https://github.com/cloudant/python-cloudant/issues/18. One other option may be to use the [spark-cloudant](https://github.com/cloudant-labs/spark-cloudant) package as that *should* be optimised for reading and writing cloudant documents from spark. – Chris Snow Jan 02 '17 at 14:36
  • I have evaluated the spark-cloudant package, but unfortunately it seems to be more of a database connector meant for static analysis and not the dynamic case (where the data on Cloudant keeps changing and the database needs to be queried multiple times in a real-time system). It fetches data using the sqlContext. If I wish to do this fetch operation on different workers, I would need to pass the context to the workers, which is not possible in Spark. Although in the case of publishing the data to Cloudant on a per-RDD basis I can possibly use it. – Sarthak Ahuja Jan 03 '17 at 07:58
  • Ah, ok. Makes sense. One other thought, in CloudantFetcher.fetch, is `id` a Cloudant document id? If so, could you not iterate first to extract a list of all document ids then make a single call to Cloudant `GET /_all_docs?keys=["somekey","someotherkey"]` to return all of those documents? – Chris Snow Jan 03 '17 at 08:14
  • For the fetch operation, since I fetch data based on an ID (incoming from the stream) and update the incoming message, if I use spark-cloudant, I would again need to perform this fetch in my driver program on a per-RDD basis, as that is where the sqlContext resides. This would still have worked, but there are some further issues, like collecting the entire data on the driver to get the ids to be retrieved, and zipping two RDDs where one is in JSON format and the other in PySpark Rows (conversion of an RDD to JSON again fetches the entire data to the driver). Also, I can't persist the DB as it is dynamic. – Sarthak Ahuja Jan 03 '17 at 09:44
  • "Ah, ok. Makes sense. One other thought, in CloudantFetcher.fetch, is id a Cloudant document id? If so, could you not iterate first to extract a list of all document ids then make a single call to Cloudant GET /_all_docs?keys=["somekey","someotherkey"] to return all of those documents?" This is what I have suggested in the end of my question as the only alternative for now, but wanted to know if the connection pool can be used some how. – Sarthak Ahuja Jan 03 '17 at 09:48
  • UPDATE - https://github.com/cloudant/python-cloudant/issues/18 I see I can now make a connection pool with cloudant API 2.0. (figuring out, how to pass it on to workers). – Sarthak Ahuja Jan 06 '17 at 07:58

0 Answers