I am trying to use the Spark Dataset API but I am having some issues doing a simple join.

Let's say I have two datasets with the fields date | value. With DataFrames, my join would look like:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

For Datasets, however, there is the .joinWith method, and the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What is the argument required by .joinWith ?

mastro

3 Answers

To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, define a case class that matches your schema and call DataFrame.as[T], where T is your case class. So:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then, if you had another case class and DataFrame, say:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Then, while the syntax of join and joinWith is similar, the results are different:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens the columns into a single namespace (which causes problems in the above case because the column name "key" is repeated).

Curiously enough, I have to use df.col("key") and df2.col("key") to build the join condition for ds and ds2: using just col("key") on either side does not work, and ds.col(...) doesn't exist in the Spark version used here (Dataset gained its own col method in Spark 2.0). Using the original df.col("key") does the trick, however.
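
On Spark 2.0 and later, Dataset has its own col/apply methods, so the join condition no longer needs the original DataFrames. Below is a minimal sketch of that, assuming a local SparkSession; the object name JoinWithSketch and the helper joinByKey are made up for illustration and are not part of Spark.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same case classes as above, declared at top level so Spark can derive encoders.
case class KeyValue(key: Int, value: String)
case class Nums(key: Int, num1: Double, num2: Long)

object JoinWithSketch {
  // Hypothetical helper: the condition is built from the Datasets themselves (Spark 2.0+).
  def joinByKey(left: Dataset[KeyValue], right: Dataset[Nums]): Dataset[(KeyValue, Nums)] =
    left.joinWith(right, left("key") === right("key"))

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a quick experiment.
    val spark = SparkSession.builder().master("local[*]").appName("joinWith-sketch").getOrCreate()
    import spark.implicits._

    val ds  = Seq(KeyValue(1, "asdf"), KeyValue(2, "34234")).toDS()
    val ds2 = Seq(Nums(1, 7.7, 101L), Nums(2, 1.2, 10L)).toDS()

    val joined = joinByKey(ds, ds2)

    // Each element is a typed pair, so field access after the join is checked at compile time.
    val summary = joined.map { case (kv, n) => (kv.key, kv.value, n.num1 + n.num2) }
    summary.show()

    spark.stop()
  }
}
```

Note that the condition itself is still a string-based Column expression, so a misspelled "key" only fails at runtime; only the result of the join is typed.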

David Griffin
  • Detailed explanation. Just one point of confusion: is there a better way to write a typed join condition? For example, instead of df.col("key"), can we have something more type-safe that checks the correctness of "key" at compile time? – Mohammad Adnan Oct 10 '16 at 08:32
  • I completely agree. Based on this syntax there is no use in creating the Dataset, so where is the benefit? I can't get over the fact that there is no typed alternative. Such a pity! – Sparky Dec 12 '16 at 17:32
  • There is a workaround in this [answer of "Perform a typed join in Scala with Spark Datasets"](https://stackoverflow.com/questions/40605167/perform-a-typed-join-in-scala-with-spark-datasets?rq=1) – Minh Thai Jul 10 '18 at 09:25
9

According to https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html, it looks like you could just do:

dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
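
For completeness, here is a self-contained sketch of the alias approach. It assumes a local SparkSession, and the Reading case class and the joinOnDate helper are hypothetical names chosen to match the question's date | value layout. The $"..." syntax comes from spark.implicits._, which has to be imported first.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type matching the question's "date | value" layout.
case class Reading(date: String, value: Double)

object AliasJoinSketch {
  // Hypothetical helper: aliases both sides so the condition can refer to "A.date" and "B.date".
  def joinOnDate(spark: SparkSession, a: Dataset[Reading], b: Dataset[Reading]): Dataset[(Reading, Reading)] = {
    import spark.implicits._ // brings the $"..." column syntax into scope
    a.as("A").joinWith(b.as("B"), $"A.date" === $"B.date")
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a quick experiment.
    val spark = SparkSession.builder().master("local[*]").appName("alias-joinWith-sketch").getOrCreate()
    import spark.implicits._

    val dsA = Seq(Reading("2016-01-01", 1.0), Reading("2016-01-02", 2.0)).toDS()
    val dsB = Seq(Reading("2016-01-01", 10.0)).toDS()

    // Inner join by default, so only dates present on both sides are kept.
    joinOnDate(spark, dsA, dsB).show()

    spark.stop()
  }
}
```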
2

For the above example, you can try the following.

Define a case class for your output:

case class JoinOutput(key:Int, value:String, num1:Double, num2:Long) 

Join the two Datasets using Seq("key"). This avoids duplicate key columns in the output, which in turn makes it possible to apply the case class or fetch the data in the next step:

val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// joined: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

The result is flat rather than a tuple of objects:

joined.show

+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
|  1| asdf| 7.7| 101|
|  2|34234| 1.2|  10|
+---+-----+----+----+
Shaido
Syntax
  • you don't specifically answer the question, but the Seq("key") tip helped me out – ImDarrenG Nov 30 '17 at 16:35
  • You are not answering how to use `.joinWith`, and also `.join` is actually an untyped transformation, in which case you are not benefitting from the type safety of `Dataset` – jack Mar 03 '21 at 20:13