
I am processing a Spark DataFrame (DF) and need to add a column to it on the fly, from inside a call to mapPartitions:

// Don't worry about what 'widget' is or represents
val rdd = df.mapPartitions { rows => addColIfNecessary(rows, widget) }

Then:

def addColIfNecessary(rows: Iterator[Row], widget: Widget): Iterator[Row] = {
    rows.map { row =>
        if (widget.determineWhetherRowNeedsNewCol(row)) {
            // TODO: Add a new "fizz" column (of StringType) to the row
            val newVal: String = widget.getValueOfNewCol(row)
            row.addColumn("fizz", StringType, newVal)
        } else {
            row
        }
    }
}

This is obviously just pseudo-code, but conveys what I'm trying to do. Any ideas as to how I can actually implement it?

smeeb
  • possible duplicate of http://stackoverflow.com/questions/33876155/how-to-add-columns-into-org-apache-spark-sql-row-inside-of-mappartitions – Shankar Oct 25 '16 at 14:54
  • Thanks @Shankar but in that question the column is added to a newly created DF, not an existing one from inside the map partitions function. So I would argue that this is either a different (standalone) question, or that at the very least I need to understand what I'm doing wrong in my approach that would then allow that question to be the solution for my problem as well. Thanks again! – smeeb Oct 25 '16 at 15:06
  • 1
    Given that DF is a columnar format, it would be more advisable to conditionally add a value to a nillable column than to add a column to some Rows. Also, is there a specific need to do this within `mapPartitions` ? – maasg Oct 25 '16 at 15:18
  • Thanks @maasg (+1). If you could post even a pseudo-code example, that would go a long way for me (I'm brand new to Spark and Scala). Also, I don't think I *need* to do this from inside `mapPartitions`; that's just the code I've stitched together from consulting the Google Gods and Spark docs for the last few hours. – smeeb Oct 25 '16 at 15:20

1 Answer


DataFrames are column-oriented structures, so adding a column to only some rows is not a good idea. Instead, you can leverage the DataFrame's support for nullable values: keep the extra column for every row, and populate it only for the rows that match your criteria.

An example: Let's take a DF of users and pages:

val users = Seq("Alice" , "Bob", "Charly", "Dean", "Eve", "Flor", "Greta")
val pages = (1 to 9).map(i => s"page_$i")
val userPages = for {u <- users
                     p <- pages} yield (u,p) 

val userPagesDF = sparkContext.parallelize(userPages).toDF("user","page")

// A user-defined function that takes the last digit of the page name and uses it
// to compute a "rank". It only ranks pages with a number higher than 7.
val rankUDF = udf((p: String) => if (p.takeRight(1).toInt > 7) "top" else null)

// New DF with the extra column "rank", which contains values for only some rows
val ranked = userPagesDF.withColumn("rank", rankUDF($"page"))

ranked.show

+-----+-------+----+
| user|   page|rank|
+-----+-------+----+
|Alice| page_1|null|
|Alice| page_2|null|
|Alice| page_3|null|
|Alice| page_4|null|
|Alice| page_5|null|
|Alice| page_6|null|
|Alice| page_7|null|
|Alice| page_8| top|
|Alice| page_9| top|
|  Bob| page_1|null|
|  Bob| page_2|null|
|  Bob| page_3|null|
|  Bob| page_4|null|
|  Bob| page_5|null|
|  Bob| page_6|null|
|  Bob| page_7|null|
|  Bob| page_8| top|
|  Bob| page_9| top|
+-----+-------+----+

ranked.printSchema

root
 |-- user: string (nullable = true)
 |-- page: string (nullable = true)
 |-- rank: string (nullable = true)
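
If the transformation really does need to happen inside `mapPartitions` (as the original pseudo-code suggests), keep in mind that a Row cannot be mutated in place. The usual pattern is to rebuild each row with the extra field appended and then recreate the DataFrame with an extended schema. Here is a minimal sketch of that approach, assuming a SparkSession named `spark` and the hypothetical `Widget` from the question (which must be serializable so it can be shipped to the executors):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Every row must end up with the same number of fields, so rows that don't
// need a value get a null in the new "fizz" slot.
val newSchema = StructType(df.schema.fields :+ StructField("fizz", StringType, nullable = true))

val newRows = df.rdd.mapPartitions { rows =>
  rows.map { row =>
    val fizz =
      if (widget.determineWhetherRowNeedsNewCol(row)) widget.getValueOfNewCol(row)
      else null
    // Rebuild the row with the extra value appended
    Row.fromSeq(row.toSeq :+ fizz)
  }
}

// Reassemble a DataFrame over the new rows, using the extended schema
val withFizz = spark.createDataFrame(newRows, newSchema)

(In Spark 1.x the equivalent final call is sqlContext.createDataFrame.) That said, the withColumn + UDF approach above is simpler and lets Catalyst optimize the query, so prefer it unless you have a concrete reason to drop down to the RDD level.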
maasg