
I have an input dataframe which contains an array-typed column. Each entry in the array is a struct consisting of a key (one of about four values) and a value. I want to turn this into a dataframe with one column for each possible key, and nulls where that value is not in the array for that row. Keys are never duplicated in any of the arrays, but they may be out of order or missing.

So far the best I've got is

val wantedCols = df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
  .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
  .groupBy(wantedCols.map(col(_)):_*)
  .pivot("col.key")
  .agg(first("col.value"))

This does exactly what I want, but it's hideous and I have no idea what the ramifications of grouping on every-column-but-one would be. What's the RIGHT way to do this?

EDIT: Example input/output:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
  .toDF("index", "state", "entries")

dfExampleInput.show

+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+

val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")

dfExampleOutput.show

+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

FURTHER EDIT:

I submitted a solution myself (see below) that handles this well so long as you know the keys in advance (in my case I do.) If finding the keys is an issue, another answer holds code to handle that.

Edward Peters
  • Can you add sample input & output expected ? – Srinivas May 09 '20 at 03:31
  • It would be very helpful to add the given input and the expected output to your question; please check [this](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) post for more details – abiratsis May 11 '20 at 09:18
  • @Srinivas edited that in. Worried it's slightly unclear (didn't display how I would expect) - the "entries" column is an array of structs. – Edward Peters May 11 '20 at 15:33
  • @AlexandrosBiratsis edited that in – Edward Peters May 11 '20 at 15:33
  • @Edward do you know the size of the entries array in advance, and is it fixed or not? Also, pivot is one of the heaviest operations in Spark; if possible you should avoid it – abiratsis May 11 '20 at 16:03
  • @AlexandrosBiratsis I do not know the size of the arrays, but I know they are bounded by a small number. There are only four or five possible keys; any given row might have any permutation of those, with no repetition. If there's a way to do this without pivot I'd love to hear it. – Edward Peters May 11 '20 at 16:59
  • `Keys are never duplicated in any of the arrays, but they may be out of order or missing.` does that mean that if we identify the array with max elements we can consider it representative for all the columns? – abiratsis May 11 '20 at 19:13
  • @AlexandrosBiratsis I'm not sure I understand the question. Probably there will exist at least one entry with all possible keys, but it's possible there might be one with `[A, B]`, one with `[B, C]`, but none with all. However, it would not be prohibitive to hard code the keys. – Edward Peters May 11 '20 at 20:12
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/213640/discussion-between-alexandros-biratsis-and-edward-peters). – abiratsis May 11 '20 at 20:15
  • @EdwardPeters could you provide some feedback about the provided answers? Could you try any of them? – abiratsis May 22 '20 at 20:36
  • @AlexandrosBiratsis I think all of the answers given probably technically work, but I don't think any fix the core issues I have with my own attempt (it expresses a one-row-to-one-row operation using arbitrary grouping, and is just generally inelegant.) Having looked here and elsewhere, I don't really think there is a "good" answer to this one... I suspect the best approach is to flatten the array directly by assigning columns to each possible index, then check each for each key. It'd be ugly and bulky, but efficient and less of a hack. – Edward Peters May 27 '20 at 18:54
  • Sure @Edward take your time to check all the possible options. Thank you for letting me know – abiratsis May 27 '20 at 20:29
  • @EdwardPeters I just realised that what you described in your last response is precisely the solution I came up with. I would recommend checking my answer once more, since it does exactly that. It might be a bit long, but it is definitely more performant than any groupBy solution. Of course, when you want to avoid the default Spark API you need to sacrifice something; in this case the implementation needs some more code – abiratsis May 31 '20 at 12:25
  • And by the way, it is not a hack at all. You will often see similar transformations done through the RDD API when the columns need to be generated dynamically. In the case of a known tuple size we can use the Dataframe API and its `map` function – abiratsis May 31 '20 at 12:27
  • Okay, yeah, on first glance I was confused as to what the "explode" call was doing, and thought that was how you were collecting key/value pairs and not just getting the set of keys. – Edward Peters Jun 01 '20 at 14:28

4 Answers

0

I wouldn't worry too much about grouping by several columns, other than potentially making things confusing. In that vein, if there is a simpler, more maintainable way, go for it. Without example input/output, I'm not sure if this gets you where you're trying to go, but maybe it'll be of use:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  .select(explode($"value")) // flatten the array
  .select("col.*") // de-nest the struct
  .groupBy("_2") // one row per distinct value
  .pivot("_1") // one column per distinct key
  .count // or agg(first) if you want the value in each column
  .show
+---+----+----+
| _2|  k1|  k2|
+---+----+----+
| v2|null|   1|
| v1|   1|null|
+---+----+----+

Based on what you've now said, I get the impression that there are many columns like "state" that aren't required for the aggregation, but need to be in the final result.

For reference, if you didn't need to pivot, you could add a struct column with all such fields nested within, then add it to your aggregation, e.g.: .agg(first($"myStruct"), first($"number")). The main advantage is only having the actual key column(s) referenced in the groupBy. But when using pivot things get a little weird, so we'll set that option aside.
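
To illustrate the nesting idea, here is a minimal sketch (not a full solution: it skips the pivot and keeps only the first number per row key). It reuses dfExampleInput from the question, and "meta" is just an invented name for the nested struct column:

import org.apache.spark.sql.functions.{explode, first, struct}

// Sketch only: nest the "extra" columns into one struct so the groupBy
// references the row key alone, and carry the struct through with first()
dfExampleInput
  .withColumn("meta", struct($"state"))
  .select($"index", $"meta", explode($"entries").as("entry"))
  .groupBy($"index")
  .agg(first($"meta").as("meta"), first($"entry.number").as("number"))
  .select($"index", $"meta.*", $"number")
  .show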

In this use case, the simplest way I could come up with involves splitting your dataframe and joining it back together after the aggregation using some rowkey. In this example I am assuming that "index" is suitable for that purpose:

 val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
 val mehDF = dfExampleInput.select(mehCols:_*)
 val aggDF = dfExampleInput
   .select($"index", explode($"entries").as("entry"))
   .select($"index", $"entry.*")
   .groupBy("index")
   .pivot("name")
   .agg(first($"number"))

 scala> mehDF.join(aggDF, Seq("index")).show
 +-----+-----+---+----+
 |index|state|  A|   B|
 +-----+-----+---+----+
 |    0|   KY| 45|null|
 |    1|   OR| 30|  10|
 +-----+-----+---+----+

I doubt you would see much of a difference in performance, if any. Maybe at the extremes, e.g. very many meh columns, or very many pivot columns, or something like that, or maybe nothing at all. Personally, I would test both with decently-sized input, and if there wasn't a significant difference, use whichever one seemed easier to maintain.

  • Added example input/output. Worried it's slightly unclear (didn't display how I would expect) - the "entries" column is an array of structs. – Edward Peters May 11 '20 at 15:34
  • @EdwardPeters, my original answer only concerned itself with the keys and values. I have added a join-based solution now that there is some sample input/output to draw context from. – Brad LaVigne May 11 '20 at 18:38
  • @EdwardPeters One other option you could look into would be dataframe window functions. I haven't really tried them out yet, but from what I understand they are very similar to groupBy, but, among other differences, include all of the original columns. – Brad LaVigne May 12 '20 at 18:20
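
The window-function idea from that last comment might look roughly like this; it is only a sketch, reusing dfExampleInput from the question and hard-coding the keys "A" and "B":

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{explode, max, when}

// Sketch only: conditional aggregation over a window keyed by "index" keeps
// every original column; the rows duplicated by explode are dropped at the end
val w = Window.partitionBy("index")
val exploded = dfExampleInput.select($"*", explode($"entries").as("entry"))
val widened = Seq("A", "B").foldLeft(exploded) { (acc, k) =>
  acc.withColumn(k, max(when($"entry.name" === k, $"entry.number")).over(w))
}
widened.drop("entries", "entry").dropDuplicates("index").show
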
0

Without groupBy, pivot, agg, or first.

Please check the code below.

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]

scala> df.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)


scala> df.show(false)
+-----+-----+------------------+
|index|state|entries           |
+-----+-----+------------------+
|0    |KY   |[[A, 45]]         |
|1    |OR   |[[A, 30], [B, 10]]|
+-----+-----+------------------+


scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)

scala> // `max` (the largest size of the entries array) is not defined above; one way to compute it:
scala> val max = df.select(size($"entries")).as[Int].collect.max
max: Int = 2

scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]

scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |0  |1   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+


scala> finalDF.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+




Final Output


scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
+-----+-----+---+----+
|index|state|A  |B   |
+-----+-----+---+----+
|0    |KY   |45 |null|
|1    |OR   |30 |10  |
+-----+-----+---+----+

Srinivas
  • I think that's almost exactly the version I'm trying to improve? (Mine generates the "Group by" arguments from the column list, though.) One thing is that the dataframe I'm working with has hundreds of columns... grouping by all of them seems wrong. – Edward Peters May 11 '20 at 17:02
  • I have updated the answer; this time I have not used any of these functions - groupBy, pivot, agg, first – Srinivas May 12 '20 at 03:40
  • Hi Srinivas, by any chance do you have a pyspark version of this? – user3222101 Oct 27 '21 at 20:35
0

Here is another way, based on the assumption that there are no duplicate keys in the entries column, i.e. Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) would cause an error. The solution combines the RDD and DataFrame APIs:

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.cache

// get all possible keys from entries i.e Seq[A, B, C]
val finalCols = df.select(explode($"entries").as("entry"))
                  .select($"entry".getField("name").as("entry_name"))
                  .distinct
                  .collect
                  .map{_.getAs[String]("entry_name")}
                  .sorted // Attention: we need to retain the order of the columns 
                          // 1. when generating row values and
                          // 2. when creating the schema

val rdd = df.rdd.map{ r =>
  // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  val entriesMap = r.getSeq[Row](2).map{r => (r.getString(0), r.getString(1))}.toMap

  // transform finalCols into a map with null value i.e Map(A -> null, B -> null, C -> null)
  val finalColsMap = finalCols.map{c => (c, null)}.toMap

  // replace null values with those that are present from the current row by merging the two previous maps
  // Attention: this should retain the order of finalColsMap
  val merged = finalColsMap ++ entriesMap

  // concatenate the two first row values ["index", "state"] with the values from merged
  val finalValues = Seq(r(0), r(1)) ++ merged.values

  Row.fromSeq(finalValues)
}

val extraCols = finalCols.map{c => s"`${c}` STRING"}
val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))

val finalDf = spark.createDataFrame(rdd, schema)

finalDf.show
// +-----+-----+---+----+----+
// |index|state|  A|   B|   C|
// +-----+-----+---+----+----+
// |    0|   KY| 45|null|null|
// |    1|   OR| 30|  10|null|
// |    2|   FL| 30|  10|  20|
// |    3|   TX| 19|  60|  40|
// +-----+-----+---+----+----+

Note: the solution requires one extra action to retrieve the unique keys, although it doesn't cause any shuffling since it is based on narrow transformations only.

abiratsis
0

I've worked out a solution myself:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// for each possible array index, pick the struct whose keyName field matches key
def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
      when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
      .otherwise(innerCol))
}

Example:

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
  .toDF("index", "state", "entries")

df.withColumn("A", extractFromArray("entries", "B", 3, "name"))
  .show

which produces:

+-----+-----+--------------------+-------+
|index|state|             entries|      A|
+-----+-----+--------------------+-------+
|    0|   KY|           [[A, 45]]|   null|
|    1|   OR|  [[A, 30], [B, 10]]|[B, 10]|
|    2|   FL|[[A, 30], [B, 10]...|[B, 10]|
|    3|   TX|[[B, 60], [A, 19]...|[B, 60]|
+-----+-----+--------------------+-------+

This solution is a little different from other answers:

  • It works on only a single key at a time
  • It requires the key name and number of keys be known in advance
  • It produces a column of structs, rather than doing the extra step of extracting specific values
  • It works as a simple column-to-column operation, rather than requiring transformations on the entire DF
  • It can be evaluated lazily

The first three issues can be handled by calling code, which leaves this somewhat more flexible for cases where you already know the keys or where the structs contain additional values to extract.
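
For example, calling code along these lines could hard-code the known keys and pull out just the "number" field (a sketch that reuses extractFromArray and the example df above; the key list is assumed):

// Sketch only: apply extractFromArray once per known key, keeping the
// "number" field, to get one column per key as in the desired output
val knownKeys = Seq("A", "B", "C")
val flattened = knownKeys.foldLeft(df) { (acc, k) =>
  acc.withColumn(k, extractFromArray("entries", k, 3, "name").getItem("number"))
}
flattened.drop("entries").show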

Edward Peters
  • Hi @Edward, I checked your solution and indeed, as you mentioned, it works on a single key. That was probably the most difficult part (flattening dynamically and efficiently), so I think your solution works, but under specific conditions and for the given keys. It does not solve the problem as a whole, i.e. how will you execute this for every key? You will again face the same issue of discovering the keys. – abiratsis Jun 05 '20 at 21:04