
Let me first inform all of you that I am very new to Spark.

I need to process a huge number of records from a table; when grouped by email, there are around 1 million groups. For each email I need to perform multiple logical calculations on its data set and then update the database based on the results.

Roughly, my code structure is like this:

//Initial Data Load ...
import sparkSession.implicits._
var tableData = sparkSession.read.jdbc(<JDBC_URL>, <TABLE NAME>, connectionProperties).select("email").where(<CUSTOM CONDITION>)

//Data Frame with Records with grouping on email count greater than one
var recordsGroupedBy = tableData.groupBy("email").count().withColumnRenamed("count", "recordcount").filter("recordcount > 1")

//Now comes the processing after grouping against email using processDataAgainstEmail() method 

recordsGroupedBy.collect().foreach(x => processDataAgainstEmail(x.getAs[String]("email"), sparkSession))

Here I see that foreach is not executed in parallel, and I need to invoke the method processDataAgainstEmail(,) in parallel. To try to parallelize it, I can first get a list of the emails by invoking

val emailList = recordsGroupedBy.select("email").rdd.map(r => r.getString(0)).collect().toList

var rdd = sc.parallelize(emailList)

rdd.foreach(email => processDataAgainstEmail(email, sparkSession))

This is not supported: I cannot pass sparkSession into the function given to foreach after parallelize, because the closure is serialized and executed on the executors, where no SparkSession is available.

Can anybody help me with this? Inside processDataAgainstEmail(,), multiple database inserts and updates would be performed, and Spark DataFrame and Spark SQL operations also need to be performed.

To summarize, I need to invoke processDataAgainstEmail(,) in parallel, with access to the SparkSession.

If it is not possible to pass the Spark session at all, the method won't be able to perform anything against the database. I am not sure what the alternative would be, as parallelism per email is a must for my scenario.

Lamanus
Soumen
  • You probably need `foreachPartition`; check this [post](https://stackoverflow.com/questions/30484701/apache-spark-foreach-vs-foreachpartitions-when-to-use-what). Within foreachPartition you can call `processDataAgainstEmail` and save data to the database, although you can't use SparkSession there. SparkSession is only available in driver-side code, so you need to organise your code in a different way. – abiratsis Feb 23 '20 at 23:38
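A minimal sketch of that suggestion, keeping the question's placeholder JDBC URL and leaving the per-email body as a comment. The names `duplicateEmails` and `processPartition` are introduced here purely for illustration, not taken from the question's code:

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Dataset}

// Driver side: SparkSession / DataFrame operations are fine here.
// Extracts the emails that occur more than once.
def duplicateEmails(table: DataFrame): Dataset[String] = {
  val spark = table.sparkSession
  import spark.implicits._
  table.groupBy("email").count().filter($"count" > 1).select("email").as[String]
}

// Executor side: called once per partition, in parallel. No SparkSession
// is available here, so database work goes through a plain JDBC connection.
def processPartition(emails: Iterator[String], jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try emails.foreach { email =>
    // port the processDataAgainstEmail logic to plain SQL against `conn`
  } finally conn.close()
}

// Wiring it together (placeholder URL, as in the question):
// duplicateEmails(tableData).foreachPartition((p: Iterator[String]) => processPartition(p, "<JDBC_URL>"))
```

The design point is the split: everything that needs the SparkSession runs on the driver to produce a Dataset, and everything inside foreachPartition uses only serializable values plus a connection opened per partition.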

1 Answer


foreach is a method on the list that operates on each element sequentially, so you are acting on one element at a time and passing each one to the processDataAgainstEmail method.

Once you have gotten the resulting list, you invoke sc.parallelize on it to parallelize the creation of the distributed data set from the records you created/manipulated in the previous step. Parallelization, as in PySpark, is a property of how the data set is created and distributed, not of acting on the result of a collect().
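To make the distinction concrete, here is a small sketch (assuming a local SparkSession; the println bodies are hypothetical stand-ins for the real per-email work):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val emails: Dataset[String] = Seq("a@x.com", "b@x.com", "c@x.com").toDS()

// Driver side, sequential: collect() first pulls every row to the driver,
// then the plain Scala foreach loops over them one at a time.
emails.collect().foreach(email => println(s"driver: $email"))

// Executor side, parallel: this foreach runs inside Spark tasks, one task
// per partition; the closure must not reference the SparkSession.
emails.foreach((email: String) => println(s"task: $email"))
```

The first loop is ordinary single-threaded Scala; only the second one is executed by Spark itself, which is why the SparkSession restriction applies there.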

Senthil Kumaran
  • Actually I want to work in parallel on the result, that is the email list, using **processDataAgainstEmail()**, which would take the email and the Spark session as its two parameters. Is that possible? Or is there an alternate way? – Soumen Feb 23 '17 at 07:44