2

I am running a Spark job, and at some point I want to connect to an Elasticsearch server to fetch some data and add it to an RDD. The code I am using looks like this:

input.mapPartitions(records => {
  val elcon = new ElasticSearchConnection
  val client: TransportClient = elcon.openConnection()
  val newRecs = records.flatMap(record => {
    val response = client.prepareGet("index", "indexType",
      record.id.toString).execute().actionGet()
    val newRec = processRec(record, response)
    newRec
  }) //end of flatMap
  client.close()
  newRecs
}) //end of mapPartitions

My problem is that the client.close() call runs before the flatMap operation has finished, which of course results in an exception. The code works if I move the creation and closing of the connection inside the flatMap, but that would open a huge number of connections. Is it possible to make sure that client.close() is called only after the flatMap operation has finished?
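For reference, one way I imagine forcing the evaluation before the close (a rough sketch only, not tested; it reuses the ElasticSearchConnection, TransportClient and processRec from the snippet above) would be to materialize the partition before closing the client:

input.mapPartitions(records => {
  val elcon = new ElasticSearchConnection
  val client: TransportClient = elcon.openConnection()
  // Force the requests to run now, while the client is still open
  val newRecs = records.flatMap(record => {
    val response = client.prepareGet("index", "indexType",
      record.id.toString).execute().actionGet()
    processRec(record, response)
  }).toList
  client.close()
  newRecs.iterator
})

This trades laziness for correctness: the whole partition is held in memory, but each partition still opens only one client.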

orestis
  • is your problem resolved? – Pranav Shukla May 09 '16 at 12:08
  • Thanks for your suggestion and help. I have considered the alternative you proposed but I make a call to another service as well, so I am not sure how I will use the framework you suggest. For the time being, I have found a suboptimal workaround, using a while loop instead of a map inside the mapPartitions. Although this is generally slow, the bottleneck in my case is the network calls, so parallelism at this stage is not crucial. – orestis May 10 '16 at 00:28
  • This seems to solve this issue: http://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition – Stanislav Apr 29 '17 at 21:45

1 Answer

0

Making a blocking call for each item in your RDD to fetch the corresponding Elasticsearch document is causing the problem. It is generally advised to avoid blocking calls.

There is an alternative approach that uses the Spark support in Elasticsearch for Hadoop.

Read the ElasticSearch index/type as another RDD and join it with your RDD.

Include the right version of the elasticsearch-hadoop dependency.
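For example, with sbt the dependency would look something like the line below; the artifact name and version are only placeholders, and the right coordinates depend on your Scala, Spark and Elasticsearch versions (check the elasticsearch-hadoop documentation):

// build.sbt -- illustrative coordinates, pick the ones matching your cluster
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.2"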

import org.elasticsearch.spark._
val esRdd = sc.esRDD("index/indexType") // pair RDD of (_id, Map of all field name -> value pairs)
val inputPairRdd = input.map(record => (record.id.toString, record)) // pair RDD of (id, record), since we want to join on the id
inputPairRdd.join(esRdd).map(rec => processResponse(rec._2._1, rec._2._2)) // join on id: each value is (inputRecord, esRecord)
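Note that esRDD reads the connection details from the Spark configuration used to create sc. A minimal sketch, assuming elasticsearch-hadoop's es.nodes/es.port settings and a placeholder host and application name:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("es-join")            // hypothetical application name
  .set("es.nodes", "your-es-host")  // placeholder host; elasticsearch-hadoop setting
  .set("es.port", "9200")
val sc = new SparkContext(conf)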

Hope this helps.

PS: This will still not alleviate the lack of co-location (i.e. each document with a given _id may come from a different shard of the index). A better approach would have been to achieve co-location of the input RDD and the ES index's documents at the time of creating the ES index.

Pranav Shukla