
Several people (1, 2, 3) have discussed using a Scala UDF in a PySpark application, usually for performance reasons. I am interested in the opposite - using a python UDF in a Scala Spark project.

I am particularly interested in building a model using sklearn (and MLFlow) and then efficiently applying it to records in a Spark streaming job. I know I could also host the python model behind a REST API and make calls to that API in the Spark streaming application in mapPartitions, but managing concurrency for that task and setting up the API for the hosted model isn't something I'm super excited about.

Is this possible without too much custom development with something like Py4J? Is this just a bad idea?

Thanks!

turtlemonvh
  • It is possible, though definitely neither supported nor straightforward. So the question really is why would you even try. It is really hard to find a reasonable justification for such a process. – zero323 Aug 19 '18 at 11:02
  • @user6910411 Thanks for the response. I explained the use case in the question - I'd like to use a model I trained using sklearn to evaluate individual rows in a structured streaming application. – turtlemonvh Aug 20 '18 at 13:20
  • I guess the question is - if you already want to pay the price of inter-language communication, why not go with PySpark all the way? – zero323 Aug 20 '18 at 16:10
  • In this case, because 1) the python operation will be a small piece of a larger Spark job, and I'd rather not pay the PySpark penalty for the whole thing and 2) I already have a mature Scala project, and just want to add in a bit of python w/o needing a rewrite. – turtlemonvh Aug 20 '18 at 18:18
  • Not submitting as an answer but if you use Databricks you can use Scala and Python in the same job. You can hop over to Pyspark for that UDF with sklearn, score the records, and then immediately transition back to Scala downstream. The common layer is Spark tables in SparkSQL. Pyspark can read them and write them, as well as Spark in Scala (obviously). Not sure how you would do this with pure open source, or if something like Zeppelin supports this. (Full disclosure, I work for Databricks) – Raphael K May 08 '19 at 01:55
  • Thanks @RaphaelK: we are Databricks customers at the company I work for, so I'm aware of that option, at least in notebook powered jobs. – turtlemonvh May 09 '19 at 02:17
  • Were you able to find a solution for this? I have a similar problem. I have a well-matured spark/scala project where I need to load models generated using sklearn or even pytorch and then do predictions on a large dataset. – Shirish Kumar Dec 23 '20 at 19:00
  • @ShirishKumar kind of. I did have to do something recently where I had a udf that was doing some complex network operations, and I wanted to reuse the open connection and add some response caching. What I did was create a global object and set up the udf to make function calls through the global object. The global object managed the connection, cached responses, etc. If I had to solve this today I'd take the same approach and open up a process running python, then feed records through the process. This is similar to how python udfs work in pyspark (all data goes through stdin/stdout). – turtlemonvh Dec 23 '20 at 22:45
  • Some examples of this weird pattern: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala, https://github.com/apache/spark/blob/v3.0.1/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala, https://github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala, https://github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala The last one is the main entrypoint for running python udfs in spark. – turtlemonvh Dec 23 '20 at 22:55
  • To summarize the above: look at how python udfs work by checking out [PythonRunner](https://github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala). The code is `private[spark]`, but you can still access it by putting a wrapper object inside `org.apache.spark`. There is a lot of fanciness inside that class that you may not need. If you don't need udfs, a simple `mapPartitions` (spinning up a python process per partition) may suffice to call your code; see the sketch just after these comments.
  • Please, have a look at https://stackoverflow.com/q/76802912/6380624 – Andrei Iatsuk Aug 02 '23 at 11:20
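
For reference, here is a minimal sketch (in Scala, spark-shell/`main` style) of the subprocess approach described in the comments above. This is not code from the thread: `score.py` is a hypothetical script that reads one JSON record per line from stdin, applies the sklearn model, and prints one numeric score per line to stdout, and the sketch uses Spark's built-in `RDD.pipe` (the mechanism behind the `PipedRDD` linked above) rather than a hand-rolled `mapPartitions`.

```scala
import org.apache.spark.sql.SparkSession

object PipeScoringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("pipe-scoring-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical input; any DataFrame of feature columns would do.
    val features = spark.read.parquet("/path/to/features")

    // Serialize each row as a JSON line and pipe each partition through an external
    // python process. RDD.pipe starts one process per task and streams records over
    // stdin/stdout, similar in spirit to how PySpark feeds its own python workers.
    val predictions = features.toJSON.rdd
      .pipe("python3 score.py") // score.py (hypothetical): read JSON lines, print one score per line
      .map(_.toDouble)
      .toDF("prediction")

    predictions.show()
    spark.stop()
  }
}
```

The script and a Python environment with sklearn would need to be available on every executor (e.g. by shipping the script with `--files score.py` at spark-submit time), and in a structured streaming job this batch-style scoring would have to be wrapped in something like `foreachBatch`.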

1 Answer


Maybe I'm late to the party, but at least I can help with this for posterity. This is actually achievable by creating your Python UDF and registering it with `spark.udf.register("my_python_udf", foo)`. You can view the docs here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register

This function can then be called via the `sqlContext` from Python, Scala, Java, R, or really any language, because you're accessing the `sqlContext` directly (where the UDF is registered). For example, you would call something like

spark.sql("SELECT my_python_udf(...)").show()

PROS - You get to call your sklearn model from Scala.

CONS - You have to use the `sqlContext` and write SQL-style queries.
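
To make that concrete, the Scala side might look like the sketch below (spark-shell/notebook style). This is my own illustration rather than the answerer's code: it assumes the Python UDF has already been registered as `my_python_udf` on the same SparkSession (e.g. via `spark.udf.register` in a PySpark process sharing that session, as in a Databricks notebook), and `records` and `feature_col` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

// Reuse the session that the Python side registered the UDF against.
val spark = SparkSession.builder.getOrCreate()

// Hypothetical input with a hypothetical feature column.
val input = spark.read.parquet("/path/to/records")
input.createOrReplaceTempView("records")

// The Python UDF is referenced by name: it lives in the shared session's
// function registry rather than existing as a Scala function value.
val scored = spark.sql("SELECT *, my_python_udf(feature_col) AS score FROM records")
scored.show()
```

Calling it by name this way avoids any explicit Py4J plumbing on the Scala side; the inter-language hop still happens inside Spark's Python worker machinery when the UDF executes.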

I hope this helps, at least for any future visitors.

Napoleon Borntoparty
  • Thanks for this. It looks like we should be able to submit python zips alongside a primary jar for a spark job and use those python zips as dependencies. – turtlemonvh Nov 26 '19 at 16:00
  • I think you're speaking from a situation where you have a context in a Python process, register the UDF, and then reuse the context in a JVM where you could access it. This would be possible in a Databricks notebook, but not when I have a single job that I start with spark-submit. – Def_Os Dec 14 '20 at 07:53
  • I'm looking for the way to create a PySpark UDF for a SparkContext created using Scala. The link you gave is now broken. – Averell Dec 12 '22 at 11:43