
I have the following dataframe:

root
 |-- visitor: string (nullable = true)
 |-- asset: array (nullable = true)
 |    |-- element: string (containsNull = true)

I am trying to group rows that share the same key (visitor) so that their array values are merged into a single array of the original element type.

example:

import org.apache.spark.sql.functions.collect_list

val rawData1 = Seq(("visitor1", Array("item1","item2","item3","item4")), ("visitor2", Array("item1","item2","item3")))
val rawData2 = Seq(("visitor1", Array("item1","item2","item5")), ("visitor2", Array("item4","item7")))
val df1 = spark.createDataFrame(rawData1).toDF("visitor","asset")
val df2 = spark.createDataFrame(rawData2).toDF("visitor","asset")
val dfJoined = df1.union(df2)
dfJoined.groupBy("visitor").agg(collect_list("asset"))

and what I get is:

visitor collect_list(asset)
visitor2    [WrappedArray(item1, item2, item3), WrappedArray(item4, item7)]
visitor1    [WrappedArray(item1, item2, item3, item4), WrappedArray(item1, item2, item5)]

but I don't want two sublists in the asset column; I would like all values of the two lists merged into one list of the original type (array).

Thanks!


1 Answer


One option is to flatten df1 and df2 with explode before combining them, then do the aggregation:

import org.apache.spark.sql.functions.{collect_list, explode}
import spark.implicits._  // for the $"col" column syntax

(df1.withColumn("asset", explode($"asset"))
    .union(df2.withColumn("asset", explode($"asset")))
    .groupBy("visitor")
    .agg(collect_list("asset"))
).show(false)

+--------+-------------------------------------------------+
|visitor |collect_list(asset)                              |
+--------+-------------------------------------------------+
|visitor2|[item1, item2, item3, item4, item7]              |
|visitor1|[item1, item2, item3, item4, item1, item2, item5]|
+--------+-------------------------------------------------+
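If you are on Spark 2.4 or later, a sketch of an alternative that avoids the explode step is to collect the arrays as-is and then flatten the resulting array of arrays with the built-in `flatten` function (available in `org.apache.spark.sql.functions` since 2.4):

```scala
import org.apache.spark.sql.functions.{collect_list, flatten}

// Collect the per-row arrays into an array of arrays,
// then flatten each group into a single array per visitor.
dfJoined
  .groupBy("visitor")
  .agg(flatten(collect_list("asset")).as("asset"))
  .show(false)
```

This keeps the original element order within each collected array, but note that (as with `collect_list`) the order in which the arrays are concatenated is not guaranteed.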