
I'm trying to figure out the best approach to call a REST endpoint from Spark.

My current approach (solution [1]) looks something like this -

val df = ... // some dataframe

val repartitionedDf = df.repartition(numberPartitions)

lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)

val enrichedDf = repartitionedDf
  .map(rec => restEndPoint.getResponse(rec)) // calls the REST endpoint for every record
  .toDF

I know I could have used .mapPartitions() instead of .map(), but looking at the DAG, it seems Spark optimizes the repartition -> map into a mapPartitions anyway.

In this second approach (solution [2]), a connection is created once for every partition and reused for all records within the partition.

val newDs = myDs.mapPartitions(partition => {
  val restEndPoint = new restEndPointCaller() // creates a REST connection per partition

  val newPartition = partition.map(record => {
    restEndPoint.getResponse(record)
  }).toList // consumes the iterator, so all calls happen before the connection is closed

  restEndPoint.close() // close the connection here
  newPartition.iterator // return an iterator over the materialized results
})

In this third approach (solution [3]), a connection is created once per JVM (executor) and reused across all partitions processed by that executor.

lazy val connection = new DbConnection // intended: one db connection per JVM (executor)
val newDs = myDs.mapPartitions(partition => {
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB

  newPartition.iterator // create a new iterator
})
connection.close() // close the db connection here

[a] With solutions [1] and [3], which are very similar, is my understanding of how lazy vals work correct? The intention is to restrict the number of connections to 1 per executor/JVM and reuse the open connections for processing subsequent requests. Will I be creating 1 connection per JVM or 1 connection per partition?

[b] Are there any other ways by which I can control the number of requests (RPS) we make to the REST endpoint?

[c] Please let me know if there are better and more efficient ways to do this.

Thanks!


1 Answer


IMO the second solution with mapPartitions is better. First, it explicitly tells what you're expecting to achieve: the name of the transformation and the implemented logic make it pretty clear. With the first option you need to be aware of how Apache Spark optimizes the processing. And while it may be obvious to you right now, you should also think about the people who will work on your code, or simply about yourself in 6 months, 1 year, 2 years and so forth. They will understand mapPartitions more easily than repartition + map.

Moreover, the internal optimization of repartition + map may change one day (I don't expect it to, but you can still consider it a valid point), and at that moment your job would perform worse.

Finally, with the 2nd solution you avoid a lot of the problems you can encounter with serialization. In the code you wrote, the driver will create one instance of the endpoint object, serialize it and send it to the executors. So yes, maybe it'll be a single instance, but only if it's serializable.


[edit] Thanks for the clarification. You can achieve what you're looking for in different manners. To have exactly 1 connection per JVM you can use a design pattern called singleton. In Scala it's expressed pretty easily as an object (the first link I found on Google: https://alvinalexander.com/scala/how-to-implement-singleton-pattern-in-scala-with-object).

And that's pretty good because you don't need to serialize anything. Singletons are read directly from the classpath on the executor side, so you're sure to have exactly one instance of a given object per JVM.
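As a minimal sketch of that idea (assuming the restEndPointCaller class from the question; the holder and value names are illustrative):

    // A singleton holder: the lazy val is initialized at most once per JVM
    // (executor), the first time it's referenced on the executor side.
    // Nothing is serialized from the driver.
    object RestClientHolder {
      lazy val restEndPoint = new restEndPointCaller()
    }

    val enrichedDs = myDs.mapPartitions(partition => {
      // All partitions processed by this JVM share the same instance.
      partition.map(record => RestClientHolder.restEndPoint.getResponse(record))
    })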

[a] With solutions [1] and [3], which are very similar, is my understanding of how lazy vals work correct? The intention is to restrict the number of connections to 1 per executor/JVM and reuse the open connections for processing subsequent requests. Will I be creating 1 connection per JVM or 1 connection per partition?

It'll create 1 connection per partition. You can execute this small test to see that:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.FlatSpec

    class SerializationProblemsTest extends FlatSpec {
      val conf = new SparkConf().setAppName("Spark serialization problems test").setMaster("local")
      val sparkContext = SparkContext.getOrCreate(conf)

      "lazy object" should "be created once per partition" in {
        lazy val restEndpoint = new NotSerializableRest()
        sparkContext.parallelize(0 to 120).repartition(12)
          .mapPartitions(numbers => {
            //val restEndpoint = new NotSerializableRest()
            numbers.map(nr => restEndpoint.enrich(nr))
          })
          .collect()
      }
    }

    class NotSerializableRest() {
      println("Creating REST instance")
      def enrich(id: Int): String = s"${id}"
    }

It should print "Creating REST instance" 12 times (# of partitions).

[b] Are there ways by which I can control the number of requests (RPS) we make to the REST endpoint?

To control the number of requests you can use an approach similar to database connection pools: an HTTP connection pool (one quickly found link: HTTP connection pooling using HttpClient).
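As a sketch of that idea with Apache HttpClient 4.x (the pool sizes are illustrative; combined with a singleton holder like the one above, one pooled client per executor caps the number of concurrent requests):

    import org.apache.http.client.methods.HttpGet
    import org.apache.http.impl.client.HttpClients
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
    import org.apache.http.util.EntityUtils

    object PooledHttpClient {
      lazy val client = {
        val cm = new PoolingHttpClientConnectionManager()
        cm.setMaxTotal(10)           // at most 10 concurrent connections per JVM
        cm.setDefaultMaxPerRoute(10) // all requests target the same endpoint here
        HttpClients.custom().setConnectionManager(cm).build()
      }

      def get(url: String): String = {
        val response = client.execute(new HttpGet(url))
        try EntityUtils.toString(response.getEntity) finally response.close()
      }
    }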

But maybe another valid approach would be processing smaller subsets of the data? So instead of taking 30000 rows to process at once, you can split them into smaller micro-batches (if it's a streaming job). It should give your web service a little bit more "rest".
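For instance, with Structured Streaming's Kafka source you can cap the number of records per micro-batch (a sketch; the topic name and limit are illustrative, and spark is assumed to be an existing SparkSession):

    // Each micro-batch will contain at most 1000 records, so the number of
    // REST requests per trigger interval stays bounded.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "records")
      .option("maxOffsetsPerTrigger", "1000")
      .load()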

Otherwise you can also try to send bulk requests (Elasticsearch does this to index/delete multiple documents at once: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). But it's up to the web service to decide whether to allow you to do so.
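If the service accepts them, batching can be sketched inside mapPartitions with Iterator.grouped (getBulkResponse is a hypothetical bulk variant of the question's getResponse):

    val newDs = myDs.mapPartitions(partition => {
      val restEndPoint = new restEndPointCaller()
      // grouped(n) yields the partition's records in chunks of up to n,
      // so each chunk becomes one bulk call instead of n individual calls.
      val responses = partition.grouped(100).flatMap(batch =>
        restEndPoint.getBulkResponse(batch) // hypothetical: returns one response per record
      ).toList
      restEndPoint.close()
      responses.iterator
    })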

Bartosz Konieczny
  • thanks @bartosz25. I've edited the question with additional clarification and questions. Agreed, that solution [2] looks cleaner and more readable, and in all of these cases the driver needs to serialize the endpoint object. – Yash Jul 18 '18 at 16:36
  • Hi @Yash, could you share how did you solve the problem with a little feedback of pros/cons of solutions you've tested ? – Bartosz Konieczny Jul 24 '18 at 13:17
  • Hi, so sorry for the very late response here. I went with approach [3] and it seems to work well for a number of reasons. 1. Persistent open connections - better cpu time usage on servers, request pipelining, network tcp congestion, latency of handshakes. 2. Beyond that, how the data is partitioned doesn't matter unless one can make requests in batches where batch size can be controlled with partitioning sizes. – Yash Mar 20 '21 at 21:29
  • No worries, Yash. Thank you for sharing your feedback! – Bartosz Konieczny Mar 21 '21 at 10:33
  • would a python version of this code be available? – lightbox142 Mar 31 '21 at 01:35
  • Hi @lightbox142 mapPartitions exists in PySpark too. You can find an example here: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html – Bartosz Konieczny Apr 03 '21 at 05:27