
Let's say, for instance, that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them to our Python code (something similar to Python stubs with Scala or Java skeletons).

Do you think it is possible to interface new custom Python methods with some Scala or Java User Defined Functions under the hood?

prossblad

1 Answer


Spark 2.1+

You can use SQLContext.registerJavaFunction:

Register a java UDF so it can be used in SQL statements.

which requires a name, the fully qualified name of the Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:

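// build.sbt (sbt build definition)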
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)
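
// src/main/scala/com/example/spark/udfs/addOne.scala (conventional sbt layout; the exact path is up to you)
// Build a jar from this project and make it available to Spark, e.g. via spark-submit --jars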
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}

sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+
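
Once registered, the function is referenced by its SQL name, so it can also be reached from the DataFrame API through expr / selectExpr. A minimal sketch (the DataFrame and column names below are purely illustrative):

from pyspark.sql.functions import expr

df = sqlContext.createDataFrame([(1, ), (2, )], ["x"])

# The UDF expects Integer arguments, so cast the (by default bigint) column first;
# both forms delegate to the Java UDF registered above
df.selectExpr("add_one(CAST(x AS INT)) AS x_plus_one").show()
df.select(expr("add_one(CAST(x AS INT))").alias("x_plus_one")).show()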

Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.
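
For instance, a built-in function such as upper is, on the Python side, little more than a call into the JVM. A simplified sketch of that wrapper pattern (the name upper_ is mine, not part of PySpark):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def upper_(col):
    """Thin wrapper delegating to org.apache.spark.sql.functions.upper."""
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.upper(_to_java_column(col)))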

Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in the package com.example.udaf:

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.column import Column, _to_java_column, _to_seq

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs")).collect()
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+

There are far too many leading underscores for my taste, but as you can see it can be done.
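
As a side note, newer Spark versions (2.3+) also expose spark.udf.registerJavaUDAF on the Python side, so an aggregate like this could be registered by class name and used from SQL directly, assuming the class extends UserDefinedAggregateFunction. A rough sketch, not part of the original answer (the SQL function name and view name are just illustrative):

df.createOrReplaceTempView("df")
spark.udf.registerJavaUDAF("group_concat", "com.example.udaf.GroupConcat")
spark.sql("SELECT k, group_concat(v) AS vs FROM df GROUP BY k").show()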

zero323
  • I am doing the following, but every time I encounter "py4j.protocol.Py4JError: com.example.udf.GroupConcat.apply does not exist in the JVM". My package is "com.example.udf" – Arnab Jun 27 '16 at 06:32
  • I have a jar which has an enum constant and a UDF. How can I modify this code to use that? – dksahuji Jan 17 '18 at 11:15
  • I'm missing something on how `registerJavaFunction` knows where to find your UDF... Could you elaborate on the directory structure here? Have you run `sbt clean assembly` on your `scalaVersion := ...` (`build.sbt`??) and `package com.example.spark.udfs...` (`src/main/scala`??) files from elsewhere? – MichaelChirico Nov 24 '18 at 10:55
  • It's worth noting that you should really start by checking https://repo1.maven.org/maven2/org/apache/spark to be sure your Scala and Spark versions are compatible in the first place... I just spent a whole day (my first day using `sbt`) trying to make `scalaVersion := "2.12.7"` work with `sparkVersion = "2.3.1"`, but Scala `2.12`+ is only compatible with Spark `2.4`+ (or so I gather) – MichaelChirico Nov 24 '18 at 16:40