51

I would like to know whether foreachPartition results in better performance than foreach, due to a higher level of parallelism, for the case where I'm iterating through an RDD in order to perform some sums into an accumulator variable.

thebluephantom
Beniamino Del Pizzo

5 Answers

41

foreach and foreachPartition are actions.

foreach(function): Unit

A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function. This is generally used for manipulating accumulators or writing to external stores.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(function): Unit

Similar to foreach(), but instead of invoking the function for each element, it calls it once for each partition. The function should accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).
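The difference in call shape can be sketched with plain Scala collections standing in for RDD partitions (no Spark needed); `grouped(partitionSize)` here is just a stand-in for the RDD's partitioning:

```scala
object CallShape {
  // Count how many times the user function is invoked in each style.
  // Plain Scala collections stand in for an RDD; `grouped(partitionSize)`
  // plays the role of the RDD's partitions.
  def countCalls(data: Seq[Int], partitionSize: Int): (Int, Int) = {
    var perElement = 0
    data.foreach(_ => perElement += 1)            // foreach: once per element

    var perPartition = 0
    data.grouped(partitionSize).foreach { part => // foreachPartition: once per partition
      perPartition += 1
      part.foreach(_ => ())                       // per-element work stays inside
    }
    (perElement, perPartition)
  }
}
```

For Seq(1, 2, 3, 4) split into two "partitions", the element-wise style makes 4 function calls while the partition-wise style makes 2, which is exactly the saving the paragraph above describes.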

Usage of foreachPartition examples:


  • Example 1: if you want one database connection per partition (opened inside the foreachPartition block), here is an example of how it can be done using Scala.
import java.sql.{Connection, DriverManager}

  /**
    * Insert into a database using foreachPartition.
    *
    * @param sqlDatabaseConnectionString JDBC connection string
    * @param sqlTableName                target table name
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    // numPartitions = number of simultaneous DB connections you plan to allow
    val repartitioned = dataFrame.repartition(numPartitions)

    val tableHeader: String = repartitioned.columns.mkString(",")
    repartitioned.foreachPartition { partition =>
      // Note: one connection per partition (an even better way is to use a connection pool)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      // A batch size of 1000 is used since some databases (e.g. Azure SQL) can't handle larger batches
      partition.grouped(1000).foreach { group =>
        val insertString = new scala.collection.mutable.StringBuilder()
        group.foreach { record =>
          insertString.append("('" + record.mkString(",") + "'),")
        }

        sqlExecutorConnection.createStatement()
          .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
            + insertString.stripSuffix(","))
      }

      sqlExecutorConnection.close() // close the connection so that connections don't get exhausted
    }
  }
  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Created only once per partition: you can safely share a thread-safe
    // Kafka producer instance within the partition.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

Note: if you want to avoid creating the producer once per partition, a better way is to broadcast the producer using sparkContext.broadcast, since the Kafka producer is asynchronous and buffers data heavily before sending.
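The note above can be sketched like this (hedged: `Client` is a hypothetical stand-in for a KafkaProducer, and the broadcast call itself is omitted). The trick is to broadcast a serializable wrapper holding a factory function; the actual client is built lazily, at most once per JVM:

```scala
object LazySinkDemo {
  // Hypothetical stand-in for an expensive client such as a KafkaProducer.
  final class Client {
    def send(msg: String): Unit = ()
  }

  // Serializable wrapper around a factory function. Because the client is
  // @transient and lazy, it is not shipped with the closure; each executor
  // builds it at most once, on first use, and every partition running on
  // that executor reuses the same instance.
  class LazySink(create: () => Client) extends Serializable {
    @transient private lazy val client: Client = create()
    def send(msg: String): Unit = client.send(msg)
  }
}
```

Calling `send` any number of times triggers the factory only once, which is the behavior you would want per executor JVM after broadcasting the sink.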


Here is an accumulator sample snippet to play around with, through which you can test the performance:

  test("Foreach - Spark") {
    import spark.implicits._
    val accum = sc.longAccumulator
    sc.parallelize(Seq(1, 2, 3)).foreach(x => accum.add(x))
    assert(accum.value == 6L)
  }

  test("Foreach partition - Spark") {
    import spark.implicits._
    val accum = sc.longAccumulator
    sc.parallelize(Seq(1, 2, 3)).foreachPartition(x => x.foreach(accum.add(_)))
    assert(accum.value == 6L)
  }

Conclusion:

foreachPartition operates on whole partitions, so it clearly has an edge over foreach whenever there is per-partition setup work to amortize.

Rule of Thumb:

foreachPartition should be used when you are accessing costly resources such as database connections or Kafka producers, which you would then initialize once per partition rather than once per element (as foreach does). As for accumulators, you can measure the performance with the test methods above; foreachPartition should be faster in that case as well.

Also see map vs. mapPartitions, which is a similar concept, although those are transformations.

Ram Ghadiyaram
  • An awesome explanation! Can you please add scenarios where foreachPartition will be slower than foreach (in the case of, let's say, accumulators), as in that scenario foreachPartition calls foreach internally? – Vikram Singh Chandel Mar 28 '18 at 05:45
  • @RamGhadiyaram do we have similar functionality available in Java? When I try to use grouped() on each partition it does not show any such method available. I am using Spark 2.1.0. – wandermonk Apr 23 '18 at 17:26
  • AFAIK in Scala it's available, so it's not available in Java; you can do a normal batch kind of operation. I mean you can do a similar kind [of](https://stackoverflow.com/a/36977547/647053) – Ram Ghadiyaram Apr 23 '18 at 18:59
  • @RamGhadiyaram, I have 30 partitions and 30 cores and need to copy 15 GB of data to Cassandra. While running the Spark job, only one processor is taking all the load; the other executors are not able to participate in the processing. By the way, I am saving using the Parquet file format in HDFS. Can you help me? – BdEngineer Dec 05 '18 at 07:01
  • Print the partition length. If it's 1 (since one partition is taking the load), then try to re-partition and then do foreachPartition. – Ram Ghadiyaram Dec 05 '18 at 13:44
  • @Ram Ghadiyaram `val company_model_vals_df = enriched_company_model_vals_df.repartition(col("model_id"), col("fiscal_quarter"), col("fiscal_year")); writeAsParquet(company_model_vals_df)` How to write this using foreachPartition? – BdEngineer Jan 16 '19 at 10:10
25

foreach automatically runs the loop on many nodes.

However, sometimes you want to do some operation once per node. For example, making a connection to a database. You cannot just make a connection on the driver and pass it into the foreach function: the connection would only exist on one node.

So with foreachPartition, you can make a connection to the database on each node before running the loop.
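As a plain-Scala sketch of this pattern (the `Conn` stub is hypothetical, and a nested Seq stands in for the RDD's partitions), the point is that the connection is opened inside the partition function, i.e. on the worker rather than on the driver:

```scala
object PerPartition {
  // Hypothetical stub for a database connection.
  final class Conn {
    def write(x: Int): Unit = ()
    def close(): Unit = ()
  }

  // Writes every element, opening one connection per partition; the nested
  // Seq stands in for rdd.foreachPartition. Returns how many connections
  // were opened, so the pattern is easy to verify.
  def writeAll(partitions: Seq[Seq[Int]], connect: () => Conn): Int = {
    var opened = 0
    partitions.foreach { partition =>
      val conn = connect()          // opened inside the partition function
      opened += 1
      partition.foreach(conn.write) // reuse one connection for every element
      conn.close()                  // release before the task finishes
    }
    opened
  }
}
```

With two partitions, only two connections are opened, no matter how many elements each partition holds.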

Bin Wang
    this is still not per node, it is per partition. there can be many more partitions than nodes. If you need a connection per node (more likely per JVM or container in YARN terms), you need some other solution. – user2456600 May 05 '17 at 22:14
  • @user2456600. Do you have any idea how to have a single class per JVM per executor? – donald Nov 08 '17 at 07:05
  • If using Scala, one option is to use a lazy val in an object or class, that would be initialized in the JVM the first time it is referenced. But that also has downsides, if you use multiple threads per executor you have to be careful about the object it points to being thread safe. Also it is difficult to pass runtime initialization params like configuration to the initialization. – user2456600 Apr 03 '18 at 18:04
19

There is really not that much of a difference between foreach and foreachPartitions. Under the covers, all that foreach is doing is calling the iterator's foreach using the provided function. foreachPartition just gives you the opportunity to do something outside of the looping of the iterator, usually something expensive like spinning up a database connection or something along those lines. So, if you don't have anything that could be done once for each node's iterator and reused throughout, then I would suggest using foreach for improved clarity and reduced complexity.
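That equivalence can be sketched in a few lines of plain Scala: applying a function via the partitions' iterators touches exactly the same elements as applying it directly, which is why the per-element cost of the two actions is the same.

```scala
object ForeachShapes {
  // foreach expressed via foreachPartition: the per-element work is just the
  // iterator's own foreach, so both styles visit the same elements in order.
  def foreachViaPartitions[A](partitions: Seq[Iterator[A]])(f: A => Unit): Unit =
    partitions.foreach(part => part.foreach(f))
}
```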

Justin Pihony
4

foreachPartition does not mean per-node activity; rather, it is executed for each partition, and it is possible that you have a large number of partitions compared to the number of nodes, in which case your performance may be degraded. If you intend to do an activity at the node level, the solution explained here may be useful, although it has not been tested by me.

Ram Ghadiyaram
deenbandhu
4

foreachPartition is only helpful when you're iterating over data that you are aggregating by partition.

A good example is processing clickstreams per user. You'd want to clear your calculation cache every time you finish a user's stream of events, but keep it between records of the same user in order to calculate some user behavior insights.
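A rough sketch of that idea (the `Click` record and the per-user insight, here just a click count, are illustrative assumptions; records within the partition are assumed to be grouped by user): the cache is flushed at every user boundary and once more at the end.

```scala
object Sessions {
  // Illustrative record type; assumes a partition's records are grouped by user.
  case class Click(user: String, page: String)

  // Walks one partition, keeping a per-user cache that is flushed (here:
  // reduced to a click count) whenever the user changes, and once at the end.
  def clicksPerUser(partition: Iterator[Click]): Map[String, Int] = {
    val out = scala.collection.mutable.Map.empty[String, Int]
    var cache = List.empty[Click]
    var current: Option[String] = None

    def flush(): Unit = current.foreach(user => out(user) = cache.size)

    partition.foreach { click =>
      if (!current.contains(click.user)) { // user boundary: flush and reset
        flush()
        cache = Nil
        current = Some(click.user)
      }
      cache ::= click
    }
    flush()                                // flush the final user's cache
    out.toMap
  }
}
```

In a real job this would be the body passed to foreachPartition (or mapPartitions, if you want the insights back), with the RDD partitioned and sorted by user beforehand.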

Oren