
Question

I want to add the return values of a UDF to an existing dataframe as separate columns. How do I achieve this efficiently?

Here's an example of what I have so far.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  

df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)

+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+

def example(n):
    return [[n+2], [n-2]]

#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])

example_udf = udf(example)

Now I can add a column to the dataframe as follows

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+

However I don't want the two values to be in the same column but rather in separate ones.

Ideally I'd like to split the output column now, to avoid calling the example function twice (once for each return value), as explained here and here. However, in my situation I'm getting an array of arrays, and I can't see how a split would work there (please note that each array will contain multiple values, separated by commas).

How the result should look

What I ultimately want is this

+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+

Note that the use of the StructType return type is optional and doesn't necessarily have to be part of the solution.

EDIT: I commented out the use of StructType (and edited the udf assignment) since it's not necessary for the return type of the example function. However, it would have to be used if the return value were something like

return [6,3,2],[4,3,1]
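
For that case, a minimal sketch of what I mean (it reuses the commented-out schema from above; example_arrays and its body are just a hypothetical way to produce two such lists):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

array_schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(IntegerType()), False)])

def example_arrays(n):
    # Hypothetical: build two lists per row, e.g. ([6, 3, 2], [4, 3, 1]) for n = 4
    return [n + 2, n - 1, n - 2], [n, n - 1, n - 3]

example_arrays_udf = udf(example_arrays, array_schema)
newDF = df.withColumn("Output", example_arrays_udf(df["Number"]))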
Rob
  • Possible duplicate of [Apache Spark -- Assign the result of UDF to multiple dataframe columns](https://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-multiple-dataframe-columns) – pault Mar 08 '19 at 16:18

3 Answers


To return a StructType, just use Row:

from pyspark.sql.types import StructType, StructField, IntegerType, Row
from pyspark.sql import functions as F

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    return Row('Out1', 'Out2')(n + 2, n - 2)


schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])

example_udf = F.udf(example, schema)

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)
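
Selecting "Output.*" expands the struct column into top-level Out1 and Out2 columns, which is exactly the layout asked for in the question (Out1 = 6, Out2 = 2 for the sample row).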
Zhang Tong
  • Oh sweet, that's exactly what I've been looking for! Thank you. – Rob Dec 06 '17 at 09:10
  • This technique works, but if you look at the explain plan it internally calls the example method twice, which is not intended and should be avoided. In general the Catalyst optimizer does not optimize UDFs since they are black boxes; on top of that, we end up calling the same UDF twice (in this case, because we want Out1 and Out2 in different columns). – Manish Mehra Aug 18 '18 at 11:55
  • Calling the method twice *is* an optimization, at least according to the optimizer. If you don't want the method to be called twice you can mark it as non-deterministic, forcing the optimizer to call it once, by doing `example_udf = example_udf.asNondeterministic()`. You don't need to go through the hoops of turning it into an array and then exploding, which also adds its own computational cost. – dtheodor Jan 26 '21 at 08:46
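
Following up on the comment above, a minimal sketch of the asNondeterministic approach (available since Spark 2.3), reusing example and schema from this answer:

example_udf = F.udf(example, schema).asNondeterministic()

newDF = df.withColumn("Output", example_udf(df["Number"])) \
          .select("Name", "Number", "Output.*")
newDF.explain()  # the plan should now show a single Python UDF evaluation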

A better way to solve the above problem is to wrap the output in an array and then explode it:

import pyspark.sql.functions as f
import pyspark.sql.types as t

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    return t.Row('Out1', 'Out2')(n + 2, n - 2)


schema = t.StructType([
    t.StructField("Out1", t.IntegerType(), False),
    t.StructField("Out2", t.IntegerType(), False)])

example_udf = f.udf(example, schema)

newDF = df.withColumn("Output", f.explode(f.array(example_udf(df["Number"]))))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)
newDF.explain()

Notice the output of explain: you will observe that the example method is actually called only once!
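
As far as I can tell, this works because the explode is planned as a Generate step: the UDF is evaluated once to produce the single-element array, and "Output.*" then expands the already-materialized struct instead of re-invoking the UDF.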

Manish Mehra
  • Purely anecdotal, but I tried both versions in my script and this one was almost 3 times faster! Anyone looking at this should definitely try it. – KGS Dec 16 '19 at 15:29
  • Be warned: explode is going to stop performing with 500,000+ rows. – Leonardo Eloy Jun 03 '20 at 05:14

In Scala

import spark.implicits._
val df = Seq(("Alive", 4)).toDF("Name", "Number")

Without a UDF

df.
  withColumn("OutPlus",  $"Number" + 2).
  withColumn("OutMinus", $"Number" - 2).
  show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+
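
Note that these plain column expressions stay fully visible to the Catalyst optimizer, whereas a UDF is a black box to it, so the UDF-free version is generally the cheapest option whenever the logic can be expressed with built-in functions.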

With a UDF using explode

import org.apache.spark.sql.functions.{explode, udf}
def twoItems(_i: Int) = Seq((_i + 2, _i - 2))
val twoItemsUdf = udf(twoItems(_: Int))

val exploded = df.
  withColumn("Out", explode(twoItemsUdf($"Number"))).
  withColumn("OutPlus", $"Out._1").
  withColumn("OutMinus", $"Out._2")

exploded.printSchema

root
 |-- Name: string (nullable = true)
 |-- Number: integer (nullable = false)
 |-- Out: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: integer (nullable = false)
 |-- OutPlus: integer (nullable = true)
 |-- OutMinus: integer (nullable = true)

exploded.drop("Out").show

+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+
Tony Fraser