
I am using Spark SQL 2.4.1 with Java 8.

I have data like below:

val df_data = Seq(
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979),
  ("Indus_2","Indus_2_Name","Country1", "State2",21789933),
  ("Indus_3","Indus_3_Name","Country1", "State3",21789978),
  ("Indus_4","Indus_4_Name","Country2", "State1",41789978),
  ("Indus_5","Indus_5_Name","Country3", "State3",27789978),
  ("Indus_6","Indus_6_Name","Country1", "State1",27899790),
  ("Indus_7","Indus_7_Name","Country3", "State1",27899790),
  ("Indus_8","Indus_8_Name","Country1", "State2",27899790),
  ("Indus_9","Indus_9_Name","Country4", "State1",27899790)
  ).toDF("industry_id","industry_name","country","state","revenue");

Given the below input lists:

val countryList = Seq("Country1","Country2");
val stateMap = Map("Country1" -> Seq("State1","State2"), "Country2" -> Seq("State2","State3"));

In my Spark job, for each country and each of its states I need to calculate the total revenue of a few industries.

In other languages we would do this with a for loop, i.e.:

for (country <- countryList) {
  for (state <- stateMap(country)) {
    // do some calculation for each state's industries
  }
}

In Spark, from what I understood, we should not do it like this, i.e. the executors would not be utilized by doing it this way. So what is the correct way to handle this?

BdEngineer
  • I don't know what you're doing here, but obviously using filter and grouping would do what you want. You should read again about tables and SQL in Spark - it looks like you're not close to understanding the concept of DataFrames – UninformedUser Apr 20 '20 at 13:24
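(For reference, a minimal sketch of the filter-and-grouping approach the comment suggests, assuming spark-shell with spark.implicits._ in scope; the per-country state selection is omitted for brevity and could be added by joining against a (country, state) DataFrame:)

import org.apache.spark.sql.functions.{col, sum}

// No driver-side loop needed: restrict to the requested countries,
// then let Spark group and aggregate across all executors at once.
df_data
  .filter(col("country").isin(countryList: _*))
  .groupBy("country", "state", "industry_name")
  .agg(sum("revenue").as("total_revenue"))
  .show(false)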

3 Answers


It really depends on what you want to do. If you don't need to share state between the country/state computations, then you should create a DataFrame in which each row is a (country, state) pair; you can then control how many rows are processed in parallel (via the number of partitions and the number of cores).
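A minimal sketch of that idea, assuming the pairs are built from the question's countryList and stateMap and that spark.implicits._ is in scope (spark-shell style); the column names and the partition count are illustrative:

import org.apache.spark.sql.functions.sum

// One row per requested (country, state) pair.
val pairs = countryList
  .flatMap(country => stateMap.getOrElse(country, Seq.empty).map(state => (country, state)))
  .toDF("country", "state")

// repartition controls how many pairs are processed in parallel;
// the join keeps only the requested pairs before aggregating.
pairs
  .repartition(4)
  .join(df_data, Seq("country", "state"))
  .groupBy("country", "state")
  .agg(sum("revenue").as("total_revenue"))
  .show(false)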

ShemTov
  • yes, now you can run df_data.foreach() and it'll run in parallel. Please read the Spark documentation, it'll give you a good base of knowledge. – ShemTov Apr 16 '20 at 10:16

You can use flatMapValues to create key-value pairs and then perform your calculations in a .map step.

scala> val data = Seq(("country1",Seq("state1","state2","state3")),("country2",Seq("state1","state2")))
scala> val rdd = sc.parallelize(data)
scala> val rdd2 = rdd.flatMapValues(s=>s)

scala> rdd2.foreach(println(_))
(country1,state1)
(country2,state1)
(country1,state2)
(country2,state2)
(country1,state3)

Here you can perform operations; I've appended # to each state:

scala> rdd2.map(s=>(s._1,s._2+"#")).foreach(println(_))
(country1,state1#)
(country1,state2#)
(country1,state3#)
(country2,state1#)
(country2,state2#)
chlebek
  • Yes I forgot to add `collect()`, but it doesn't matter in local mode. You can check more details about foreach here: https://stackoverflow.com/a/28804763/9687910 – chlebek Apr 15 '20 at 13:32
  • then you need to call `rdd.collect().foreach(println(_))` to see the output; otherwise, if you call `rdd.foreach(println(_))`, the output will be visible in the worker stdout logs – chlebek Apr 15 '20 at 13:55
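(A quick illustration of the difference described in these comments; a sketch only, assuming the same rdd2 as in the answer above:)

// Runs on the executors: the output goes to each worker's stdout log,
// not to the driver console (except in local mode).
rdd2.foreach(println(_))

// collect() brings all elements back to the driver first, then prints there.
// Only safe when the collected data fits in driver memory.
rdd2.collect().foreach(println(_))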

I have added a few extra rows to your sample data to make the aggregation easier to distinguish. I have used a Scala parallel collection: for each country it looks up the states, uses those values to filter the given DataFrame, performs the aggregation, and at the end unions all the results back together.

scala> val df = Seq(
     |   ("Indus_1","Indus_1_Name","Country1", "State1",12789979),
     |   ("Indus_2","Indus_2_Name","Country1", "State2",21789933),
     |   ("Indus_2","Indus_2_Name","Country1", "State2",31789933),
     |   ("Indus_3","Indus_3_Name","Country1", "State3",21789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State1",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State2",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State2",81789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State3",41789978),
     |   ("Indus_4","Indus_4_Name","Country2", "State3",51789978),
     |   ("Indus_5","Indus_5_Name","Country3", "State3",27789978),
     |   ("Indus_6","Indus_6_Name","Country1", "State1",27899790),
     |   ("Indus_7","Indus_7_Name","Country3", "State1",27899790),
     |   ("Indus_8","Indus_8_Name","Country1", "State2",27899790),
     |   ("Indus_9","Indus_9_Name","Country4", "State1",27899790)
     |   ).toDF("industry_id","industry_name","country","state","revenue")
df: org.apache.spark.sql.DataFrame = [industry_id: string, industry_name: string ... 3 more fields]

scala> val countryList = Seq("Country1","Country2","Country4","Country5");
countryList: Seq[String] = List(Country1, Country2, Country4, Country5)

scala> val stateMap = Map("Country1" -> ("State1","State2"), "Country2" -> ("State2","State3"),"Country3" -> ("State31","State32"));
stateMap: scala.collection.immutable.Map[String,(String, String)] = Map(Country1 -> (State1,State2), Country2 -> (State2,State3), Country3 -> (State31,State32))

scala>

scala> :paste
// Entering paste mode (ctrl-D to finish)

countryList
.par
.filter(cn => stateMap.exists(_._1 == cn))
.map(country => (country,stateMap(country)))
.map{data =>
    df.filter($"country" === data._1 && ($"state" === data._2._1 || $"state" === data._2._2)).groupBy("country","state","industry_name").agg(sum("revenue").as("total_revenue"))
}.reduce(_ union _).show(false)


// Exiting paste mode, now interpreting.

+--------+------+-------------+-------------+
|country |state |industry_name|total_revenue|
+--------+------+-------------+-------------+
|Country1|State2|Indus_8_Name |27899790     |
|Country1|State1|Indus_6_Name |27899790     |
|Country1|State2|Indus_2_Name |53579866     |
|Country1|State1|Indus_1_Name |12789979     |
|Country2|State3|Indus_4_Name |93579956     |
|Country2|State2|Indus_4_Name |123579956    |
+--------+------+-------------+-------------+


scala>

Edit 1: Separated the aggregation code into its own function.

scala> def processDF(data:(String,(String,String)),adf:DataFrame) = adf.filter($"country" === data._1 && ($"state" === data._2._1 || $"state" === data._2._2)).groupBy("country","state","industry_name").agg(sum("revenue").as("total_revenue"))
processDF: (data: (String, (String, String)), adf: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

scala> :paste
// Entering paste mode (ctrl-D to finish)

countryList.
par
.filter(cn => stateMap.exists(_._1 == cn))
.map(country => (country,stateMap(country)))
.map(data => processDF(data,df))
.reduce(_ union _)
.show(false)


// Exiting paste mode, now interpreting.

+--------+------+-------------+-------------+
|country |state |industry_name|total_revenue|
+--------+------+-------------+-------------+
|Country1|State2|Indus_8_Name |27899790     |
|Country1|State1|Indus_6_Name |27899790     |
|Country1|State2|Indus_2_Name |53579866     |
|Country1|State1|Indus_1_Name |12789979     |
|Country2|State3|Indus_4_Name |93579956     |
|Country2|State2|Indus_4_Name |123579956    |
+--------+------+-------------+-------------+


scala>
Srinivas
  • You can try something like this: `.reduce((dfa, dfb) -> dfa.union(dfb))`. Can you please accept or upvote this answer. – Srinivas Apr 21 '20 at 02:02
  • `par` will split the data for parallel processing and then use the Spark session to perform the operations. This will run in parallel on the cluster, not on the driver, and will use most of your executors. – Srinivas Apr 21 '20 at 04:19
  • I have updated the above answer, please check and let me know if it's not working. – Srinivas Apr 22 '20 at 09:55
  • We need a normal function, and I am passing df from the map to the function – Srinivas Apr 22 '20 at 13:05
  • df (DataFrame) is immutable and is created above with some default values; now we need to apply some transformations in parallel, so I am passing df from the map to the processDF function. – Srinivas Apr 22 '20 at 14:31
  • Check this, you will get some idea: https://stackoverflow.com/questions/61388397/pyspark-dataframe-performance-tuning/61389521#61389521 – Srinivas Apr 23 '20 at 14:23