I have two dataframes in Scala:

df1 =

ID  start_date_time
1   2016-10-12 11:55:23
2   2016-10-12 12:25:00
3   2016-10-12 16:20:00

and

df2 =

PK  start_date
1   2016-10-12
2   2016-10-14

I need to add a new column to df1 that will have value 1 if the following condition holds, and 0 otherwise:

ID == PK and start_date_time refers to the same year, month, and day as start_date.

The result should be:

df1 =

ID  start_date_time      check
1   2016-10-12 11:55:23  1
2   2016-10-12 12:25:00  0
3   2016-10-12 16:20:00  0

How can I do it?

I assume that the logic should be something like this:

    // rough idea: truncate start_date_time to its date part and look it up in df2
    val define = udf { (id: String, dateString: String) =>
       val parser    = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
       val formatter = new SimpleDateFormat("yyyy-MM-dd")
       val date = formatter.format(parser.parse(dateString))
       val checks = df2.filter(df2("PK") === id).filter(df2("start_date") === date)
       if (checks.collect().length > 0) "1" else "0"
    }

    val df1Checked = df1.withColumn("check", define(df1("ID"), df1("start_date_time")))

However, I have doubts about how to compare the dates, because df1 and df2 store them in different formats. What is a better way to implement this?

user7379562
  • Don't use SimpleDateFormat as it's not thread-safe. Also, can you convert the two dates into timestamps? Finally, you will probably need to join the two dataframes before running a row-level check (with the map operator). – Guillaume Jan 17 '17 at 16:21
  • @GuillaumeG: Thanks for your suggestions. I would appreciate it if you could show an example in an answer. – user7379562 Jan 17 '17 at 16:25
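
A minimal sketch of the approach suggested in the first comment above, assuming start_date_time and start_date can be parsed by Spark's to_date, and using a left join plus a when/otherwise expression in place of the map operator (dataframe and column names follow the question):

    import org.apache.spark.sql.functions.{col, to_date, when}

    // Keep every row of df1 and attach the matching df2 row (if any) via ID == PK,
    // then do the row-level check: same calendar day -> 1, no match or different day -> 0.
    val joined = df1.join(df2, df1("ID") === df2("PK"), "left")

    val result = joined
      .withColumn("check",
        when(to_date(col("start_date_time")) === to_date(col("start_date")), 1).otherwise(0))
      .select(df1("ID"), col("start_date_time"), col("check"))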

2 Answers


I don't have the exact logic, but I would do something like this:

val df3 = df2
  .join(df1, df1("ID") === df2("PK"))
  .filter($"start_date_time".isBefore($"start_date"))  // pseudocode: the comparison needs the Joda-Time conversion mentioned below

You will need to convert the two timestamps to Joda-Time using this: Converting a date string to a DateTime object using Joda Time library
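
A rough sketch of how that Joda-Time conversion could be plugged into the comparison with a UDF, assuming both columns are strings in the formats shown in the question (the equality check here follows the question's same-day requirement rather than isBefore):

import org.apache.spark.sql.functions.udf
import org.joda.time.format.DateTimeFormat

// Parse both strings with Joda-Time and check whether they fall on the same calendar day.
val sameDay = udf { (dateTime: String, date: String) =>
  val left  = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").parseLocalDate(dateTime)
  val right = DateTimeFormat.forPattern("yyyy-MM-dd").parseLocalDate(date)
  left.isEqual(right)
}

val df3 = df2
  .join(df1, df1("ID") === df2("PK"))
  .filter(sameDay(df1("start_date_time"), df2("start_date")))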

Good luck!

Guillaume

You can use Spark's datetime functions to create date columns on both df1 and df2, and then do a left join of df1 with df2. Here you create an extra constant column check on df2 to indicate whether there is a match in the result:

import org.apache.spark.sql.functions.{lit, to_date}
import spark.implicits._  // for the $"col" syntax (assumes a SparkSession named spark)

val df1_date = df1.withColumn("date", to_date(df1("start_date_time")))
val df2_date = (df2.withColumn("date", to_date(df2("start_date"))).
                    withColumn("check", lit(1)).
                    select($"PK".as("ID"), $"date", $"check"))

df1_date.join(df2_date, Seq("ID", "date"), "left").drop($"date").na.fill(0).show

+---+--------------------+-----+
| ID|     start_date_time|check|
+---+--------------------+-----+
|  1|2016-10-12 11:55:...|    1|
|  2|2016-10-12 12:25:...|    0|
|  3|2016-10-12 16:20:...|    0|
+---+--------------------+-----+
Psidom
  • The filter logic will not work if you want the ID and the date to match at the same time; this is what `join` is for. Also, Spark has comprehensive datetime functions for date-time manipulation, and I consider it better practice to use them. – Psidom Jan 17 '17 at 16:42
  • I published a similar question here: http://stackoverflow.com/questions/41703517/create-a-new-column-based-on-date-checking The thing is that I'd like to avoid joining the two dataframes, as explained in my new question. If you could adapt this solution to the new question, I'd appreciate it. – user7379562 Jan 17 '17 at 17:37