I am using spark-sql 2.4.1 with Java 1.8.

I have source data as below:

  val df_data = Seq(
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2020-03-01"),
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-06-01"),
  ("Indus_1","Indus_1_Name","Country1", "State1",12789979,"2019-03-01"),

  ("Indus_2","Indus_2_Name","Country1", "State2",21789933,"2020-03-01"),
  ("Indus_2","Indus_2_Name","Country1", "State2",300789933,"2018-03-01"),

  ("Indus_3","Indus_3_Name","Country1", "State3",27989978,"2019-03-01"),
  ("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-06-01"),
  ("Indus_3","Indus_3_Name","Country1", "State3",30014633,"2017-03-01"),

  ("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2020-03-01"),
  ("Indus_4","Indus_4_Name","Country2", "State1",41789978,"2018-03-01"),

  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2019-03-01"),
  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2018-03-01"),
  ("Indus_5","Indus_5_Name","Country3", "State3",67789978,"2017-03-01"),

  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-03-01"),
  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2020-06-01"),
  ("Indus_6","Indus_6_Name","Country1", "State1",37899790,"2018-03-01"),

  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-03-01"),
  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2020-12-01"),
  ("Indus_7","Indus_7_Name","Country3", "State1",26689900,"2019-03-01"),

  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-03-01"),
  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2018-09-01"),
  ("Indus_8","Indus_8_Name","Country1", "State2",212359979,"2016-03-01"),

  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2020-03-01"),
  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2019-09-01"),
  ("Indus_9","Indus_9_Name","Country4", "State1",97899790,"2016-03-01")
  ).toDF("industry_id","industry_name","country","state","revenue","generated_date");

Query:

val distinct_gen_date = df_data.select("generated_date").distinct.orderBy(desc("generated_date"));

For each "generated_date" in distinct_gen_date, I need to get all unique industry_ids from the preceding 6 months of data.

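import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, lit, rank}

// Rank each industry's rows from newest to oldest generated_date
val cols = col("industry_id")
val ws = Window.partitionBy(cols).orderBy(desc("generated_date"));

// Keep only the most recent row (rank 1) per industry
val newDf = df_data
                .withColumn("rank",rank().over(ws))
                .where(col("rank").equalTo(lit(1)))
                //.drop(col("rank"))
                .select("*");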

How can I compute a moving aggregate (over the unique industry_ids in each 6-month window) for each distinct generated_date?

More details:

For example, assume the sample data runs from "2020-03-01" back to "2016-03-01". If some industry_x is not present in "2020-03-01", I need to check "2020-02-01", "2020-01-01", "2019-12-01", "2019-11-01", "2019-10-01" and "2019-09-01" sequentially; the first (rank 1) row found is the one taken into consideration when calculating the "2020-03-01" data set.

Next I move on to "2020-02-01", and so on for each distinct "generated_date": for each distinct date, go back 6 months, get the unique industries and pick the rank 1 row. That is the data for that date. The data set keeps changing from one date to the next.

I can do this with a for loop, but that gives no parallelism. How can I pick the data set for each distinct "generated_date" in parallel?

BdEngineer
    So if I understand correctly, for each unique "generated_date" you want to get a list of all "industry_id" that have a "generated_date" within 6 months? – Shaido Apr 16 '20 at 02:37

1 Answer

I don't know how to do this with window functions but a self join can solve your problem.

First, you need a DataFrame with distinct dates:

val df_dates = df_data
  .select("generated_date")
  .withColumnRenamed("generated_date", "distinct_date")
  .distinct()

Next, for each row in your industries data, you need to calculate up to which date that industry will be included, i.e., add 6 months to generated_date. I think of these as active dates. I've used add_months() for this, but you can apply different logic.

import org.apache.spark.sql.functions.{add_months, col}
val df_active = df_data.withColumn("active_date", add_months(col("generated_date"), 6))
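
As an optional variation: generated_date is a string column in the question's data, so the line above relies on Spark casting it implicitly. A minimal sketch that converts it to a real date first with to_date(), so later comparisons are date-to-date, could look like this. The local SparkSession, the two-row sample and the names sample and withActive are assumptions made only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{add_months, col, to_date}

// Local session for experimentation only (assumption, not part of the original answer)
val spark = SparkSession.builder().master("local[*]").appName("active-date-sketch").getOrCreate()
import spark.implicits._

// Hypothetical two-row sample with the same column names as the question
val sample = Seq(
  ("Indus_1", "2020-03-01"),
  ("Indus_3", "2018-06-01")
).toDF("industry_id", "generated_date")

val withActive = sample
  // Parse the yyyy-MM-dd string into a DateType column
  .withColumn("generated_date", to_date(col("generated_date")))
  // The row stays "active" for 6 months after it was generated
  .withColumn("active_date", add_months(col("generated_date"), 6))

withActive.show()
```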

If we start with this data (separated by date just for our eyes):

  industry_id     generated_date
(("Indus_1", ..., "2020-03-01"),

 ("Indus_1", ..., "2019-12-01"),
 ("Indus_2", ..., "2019-12-01"),

 ("Indus_3", ..., "2018-06-01"))

It now has:

  industry_id     generated_date active_date
(("Indus_1", ..., "2020-03-01", "2020-09-01"),

 ("Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("Indus_2", ..., "2019-12-01", "2020-06-01"),

 ("Indus_3", ..., "2018-06-01", "2018-12-01"))

Now proceed with a self join on dates, using a join condition that matches your 6-month period:

import org.apache.spark.sql.Column

val condition: Column = (
  col("distinct_date") >= col("generated_date")).and(
  col("distinct_date") <= col("active_date"))

val df_joined = df_dates.join(df_active, condition, "inner")

df_joined now has:

  distinct_date industry_id     generated_date active_date
(("2020-03-01", "Indus_1", ..., "2020-03-01", "2020-09-01"),
 ("2020-03-01", "Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("2020-03-01", "Indus_2", ..., "2019-12-01", "2020-06-01"),

 ("2019-12-01", "Indus_1", ..., "2019-12-01", "2020-06-01"),
 ("2019-12-01", "Indus_2", ..., "2019-12-01", "2020-06-01"),

 ("2018-06-01", "Indus_3", ..., "2018-06-01", "2018-12-01"))
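
A side note on cost: the join condition above contains only range comparisons and no equality, so Spark cannot use a hash or sort-merge join for it. Since the distinct-dates DataFrame is normally tiny, one option is to hint that it should be broadcast. The following is only a sketch under that assumption; the local session, the three-row sample and the names events, dates and joinedHint are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{add_months, broadcast, col, to_date}

// Local session for experimentation only (assumption)
val spark = SparkSession.builder().master("local[*]").appName("broadcast-range-join").getOrCreate()
import spark.implicits._

// Hypothetical sample; active_date is generated_date + 6 months
val events = Seq(
  ("Indus_1", "2020-03-01"),
  ("Indus_1", "2019-12-01"),
  ("Indus_3", "2018-06-01")
).toDF("industry_id", "generated_date")
  .withColumn("generated_date", to_date(col("generated_date")))
  .withColumn("active_date", add_months(col("generated_date"), 6))

// One row per distinct date: small enough to ship to every executor
val dates = events.select(col("generated_date").as("distinct_date")).distinct()

// broadcast() is only a hint; the range condition is the same as in the answer
val joinedHint = events.join(
  broadcast(dates),
  col("distinct_date") >= col("generated_date") && col("distinct_date") <= col("active_date")
)

joinedHint.explain() // inspect which join strategy Spark actually picked
```

Whether this helps depends on the real data volumes, so it is worth checking the physical plan before and after adding the hint.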

Drop the auxiliary column active_date or, even better, drop duplicates according to your needs:

val df_result = df_joined.dropDuplicates(Seq("distinct_date", "industry_id"))

This drops the duplicated "Indus_1" in "2020-03-01" (it appeared twice because it is retrieved from two different generated_dates):

  distinct_date industry_id
(("2020-03-01", "Indus_1"),
 ("2020-03-01", "Indus_2"),

 ("2019-12-01", "Indus_1"),
 ("2019-12-01", "Indus_2"),

 ("2018-06-01", "Indus_3"))
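
If the full "rank 1" row is needed for each date, as the question describes, note that dropDuplicates() does not let you choose which of the duplicate rows survives. A possible alternative, sketched here under assumptions, is to number the joined rows per (distinct_date, industry_id) from newest to oldest and keep the first. The local session, the four-row sample with a made-up revenue column, and the names df, dates, active, joined, w and latest are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{add_months, col, desc, row_number, to_date}

// Local session for experimentation only (assumption)
val spark = SparkSession.builder().master("local[*]").appName("latest-per-date").getOrCreate()
import spark.implicits._

// Hypothetical sample; revenue values are invented to make the chosen row visible
val df = Seq(
  ("Indus_1", 100, "2020-03-01"),
  ("Indus_1", 90, "2019-12-01"),
  ("Indus_2", 50, "2019-12-01"),
  ("Indus_3", 70, "2018-06-01")
).toDF("industry_id", "revenue", "generated_date")
  .withColumn("generated_date", to_date(col("generated_date")))

val dates = df.select(col("generated_date").as("distinct_date")).distinct()
val active = df.withColumn("active_date", add_months(col("generated_date"), 6))

// Same 6-month range join as in the answer
val joined = dates.join(
  active,
  col("distinct_date") >= col("generated_date") && col("distinct_date") <= col("active_date")
)

// Within each (date, industry) pair, the newest generated_date gets row number 1
val w = Window.partitionBy("distinct_date", "industry_id").orderBy(desc("generated_date"))

val latest = joined
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn", "active_date")

latest.orderBy(desc("distinct_date"), col("industry_id")).show()
```

row_number() is used instead of rank() so that ties on generated_date still yield exactly one row per pair.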
Civyshk
    I don't know the shuffle cost of a window function; in any case, I didn't find any logic to do it with a window. However, the join method joins 2 DFs with the same origin (which is why I called it a "self" join), so maybe some clever repartitioning minimizes the shuffle. – Civyshk Apr 22 '20 at 14:15
    No, I'm not fluent with the Dataset creation nor Encoders. Post a new question if you need Dataset or try to createDataFrame() if you can live with Dataset. – Civyshk Apr 24 '20 at 14:33
    Can you help and suggest how to handle this? https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename – BdEngineer May 27 '20 at 06:36