0

I am trying to pull the data from a file for a month and then process it.Basically i need to pull data for each month and do some transformations. Since my job runs daily, i want to make use of it and populate data for that month till the run_date.

I have two approaches in mind:

Approach 1:

populate the data for previous month alone. For eg, if my current_date or run_date is in the month of May, i would be populating the data for the month of April. This can be achieved by pulling the month from current_date() and subtracting 1 from it. Something similar to below:

df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))

This is just an idea. This code wont achieve what i am trying to do since i am subtracting month part alone and not considering Year part.

But in this case, my job will be running daily to populate same data for the whole month. Doesn't make sense to do it.

Approach 2:

If my current_date is 2020-05-27, i want to pull the data from 2020-05-01 to 2020-05-26. If my current date is 2020-06-01, it should populate the data for the month of May that is from 2020-05-01 to 2020-05-31.

I want to implement Approach 2. The only idea i can think of was to write couple of Case statement to check for dates and accordingly populate it.

Can someone please share some idea on it. Is there any slightly straight forward way.

I am using Spark 1.5

Vaishak
  • 607
  • 3
  • 8
  • 30
  • Out of curiosity, are you perhaps using Scala 2.10? 1.5.0 is the last version for 2.10 and I see no other reason why you would still use 5 year old version of Spark. – Mateusz Kubuszok May 27 '20 at 14:31
  • I was sharing the same curiosity when i started working there. :D. I am working with a banking client and they have integrated their environment(Have a custom platform with custom functions) with Spark 1.5 pretty tightly. So updating spark is not an easy task for them and hence we are stuck with Spark 1.5 for now. – Vaishak May 27 '20 at 14:50
  • Wouldn't date comparising work for you? https://stackoverflow.com/questions/31994997/filtering-a-spark-dataframe-based-on-date - calculate the range that you are interested in and then use `.lt` and similar to process all data with date within that range? – Mateusz Kubuszok May 27 '20 at 15:01
  • Yes...But my confusion lies in how to calculate the range. Depending upon the current_date, the range also changes like i have explained above. – Vaishak May 27 '20 at 15:05
  • Can you check my answer? – Som May 27 '20 at 23:32
  • @SomeshwarKale let me try it out – Vaishak May 28 '20 at 06:42

1 Answers1

1

Check if this helps-

1. Load the testing data

val data =
      """
        |2018-04-07 07:07:17
        |2018-04-07 07:32:27
        |2018-04-07 08:36:44
        |2018-04-07 08:38:00
        |2018-04-07 08:39:29
        |2018-04-08 01:43:08
        |2018-04-08 01:43:55
        |2018-04-09 07:52:31
        |2018-04-09 07:52:42
        |2019-01-24 11:52:31
        |2019-01-24 12:52:42
        |2019-01-25 12:52:42
      """.stripMargin
    val df = spark.read
      .schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

Output-


+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+

root
 |-- startDate: timestamp (nullable = true)

2. Create Filter Column based on the current date

    val filterCOl = (currentDate: String) =>  when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
      ,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
     date_format(col("startDate"), "yyyy-MM") ===
       date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
    ).otherwise(to_date(col("startDate"))
     .between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))

3. Check when the current data is in between month

 var currentDateStr = "2018-04-08"
    df.filter(filterCOl(currentDateStr)).show(false)

Output-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+

4. Check when the current data is the first day of month

currentDateStr = "2018-05-01"
    df.filter(filterCOl(currentDateStr)).show(false)

Output-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+

Som
  • 6,193
  • 1
  • 11
  • 22
  • Your case statement works fine! But the problem here is while pulling the last month data, since only month is subtracted here, it pulls data for other years for same month. Eg: in above case, if ```2017 ```data was also there it pulls data for ```April``` month for ```2017``` as well. – Vaishak May 28 '20 at 10:22
  • check updated filter- please upvote + accept if it helps – Som May 28 '20 at 10:45