
In my Spark Streaming app, I have many I/O operations, such as Codis, HBase, etc. I want to make sure there is exactly one connection pool in each executor; how can I do this elegantly? Right now I implement several static classes scattered around, which is not good for management. How about centralizing them into one class like xxContext, somewhat like SparkContext? And would I need to broadcast it? I know it's good to broadcast a large read-only dataset, but what about these connection pools? Java or Scala are both acceptable.

wttttt
  • I think this answer can solve your question: [How to perform one operation on each executor once in Spark](https://stackoverflow.com/questions/40015777/how-to-perform-one-operation-on-each-executor-once-in-spark) – shao Sep 20 '19 at 03:24

1 Answer


foreachPartition is the best fit.

Sample code snippet for it:

val dstream = ...

dstream.foreachRDD { rdd =>

  // loop through each partition in the RDD
  rdd.foreachPartition { partitionOfRecords =>

    // 1. Create the connection object/pool for Codis, HBase

    // Use this if you want record-level control within the partition
    partitionOfRecords.foreach { record =>
      // 2. Write each record to the external client
    }

    // 3. Or batch-insert, if the connector supports writing a whole partition to the external source
  }

  // Use 2 or 3 to write data, as per your requirement
}
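
Since each executor is a single JVM, one way to get exactly one connection per executor is a Scala `object` (a per-JVM singleton) holding a `lazy val`. Below is a minimal sketch for HBase, assuming the standard `hbase-client` API is on the classpath; the name `HBaseConnectionPool` is illustrative:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// A Scala object is initialized at most once per JVM, i.e. once per executor.
object HBaseConnectionPool {
  lazy val connection: Connection = {
    // created lazily, by the first task that touches it on this executor
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    // close the connection when the executor JVM shuts down
    sys.addShutdownHook(conn.close())
    conn
  }
}

Referencing HBaseConnectionPool.connection at **point 1** above gives every partition on the same executor the same underlying connection.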

Another SO answer covers a similar use case.

Check this: [Design Patterns for using foreachRDD](https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd)
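
That guide recommends a static, lazily initialized pool that tasks borrow connections from and return to, rather than creating one per record or partition. As a rough, hand-rolled illustration (the JDBC URL is a placeholder; in practice you would use a real pool library or the Codis/HBase client's own pooling):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// One instance per executor JVM; connections are reused across tasks
// instead of being created and torn down for every partition.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection =
    Option(pool.poll()).getOrElse(DriverManager.getConnection("jdbc:...")) // placeholder URL

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}

Inside foreachPartition you would call ConnectionPool.getConnection() once per partition, write the records, and then return the connection to the pool.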

mrsrinivas
  • A DB connection cannot be serialized with its state (it is transient). So it's recommended to create/maintain connections (or pools) at the executor level. Usually one connection per executor does well; on the other side, the database needs to serve **one connection per executor** in parallel. All the connections will be independent, as executors are independent. So I am not sure the idea of centralizing is great. – mrsrinivas Sep 02 '17 at 02:46
  • But I want to maintain a connection pool for each JVM to reduce cost. My confusion is how to centralize them. – wttttt Sep 02 '17 at 03:00
  • Each executor is a JVM process. As mentioned in the code, you can create the DB connection pool at **point 1** in the sample code, once per JVM. – mrsrinivas Sep 02 '17 at 03:05
  • But in Spark Streaming, will this (foreachRDD) be called multiple times? – wttttt Sep 02 '17 at 03:11
  • Yes, foreachRDD is called multiple times (once for each RDD in the DStream). Every RDD has partitions, and those exist on the worker machines (see the sketch below). Check this: [Design Patterns for using foreachRDD](https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd) – mrsrinivas Sep 02 '17 at 03:26
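
To make that last point concrete: foreachRDD runs once per micro-batch, but a JVM-level singleton such as the HBaseConnectionPool sketched above is initialized only by the first task on each executor and then reused by every later batch and partition on that JVM. A minimal sketch, reusing the illustrative names from above:

// the foreachRDD body runs on the driver, once per micro-batch
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // this lazy singleton is created at most once per executor JVM,
    // so later batches on the same executor reuse the same connection
    val conn = HBaseConnectionPool.connection
    partitionOfRecords.foreach { record =>
      // write the record via conn
    }
    // do not close conn here; the shutdown hook owns its lifecycle
  }
}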