
Consider two dataframes, holiday df and everyday df, each with 3 columns, as below:

  1. Holiday df: (5 records)
Country_code|currency_code| date
Gb          | gbp         | 2022-04-15
Gb          | gbp         | 2022-04-16
US          | usd         | 2022-04-17
Gb          | gbp         | 2022-04-18
Gb          | gbp         | 2022-04-21
  2. Everyday df: (4 records)
Country_code_demo|currency_code_demo| date_demo
Gb               | gbp              | 2022-04-14
Gb               | gbp              | 2022-04-15
Gb               | gbp              | 2022-04-16
Gb               | gbp              | 2022-04-18

Both dataframes have country code, currency code and date columns. The date columns of the two dataframes need to be compared per country_code and currency_code. If a date in everyday df matches a date in holiday df, the date_demo column of everyday df needs to be updated to the next working day, and the updated date must itself not be present in holiday df. Write Spark Scala code using window functions. The expected output is as below:

Country_code_demo|currency_code_demo| date_updated
Gb               | gbp              | 2022-04-14
Gb               | gbp              | 2022-04-17
Gb               | gbp              | 2022-04-17
Gb               | gbp              | 2022-04-19
    [How do I ask and answer homework questions?](https://meta.stackoverflow.com/questions/334822/how-do-i-ask-and-answer-homework-questions) – Dmytro Mitin Apr 05 '23 at 04:29

1 Answer


The holiday df is unlikely to be large, so I would use a UDF to compute, for each holiday, the next working day. I would then join that dataframe to the everyday dataframe to obtain the result you seek:

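import org.apache.spark.sql.functions._
// assumes `spark` is the active SparkSession (as in spark-shell); needed for $"..." and 'col
import spark.implicits._

// optionally, you can consider Saturdays and Sundays as non-working days
// val nonWorkingDays = Set(java.time.DayOfWeek.SUNDAY, java.time.DayOfWeek.SATURDAY)
// using an empty set to match your expected result
val nonWorkingDays = Set[java.time.DayOfWeek]()
// Seq is used instead of WrappedArray so the UDF also compiles on Scala 2.13
val nextWorkingDay = udf((dates : Seq[java.sql.Date]) => {
    val dateSet = dates.map(_.toLocalDate).toSet
    dates.map(date => {
        // step forward until the date is neither a holiday nor a non-working day
        var nextDate = date.toLocalDate.plusDays(1)
        while(dateSet.contains(nextDate) || nonWorkingDays.contains(nextDate.getDayOfWeek))
            nextDate = nextDate.plusDays(1)
        date -> java.sql.Date.valueOf(nextDate)
    })
})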

val nextDayDF = holiday
    .groupBy("Country_code", "currency_code")
    .agg(collect_list('date) as "dates")
    .withColumn("date_struct", explode(nextWorkingDay('dates)))
    .drop("dates")
    // renaming columns to simplify the join
    .select($"Country_code" as "Country_code_demo", $"currency_code" as "currency_code_demo",
            $"date_struct._1" as "date_demo", $"date_struct._2" as "date_updated")

everyday
    .join(nextDayDF, Seq("Country_code_demo", "currency_code_demo", "date_demo"), "left")
    .withColumn("date_updated", coalesce('date_updated, 'date_demo))
    .show()
+-----------------+------------------+----------+------------+
|Country_code_demo|currency_code_demo| date_demo|date_updated|
+-----------------+------------------+----------+------------+
|               Gb|               gbp|2022-04-14|  2022-04-14|
|               Gb|               gbp|2022-04-15|  2022-04-17|
|               Gb|               gbp|2022-04-16|  2022-04-17|
|               Gb|               gbp|2022-04-18|  2022-04-19|
+-----------------+------------------+----------+------------+
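
If you want to check the date logic without starting Spark, the loop inside the UDF can be sketched in plain Scala. This is only a sketch: `NextWorkingDay`, `nextWorkingDays` and `adjust` are hypothetical names that are not part of the code above, and it assumes `nonWorkingDays` never contains all seven days of the week (otherwise the search would not terminate).

```scala
import java.time.{DayOfWeek, LocalDate}

object NextWorkingDay {
  // Hypothetical helper: maps each holiday to the first later date that is
  // neither a holiday nor a day of the week listed in nonWorkingDays.
  // Assumption: nonWorkingDays does not contain every day of the week.
  def nextWorkingDays(
      holidays: Set[LocalDate],
      nonWorkingDays: Set[DayOfWeek] = Set.empty
  ): Map[LocalDate, LocalDate] =
    holidays.map { holiday =>
      // Step forward one day at a time until a working day is found.
      val next = Iterator
        .iterate(holiday.plusDays(1))(_.plusDays(1))
        .find(d => !holidays.contains(d) && !nonWorkingDays.contains(d.getDayOfWeek))
        .get
      holiday -> next
    }.toMap

  // Dates that are not holidays come back unchanged, which mirrors the
  // left join followed by coalesce in the Spark version.
  def adjust(date: LocalDate, lookup: Map[LocalDate, LocalDate]): LocalDate =
    lookup.getOrElse(date, date)

  def main(args: Array[String]): Unit = {
    // The Gb/gbp holidays and everyday dates from the question.
    val gbHolidays = Set("2022-04-15", "2022-04-16", "2022-04-18", "2022-04-21")
      .map(s => LocalDate.parse(s))
    val lookup = nextWorkingDays(gbHolidays)
    Seq("2022-04-14", "2022-04-15", "2022-04-16", "2022-04-18")
      .map(s => LocalDate.parse(s))
      .foreach(d => println(s"$d -> ${adjust(d, lookup)}"))
  }
}
```

Passing `Set(DayOfWeek.SATURDAY, DayOfWeek.SUNDAY)` as the second argument corresponds to uncommenting the weekend line in the UDF version.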
Oli