
This question is about the transform higher-order function (https://issues.apache.org/jira/browse/SPARK-23908).

Is there any way to use it as a standard function (in package org.apache.spark.sql.functions._)?

I have an array of strings and I want to apply URI normalization to each of them. For now I do it with a UDF. I had hoped that with Spark 2.4.0 I would be able to skip the UDF.
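
Roughly, my current approach looks like this (normalizeUri just stands in for the actual normalization logic):

import org.apache.spark.sql.functions.{col, udf}

// Placeholder for the real URI normalization
def normalizeUri(uri: String): String = uri.trim.toLowerCase

// UDF that maps the normalization over the whole array column
val normalizeUris = udf((uris: Seq[String]) => uris.map(normalizeUri))

df.withColumn("uris", normalizeUris(col("uris")))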

As I see it, it should be used in selectExpr, like df.selectExpr("transform(i, x -> x + 1)"), but is it only meant to be used with selectExpr?

Used this way, is there any way to provide a custom function for the transformation? Is there any way to achieve it, or should I resort to good old UDFs?

  • Can you describe your use case? I think using UDFs is the fastest performance-wise. – Jacek Laskowski Dec 13 '18 at 16:53
  • I have an array of strings and I want to apply URI normalization to each of them. For now I did it with a UDF. I hoped that with Spark 2.4.0 I would be able to skip the UDF. – MitakaJ9 Dec 13 '18 at 18:12

2 Answers


Is there any way to use it as a standard function located in package org.apache.spark.sql.functions._?

For now it is intended only for use with SQL expressions, although if you want to return a Column you can use expr:

import org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column

Using it this way, is there any way to provide a custom function for the transformation?

It is possible to use a Scala UDF*:

// Register the function so it can be called by name inside the SQL expression
spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show
+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

although it doesn't seem to provide any real benefit over a UDF taking a Seq.
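
For comparison, a minimal sketch of that alternative - the whole array handled by a single UDF (incAll is a name made up for illustration):

import org.apache.spark.sql.functions.{col, udf}

// The same transformation expressed as one UDF over the entire array
val incAll = udf((xs: Seq[Int]) => xs.map(_ + 1))

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", incAll(col("xs")))
  .show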


* Partial support for Python UDFs seems to be in place as well (udfs are recognized, types are correctly derived, and calls are dispatched), but as of 2.4.0 the serialization mechanism seems to be broken (all records are passed to the UDF as None):

from typing import Optional
from pyspark.sql.functions import expr

sc.version
'2.4.0'
def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)
df.show()
+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+

Of course there is no real potential for a performance boost here - it dispatches to BasePythonRunner, so the overhead should be the same as that of a plain udf.

Related JIRA ticket SPARK-27052 - Using PySpark udf in transform yields NULL values

  • The fact that you said "if you want a hof to return a Column use F.expr()" makes me think that in SQL you can use them to do more than just work on a single column. Is this possible? – Topde Jun 09 '21 at 19:50

It seems that in Python, as reported in the answer above, there is a problem with using a udf inside expr, but it can be done with pyspark.sql.functions.transform (available since Spark 3.1) as follows:

from typing import Optional
from pyspark.sql.functions import col, transform

def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

df = (spark
    .createDataFrame(
        [(1, [1, 2, 3])], ("id", "xs"))
    # f is applied here as a plain Python function on a Column - x + 1
    # builds a Column expression, so no UDF serialization is involved
    .withColumn("xsinc", transform(col("xs"), lambda x: f(x))))

df.show()
+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+