I have two data frames that hold values for some people at two different points in time. The possible changes for a person between before and after are listed in the code below.

val before = Seq(
(1, "soccer", "1", "2", "3", "4", ""),
(2, "soccer", "",  "",  "",  "",  ""),
(3, "soccer", "1", "",  "",  "",  ""),
(4, "soccer", "1", "",  "",  "",  ""),
(5, "soccer", "1", "",  "",  "",  ""),
(6, "soccer", "1", "",  "",  "",  "")
).toDF("id", "sport", "var1", "var2", "var3", "var4", "var5")

before.show                   //> +---+------+----+----+----+----+----+
                              //| | id| sport|var1|var2|var3|var4|var5|
                              //| +---+------+----+----+----+----+----+
                              //| |  1|soccer|   1|   2|   3|   4|    |
                              //| |  2|soccer|    |    |    |    |    |
                              //| |  3|soccer|   1|    |    |    |    |
                              //| |  4|soccer|   1|    |    |    |    |
                              //| |  5|soccer|   1|    |    |    |    |
                              //| |  6|soccer|   1|    |    |    |    |
                              //| +---+------+----+----+----+----+----+
                              //| 

val after = Seq(
(1, "soccer", "1", "2", "3", "4", ""), // Same
(2, "soccer", "1", "",  "",  "",  ""), // Addition
(3, "soccer", "1", "1", "",  "",  ""), // Addition
(4, "soccer", "",  "",  "",  "",  ""), // Remove
(5, "soccer", "2", "1", "",  "",  ""), // Slide
(6, "soccer", "2", "",  "",  "",  "")  // Change
).toDF("id", "sport", "var1", "var2", "var3", "var4", "var5")

after.show                    //> +---+------+----+----+----+----+----+
                              //| | id| sport|var1|var2|var3|var4|var5|
                              //| +---+------+----+----+----+----+----+
                              //| |  1|soccer|   1|   2|   3|   4|    |
                              //| |  2|soccer|   1|    |    |    |    |
                              //| |  3|soccer|   1|   1|    |    |    |
                              //| |  4|soccer|    |    |    |    |    |
                              //| |  5|soccer|   2|   1|    |    |    |
                              //| |  6|soccer|   2|    |    |    |    |
                              //| +---+------+----+----+----+----+----+
                              //| 

So things can stay the same, there could be an addition or a removal, and finally there could be a change or a slide.

My ideal output is something that compares each row in the before and after data frames and attaches a label:

outcome.show                   //> +---+------+------+
                               //| | id| sport|  diff|
                               //| +---+------+------+
                               //| |  1|soccer|  same|
                               //| |  2|soccer|   add|
                               //| |  3|soccer|   add|
                               //| |  4|soccer|remove|
                               //| |  5|soccer| slide|
                               //| |  6|soccer|change|
                               //| +---+------+------+
                               //| 

This question is related to this one, but the point there was just to count how many differences there were between two rows. This time I am trying to understand these differences at a finer grain, but I am stuck on defining the different possible options.

EDIT

Since I am using DataFrame, I'd like to stick to this structure rather than use case classes. I am thus trying to adapt what has been proposed by @iboss using DataFrame instead.

I have this UDF that should do all the work:

val diff = udf { (bef:DataFrame, aft:DataFrame) => {
  "hello" // return just this string for now
  } : String
}

This udf will do all the work, as suggested by @iboss, to produce the output in outcome.show, so the possible outcome after matching two rows will be a String, more precisely one of "same", "add", "remove", "slide" or "change".

I then have this code to join the two data frames and create the new column:

val mydiff = before.join(after, "id")
  .withColumn("diff", diff( before, after ) )
  .select("id", "diff")

However, calling diff fails with this error:

type mismatch; found : org.apache.spark.sql.DataFrame required: org.apache.spark.sql.Column

What I don't understand is why it doesn't like the DataFrame and how to solve it...

user299791

1 Answer

I'm not quite sure what those vars are, but if I were you I'd group them into a tuple or a case class, which is easier to process further. It might look like this:

val before = Seq(
    (1, "soccer", ("1", "2", "3", "4", "")),
    (2, "soccer", ("",  "",  "",  "",  "")),
    (3, "soccer", ("1", "",  "",  "",  "")),
    (4, "soccer", ("1", "",  "",  "",  "")),
    (5, "soccer", ("1", "",  "",  "",  "")),
    (6, "soccer", ("1", "",  "",  "",  ""))
).toDF("id", "sport", "vars")


val after = Seq(
    (1, "soccer", ("1", "2", "3", "4", "")),
    (2, "soccer", ("1",  "",  "",  "",  "")),
    (3, "soccer", ("1", "1",  "",  "",  "")),
    (4, "soccer", ("", "",  "",  "",  "")),
    (5, "soccer", ("2", "1",  "",  "",  "")),
    (6, "soccer", ("2", "",  "",  "",  ""))
).toDF("id", "sport", "vars")

Then you could use a user-defined function to compute your diff:

type MyVars = (String, String, String, String, String)

val diff = udf { (before_vars: MyVars, after_vars: MyVars) =>
    // your implementation of diff function
}

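before
    .join(after, "id") // join on id; without a join key this would be a cross join
    .withColumn("diff", diff(before("vars"), after("vars")))
    // "sport" exists on both sides, so pick one explicitly (col is org.apache.spark.sql.functions.col)
    .select(col("id"), before("sport"), col("diff"))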

Edit

A udf normally infers the return type for you, so you may not need to declare it. If you do want to declare it, you can do it this way:

udf { (firstName: String, lastName: String) => s"$firstName $lastName": String }

or with a block:

udf { (name: String) => {
    val hello = "hello, "
    hello + name
}: String } // the block evaluates to a String, so the ascription must be String

You can also use def:

def getFullName(firstName: String, lastName: String): String =
    s"$firstName $lastName"

udf(getFullName _)

A def defines a method, not a function, and udf requires a function. So we need to convert the method into a function value with the trailing underscore (eta-expansion).
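To make the method versus function distinction concrete, here is a small plain-Scala sketch with no Spark involved. The object name MethodVsFunction and the values asFunction and literal are made up for illustration; only getFullName comes from the snippet above.

```scala
object MethodVsFunction {
  // A method: it belongs to the enclosing object and is not itself a value.
  def getFullName(firstName: String, lastName: String): String =
    s"$firstName $lastName"

  // The trailing underscore turns the method into a Function2 value,
  // which is the kind of argument udf expects.
  val asFunction: (String, String) => String = getFullName _

  // A function literal is already a value, so no conversion is needed.
  val literal: (String, String) => String =
    (firstName, lastName) => s"$firstName $lastName"
}
```

Both asFunction and literal can be passed around like any other value, which is why either form is acceptable where a function is required.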

For more detail, take a look at "Difference between method and function in Scala".

Edit 2

It seems I misunderstood your question a little. The diff udf is applied to each row separately, and its arguments must be Columns, so you cannot pass a whole DataFrame to it.

I suggested grouping those vars (in each row) into a tuple just because it is easier to read. But if you still want to use the original form, you can do this:

val diff = udf { (
    beforeVar1: String, 
    beforeVar2: String, 
    beforeVar3: String, 
    beforeVar4: String, 
    beforeVar5: String, 
    afterVar1: String, 
    afterVar2: String, 
    afterVar3: String, 
    afterVar4: String, 
    afterVar5: String
  ) => {
    "hello" // return just this string for now
  } : String
}

before.join(after, "id")
  .withColumn("diff", diff(
     before("var1"),
     before("var2"),
     before("var3"),
     before("var4"),
     before("var5"),
     after("var1"),
     after("var2"),
     after("var3"),
     after("var4"),
     after("var5")
  ))
  .select("id", "diff")
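
The udf above still returns a placeholder. As a rough sketch of what the body could look like, here is a plain-Scala classifier that reproduces the six labels in the question's expected output. The rules are my own guess at the definitions, not something the question states: empty strings (and nulls) mean "no value", an "add" keeps the old values as a prefix, a "remove" keeps the new values as a prefix of the old ones, and a "slide" pushes the old values to the end. The names RowDiff and classify are hypothetical.

```scala
object RowDiff {
  // Returns one of "same", "add", "remove", "slide" or "change".
  // Assumption: these rules are inferred from the six sample rows only.
  def classify(before: Seq[String], after: Seq[String]): String = {
    // Treat empty strings and nulls as missing values.
    val b = before.filter(s => s != null && s.nonEmpty)
    val a = after.filter(s => s != null && s.nonEmpty)

    if (a == b) "same"
    // New values appended after the old ones (also covers empty -> something).
    else if (a.length > b.length && a.startsWith(b)) "add"
    // Values dropped from the end (also covers something -> empty).
    else if (a.length < b.length && b.startsWith(a)) "remove"
    // New values inserted in front, old ones shifted to the right.
    else if (a.length > b.length && a.endsWith(b)) "slide"
    // Anything else, for example a value replaced in place.
    else "change"
  }
}
```

Since the function takes two Seq[String] arguments, one way to call it from Spark would be to wrap it with udf(RowDiff.classify _) and pass array(before("var1"), ..., before("var5")) and array(after("var1"), ..., after("var5")) as the two Columns, where array is the function from org.apache.spark.sql.functions. If your real data has cases the sample rows do not cover (a removal from the middle, say), the rules will need adjusting.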
iboss
  • those vars come from two MySQL tables, but I like the approach you are suggesting... – user299791 Apr 12 '16 at 10:14
  • I haven't understood how to return a Column from the UDF yet... do you have a hint? – user299791 Apr 12 '16 at 11:04
  • this line `.withColumn("diff", diff(before("vars"), after("vars")))` defines the result column named "diff" from the function `diff` – iboss Apr 12 '16 at 11:06
  • say your udf just return "hello", can you edit your post to show how you define the return type? – user299791 Apr 12 '16 at 12:50
  • sorry but I don't get why you edited adding a new example... you already wrote a udf called diff, I meant to change that one... besides I have my data in a data frame, I also don't get why using tuple or case class... can I pass data frames to a UDF? – user299791 Apr 12 '16 at 15:30
  • you can not pass the whole DataFrame to it. see my recent edit – iboss Apr 13 '16 at 15:42