In my Spark Streaming app I have many I/O operations, such as writes to Codis, HBase, etc. I want to make sure there is exactly one connection pool per executor; how can I do this elegantly? Right now I implement several static classes scattered across the code, which is hard to manage. Would it be better to centralize them into one class, say an xxContext (somewhat like SparkContext), and do I need to broadcast it? I know broadcasting is good for large read-only datasets, but what about connection pools? Java or Scala are both acceptable.
- I think this answer can solve your question: [How to perform one operation on each executor once in Spark](https://stackoverflow.com/questions/40015777/how-to-perform-one-operation-on-each-executor-once-in-spark) – shao Sep 20 '19 at 03:24
1 Answer
`foreachPartition` is the best fit. Sample code snippet:
    val dstream = ...
    dstream.foreachRDD { rdd =>
      // loop through each partition in the RDD
      rdd.foreachPartition { partitionOfRecords =>
        // 1. Create/get the connection object or pool for Codis, HBase
        partitionOfRecords.foreach { record =>
          // 2. Write each record to the external client
          //    (use this if you need record-level control)
        }
        // 3. Or batch-insert the whole partition, if the connector supports it
      }
      // Use 2 or 3 to write the data, as your requirements dictate
    }
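To address the "exactly one pool per executor" part of the question: a Scala `object` is a lazily initialized, per-JVM singleton, so it gives you one pool per executor without broadcasting anything. Below is a minimal sketch under some assumptions of mine: it talks to Codis through Jedis (Codis proxies speak the Redis protocol) and uses the standard HBase client; the name `ExecutorPools`, the host/port values, and the `record.key`/`record.value` fields are placeholders of mine, not anything from the original post.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
    import redis.clients.jedis.{JedisPool, JedisPoolConfig}

    // A Scala object lives once per JVM: the first task that touches it
    // on an executor creates the pools, every later task reuses them.
    object ExecutorPools {
      // Codis exposes the Redis protocol, so a plain Jedis pool works
      // (replace host/port with your Codis proxy address).
      lazy val codis: JedisPool =
        new JedisPool(new JedisPoolConfig(), "codis-proxy-host", 19000)

      // One HBase Connection per JVM is the recommended usage;
      // it is thread-safe and manages its own internal resources.
      lazy val hbase: Connection =
        ConnectionFactory.createConnection(HBaseConfiguration.create())

      // Clean up when the executor JVM shuts down.
      sys.addShutdownHook {
        codis.close()
        hbase.close()
      }
    }

    // Usage inside the streaming job: only the closure is serialized to
    // the executors; the pools themselves live only on each executor.
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val jedis = ExecutorPools.codis.getResource
        try {
          partitionOfRecords.foreach { record =>
            // hypothetical key/value fields, for illustration only
            jedis.set(record.key, record.value)
          }
        } finally {
          jedis.close() // returns the resource to the pool
        }
      }
    }

Note that `lazy val` initialization is thread-safe in Scala, so even concurrent tasks on the same executor will create each pool only once.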
For a similar use case, also check [Design Patterns for using foreachRDD](https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd) in the Spark Streaming guide.

– mrsrinivas
- A DB connection cannot be serialized along with its state (it is transient). So it's recommended to create and maintain connections (or pools) at the executor level. Usually one connection per executor does well; the flip side is that the database has to serve **as many connections as there are executors** in parallel. All the connections are independent because the executors are independent, so I'm not sure centralizing them is a great idea. – mrsrinivas Sep 02 '17 at 02:46
- But I want to maintain a connection pool per JVM to reduce cost. What confuses me is how to centralize them. – wttttt Sep 02 '17 at 03:00
- Each executor is a JVM process. As mentioned in the code, you can create the DB connection pool at **point 1** in the sample code, once for each JVM. – mrsrinivas Sep 02 '17 at 03:05
- Yes, foreachRDD is called multiple times (if the DStream produces multiple RDDs), and every RDD's partitions exist on the worker machines; the pool is still initialized only once per JVM, as the sketch below illustrates. Check this: [Design Patterns for using foreachRDD](https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd) – mrsrinivas Sep 02 '17 at 03:26
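To make the "once per JVM" point from the comments concrete, here is a small sketch (the name `LoggingPool` and the host/port are mine) that logs when the pool is built: even though `foreachRDD` fires on every batch and `foreachPartition` on every partition, the log line appears only once per executor JVM.

    import redis.clients.jedis.{JedisPool, JedisPoolConfig}

    object LoggingPool {
      lazy val instance: JedisPool = {
        // Printed once per executor JVM, no matter how many
        // batches and partitions are processed there.
        val jvm = java.lang.management.ManagementFactory.getRuntimeMXBean.getName
        println(s"Creating Jedis pool in executor JVM $jvm")
        new JedisPool(new JedisPoolConfig(), "codis-proxy-host", 19000)
      }
    }

    dstream.foreachRDD { rdd =>          // runs once per batch interval
      rdd.foreachPartition { records =>  // runs once per partition
        val jedis = LoggingPool.instance.getResource
        try records.foreach { _ => /* write via jedis here */ }
        finally jedis.close()            // returns the resource to the pool
      }
    }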