I have two dataframes, shown below.

Df1

    +----------------------+---------+
    |products              |visitorId|
    +----------------------+---------+
    |[[i1,0.68], [i2,0.42]]|v1       |
    |[[i1,0.78], [i3,0.11]]|v2       |
    +----------------------+---------+

Df2

+---+----------+
| id|      name|
+---+----------+
| i1|Nike Shoes|
| i2|  Umbrella|
| i3|     Jeans|
+---+----------+

Here is the schema of the dataframe Df1

root
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- interest: double (nullable = true)
 |-- visitorId: string (nullable = true)

I want to join the 2 dataframes so that the output is

+------------------------------------------+---------+
|products                                  |visitorId|
+------------------------------------------+---------+
|[[i1,0.68,Nike Shoes], [i2,0.42,Umbrella]]|v1       |
|[[i1,0.78,Nike Shoes], [i3,0.11,Jeans]]   |v2       |
+------------------------------------------+---------+

Here is the schema of the output that I am expecting

root
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- interest: double (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- visitorId: string (nullable = true)

How do I do it in Scala? I am using Spark 2.2.0.

Update

I exploded the products array in Df1 and joined the result with Df2, which gave the output below.

+---------+---+--------+----------+
|visitorId| id|interest|      name|
+---------+---+--------+----------+
|       v1| i1|    0.68|Nike Shoes|
|       v1| i2|    0.42|  Umbrella|
|       v2| i1|    0.78|Nike Shoes|
|       v2| i3|    0.11|     Jeans|
+---------+---+--------+----------+

Now I just need to convert the above dataframe into the JSON format below.

{
    "visitorId": "v1",
    "products": [{
         "id": "i1",
         "name": "Nike Shoes",
         "interest": 0.68
    }, {
         "id": "i2",
         "name": "Umbrella",
         "interest": 0.42
    }]
},
{
    "visitorId": "v2",
    "products": [{
         "id": "i1",
         "name": "Nike Shoes",
         "interest": 0.78
    }, {
         "id": "i3",
         "name": "Jeans",
         "interest": 0.11
    }]
}
yAsH

2 Answers

Try this.

scala> val df1 = Seq((Seq(("i1",0.68),("i2",0.42)), "v1"), (Seq(("i1",0.78),("i3",0.11)), "v2")).toDF("products", "visitorId" )
df1: org.apache.spark.sql.DataFrame = [products: array<struct<_1:string,_2:double>>, visitorId: string]

scala> df1.show(false)
+------------------------+---------+
|products                |visitorId|
+------------------------+---------+
|[[i1, 0.68], [i2, 0.42]]|v1       |
|[[i1, 0.78], [i3, 0.11]]|v2       |
+------------------------+---------+

scala> val df2 = Seq(("i1", "Nike Shoes"),("i2", "Umbrella"), ("i3", "Jeans")).toDF("id", "name")
df2: org.apache.spark.sql.DataFrame = [id: string, name: string]

scala> df2.show(false)
+---+----------+
|id |name      |
+---+----------+
|i1 |Nike Shoes|
|i2 |Umbrella  |
|i3 |Jeans     |
+---+----------+


scala> val withProductsDF = df1.withColumn("individualproducts", explode($"products")).select($"visitorId",$"products",$"individualproducts._1" as "id", $"individualproducts._2" as "interest")
withProductsDF: org.apache.spark.sql.DataFrame = [visitorId: string, products: array<struct<_1:string,_2:double>> ... 2 more fields]

scala> withProductsDF.show(false)
+---------+------------------------+---+--------+
|visitorId|products                |id |interest|
+---------+------------------------+---+--------+
|v1       |[[i1, 0.68], [i2, 0.42]]|i1 |0.68    |
|v1       |[[i1, 0.68], [i2, 0.42]]|i2 |0.42    |
|v2       |[[i1, 0.78], [i3, 0.11]]|i1 |0.78    |
|v2       |[[i1, 0.78], [i3, 0.11]]|i3 |0.11    |
+---------+------------------------+---+--------+


scala> val withProductNamesDF = withProductsDF.join(df2, "id")
withProductNamesDF: org.apache.spark.sql.DataFrame = [id: string, visitorId: string ... 3 more fields]

scala> withProductNamesDF.show(false)
+---+---------+------------------------+--------+----------+
|id |visitorId|products                |interest|name      |
+---+---------+------------------------+--------+----------+
|i1 |v2       |[[i1, 0.78], [i3, 0.11]]|0.78    |Nike Shoes|
|i1 |v1       |[[i1, 0.68], [i2, 0.42]]|0.68    |Nike Shoes|
|i2 |v1       |[[i1, 0.68], [i2, 0.42]]|0.42    |Umbrella  |
|i3 |v2       |[[i1, 0.78], [i3, 0.11]]|0.11    |Jeans     |
+---+---------+------------------------+--------+----------+


scala> val outputDF = withProductNamesDF.groupBy("visitorId").agg(collect_list(struct($"id", $"name", $"interest")) as  "products")
outputDF: org.apache.spark.sql.DataFrame = [visitorId: string, products: array<struct<id:string,name:string,interest:double>>]

scala> outputDF.toJSON.show(false)
+-----------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------+
|{"visitorId":"v2","products":[{"id":"i1","name":"Nike Shoes","interest":0.78},{"id":"i3","name":"Jeans","interest":0.11}]}   |
|{"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}|
+-----------------------------------------------------------------------------------------------------------------------------+
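
One thing to keep in mind with the steps above: `join(df2, "id")` is an inner join, so a product whose id has no match in df2 would disappear from the regrouped array. Below is a minimal sketch of a left-join variant, not a definitive implementation. It assumes a local SparkSession, builds df1 from tuples as in this answer (so the struct fields are `_1` and `_2`), and uses a made-up product id `i9` that has no entry in df2. The names `LeftJoinSketch` and `enrich` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, explode, struct}

object LeftJoinSketch {
  // Explode products, left-join the names, then regroup per visitor.
  // Assumes the product struct fields are _1 (id) and _2 (interest).
  def enrich(df1: DataFrame, df2: DataFrame): DataFrame = {
    val exploded = df1
      .select(col("visitorId"), explode(col("products")) as "p")
      .select(col("visitorId"), col("p._1") as "id", col("p._2") as "interest")

    exploded
      .join(df2, Seq("id"), "left_outer") // keep products without a name
      .groupBy("visitorId")
      .agg(collect_list(struct(col("id"), col("name"), col("interest"))) as "products")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("left-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // "i9" is a hypothetical id that does not exist in df2.
    val df1 = Seq(
      (Seq(("i1", 0.68), ("i9", 0.42)), "v1"),
      (Seq(("i1", 0.78), ("i3", 0.11)), "v2")
    ).toDF("products", "visitorId")
    val df2 = Seq(("i1", "Nike Shoes"), ("i2", "Umbrella"), ("i3", "Jeans"))
      .toDF("id", "name")

    enrich(df1, df2).toJSON.show(false)
    spark.stop()
  }
}
```

With the left join, the `i9` entry stays in the array for v1 with a null name. Also note that `collect_list` does not guarantee the order of elements after the shuffle, so the products may come back in a different order than in the original array.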
C.S.Reddy Gadipally

This depends on your specific case, but if the df2 lookup table is small enough, you could collect it as a Scala Map and use it in a UDF. It then becomes as simple as:

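import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Collect the small lookup table to the driver as Map(id -> name)
val m = df2.as[(String, String)].collect.toMap

// Rebuild each product struct as (id, interest, name)
val addName = udf( (arr: Seq[Row]) => {
    arr.map(i => (i.getAs[String](0), i.getAs[Double](1), m(i.getAs[String](0))))
})

df1.withColumn("products", addName('products)).show(false)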

+------------------------------------------+---------+
|products                                  |visitorId|
+------------------------------------------+---------+
|[[i1,0.68,Nike Shoes], [i2,0.42,Umbrella]]|v1       |
|[[i1,0.78,Nike Shoes], [i3,0.11,Jeans]]   |v2       |
+------------------------------------------+---------+
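
Two caveats with the UDF above: `m(...)` throws a `NoSuchElementException` for an id that is missing from df2, and returning a tuple gives struct fields named `_1`, `_2`, `_3` rather than `id`, `interest`, `name`. Here is a rough sketch of one way to handle both, assuming the lookup table still fits in memory. The case class `NamedProduct`, the object `LookupUdfSketch`, and the method `addNames` are hypothetical names introduced for illustration, and broadcasting the map is my own choice, not something the answer requires.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case class; it gives the output struct named fields.
case class NamedProduct(id: String, interest: Double, name: String)

object LookupUdfSketch {
  def addNames(spark: SparkSession, df1: DataFrame, df2: DataFrame): DataFrame = {
    import spark.implicits._

    // Collect the small lookup table and broadcast it to the executors.
    val lookup = spark.sparkContext.broadcast(
      df2.select("id", "name").as[(String, String)].collect.toMap
    )

    // Read each product positionally (0 = id, 1 = interest) and attach
    // the name, or null when the id is not in the lookup table.
    val addName = udf { (arr: Seq[Row]) =>
      if (arr == null) null
      else arr.map { r =>
        val id = r.getString(0)
        NamedProduct(id, r.getDouble(1), lookup.value.get(id).orNull)
      }
    }

    df1.withColumn("products", addName(col("products")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lookup-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      (Seq(("i1", 0.68), ("i2", 0.42)), "v1"),
      (Seq(("i1", 0.78), ("i3", 0.11)), "v2")
    ).toDF("products", "visitorId")
    val df2 = Seq(("i1", "Nike Shoes"), ("i2", "Umbrella"), ("i3", "Jeans"))
      .toDF("id", "name")

    val result = addNames(spark, df1, df2)
    result.printSchema()
    result.toJSON.show(false)
    spark.stop()
  }
}
```

Compared with the explode/join/groupBy route, this keeps the original order of the products array and avoids a shuffle, at the cost of holding the whole lookup table in memory on every executor.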
Kombajn zbożowy