
Dataframe 1:

12345,B,C,2020-08-12,Internet
12345,B,D,2002-11-12,Mobile
12345,B,e,2003-10-12,Lap

Dataframe 2:

12345

I have to join Dataframe 1 and Dataframe 2 and produce one output row for each record in DF2. My output should be like below,

Output:

12345,Y,Y,2002-11-12,Mobile

Conditions for the columns:

Col 1 - Distinct value

Col 2 - If ALL values in col2 of DF1 == 'B', then populate 'Y' in output else 'N'

Col 3 - If ANY value in col3 of DF1 == 'C', then populate 'Y' in output else 'N'

Col 4 - Take the min date from col4 of DF1

Col 5 - Populate the value from col5 of DF1 that corresponds to the min date

How to achieve this?

1 Answer


You can achieve this with groupBy followed by the agg API.

import org.apache.spark.sql.functions._
import spark.implicits._

val dataframe2 = dataframe1.groupBy("_c0")
  .agg(
    // 'Y' only when every value of _c1 is "B" (exactly one distinct value, and it is "B")
    when(size(array_distinct(collect_list('_c1))) === lit(1) &&
      array_contains(array_distinct(collect_list('_c1)), "B"), lit("Y"))
      .otherwise(lit("N")).alias("allB"),
    // 'Y' when any value of _c2 is "C"
    when(array_contains(collect_list('_c2), "C"), lit("Y")).otherwise(lit("N")).alias("anyC"),
    min('_c3).alias("date"))
 

Then join dataframe2 back to dataframe1 on the key and the min date, e.g. dataframe2.join(dataframe1, dataframe2("_c0") === dataframe1("_c0") && dataframe2("date") === dataframe1("_c3"), "inner"), select all columns from dataframe2 (select(dataframe2("*"))) plus only dataframe1("_c4") from dataframe1, and you will get the desired result, as in the sketch below.
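A minimal sketch of that join-and-select step, assuming the allB, anyC and date aliases from the aggregation above (those alias names are my own choice, not fixed by the question):

import org.apache.spark.sql.functions.col

// Join back on the key and the min date, then keep the aggregated
// columns plus the channel (_c4) of the row holding the min date.
val output = dataframe2.as("agg")
  .join(dataframe1.as("raw"),
    col("agg._c0") === col("raw._c0") && col("agg.date") === col("raw._c3"),
    "inner")
  .select(col("agg._c0"), col("agg.allB"), col("agg.anyC"),
    col("agg.date"), col("raw._c4"))

If two rows of the same key can share the min date, add dropDuplicates("_c0") at the end to keep one row per key.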

Note: make sure to properly alias the dataframe2 and dataframe1 columns before joining.

Within a groupBy/agg you cannot access the non-aggregated columns of the parent DataFrame, which is why the join back to dataframe1 is needed to recover _c4.

Alternatively, you can convert your DataFrame to an RDD, turn that into a pair RDD of (key, value), and then perform reduceByKey, aggregateByKey, or groupByKey operations on the pair RDD, computing all the aggregations in custom code.
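A rough sketch of that RDD route, assuming all five columns are strings (the Agg case class and its field names are illustrative, not from the original answer); yyyy-MM-dd dates compare correctly as plain strings:

// Illustrative pair-RDD version: fold each key's rows into one record.
case class Agg(allB: Boolean, anyC: Boolean, minDate: String, channel: String)

val resultRdd = dataframe1.rdd
  .map(r => (r.getString(0),
    Agg(r.getString(1) == "B", r.getString(2) == "C", r.getString(3), r.getString(4))))
  .reduceByKey { (a, b) =>
    // keep the channel belonging to the smaller date
    val (d, ch) = if (a.minDate <= b.minDate) (a.minDate, a.channel) else (b.minDate, b.channel)
    Agg(a.allB && b.allB, a.anyC || b.anyC, d, ch)
  }
  .map { case (k, g) =>
    (k, if (g.allB) "Y" else "N", if (g.anyC) "Y" else "N", g.minDate, g.channel)
  }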

kavetiraviteja
  • Hi, thank you. Could you please let me know how to modify the .agg function for the changes I have added now? – murali krishna Aug 29 '20 at 05:01
  • Hi, when you do countDistinct('_c1) === lit(1), if all the values in _c1 are 'B' the result will be okay, but I need to set 'Y' only if it is 'B'. There are cases where _c1 will have all 'J' in that column, so taking countDistinct will not work: it will return 1 and the condition gets satisfied, but really it should not. I need to check whether the values of _c1 are equal to 'B' – murali krishna Aug 29 '20 at 05:52
  • If possible, could you share a code snippet or an example of how it is done? – murali krishna Aug 29 '20 at 05:59
  • @muralikrishna please check the edited solution. I have addressed your issues – kavetiraviteja Aug 29 '20 at 06:20
  • If you want to do it the RDD way, which is specified in the "otherwise" part, you need to explore the RDD APIs and transformations; I have provided enough resource links for you. Please go ahead – kavetiraviteja Aug 29 '20 at 06:21
  • Ok, thank you. I cannot use array_distinct as I'm on Spark 2.1 and array_distinct was introduced in 2.4. Any other alternatives for it? – murali krishna Aug 29 '20 at 06:40