4

I wrote the following MyPythonGateway.java so that I can call my custom java class from Python:

public class MyPythonGateway {

    public String findMyNum(String input) {
        return MyUtiltity.parse(input).getMyNum(); 
    }

    public static void main(String[] args) {
        GatewayServer server = new GatewayServer(new MyPythonGateway());
        server.start();
    }
}

and here is how I used it in my Python code:

def main():

    gateway = JavaGateway()                   # connect to the JVM
    myObj = gateway.entry_point.findMyNum("1234 GOOD DAY")
    print(myObj)


if __name__ == '__main__':
    main()

Now I want to use MyPythonGateway.findMyNum() function from PySpark, not just a standalone python script. I did the following:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)

However, I got the following error:

... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
  File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.

So what did I miss here? I don't know if I should run a separate JavaApplication of MyPythonGateway to start a gateway server when using pyspark. Please advice. Thanks!


Below is exactly what I need:

input.map(f)

def f(row):
   // call MyUtility.java 
   // x = MyUtility.parse(row).getMyNum()
   // return x

What would be the best way to approach this? Thanks!

zero323
  • 322,348
  • 103
  • 959
  • 935
Edamame
  • 23,718
  • 73
  • 186
  • 320
  • For `input.map(f)`, what is `input`? If it's not an RDD/Dataset/Dataframe, then what Kuttan has below is fine. But if it's a RDD, then his solution won't work. – Marcus Sep 10 '19 at 18:04

4 Answers4

4

In PySpark before start calling the method -

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")

you have to import MyPythonGateway java class as follows

java_import(sparkContext._jvm, "myPackage.MyPythonGateway")
myPythonGateway  = spark.sparkContext._jvm.MyPythonGateway()
myPythonGateway.findMyNum("1234 GOOD DAY")

specify the jar containing myPackage.MyPythonGateway with --jars option in spark-submit

Kuttan
  • 41
  • 1
3

First of all the error you see usually means the class you're trying to use is not accessible. So most likely it is a CLASSPATH issue.

Regarding general idea there are two important issues:

  • you cannot access SparkContext inside an action or transformation so using PySpark gateway won't work (see How to use Java/Scala function from an action or a transformation? for some details)). If you want to use Py4J from the workers you'll have to start a separate gateways on each worker machine.
  • you really don't want to pass data between Python an JVM this way. Py4J is not designed for data intensive tasks.
zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thanks! Basically, MyUtitlity.java is somewhat complicated and we really do NOT want to re-code it in python. Is there a way to call MyUtility.java from a pyspark job? We don't necessarily need to use Py4J if there is other option ... – Edamame Feb 28 '16 at 23:30
  • Well, a lot depends on your architecture and code. Probably the simplest and relatively efficient solution is to `pipe` data to a Java code and read the output. Alternatively you can pass data through disk (this is basically how PySpark driver used to handled things, although I think it is not the case anymore. Or maybe it is). The most complicated solution is to have persistent (or temporary, for example during lifetime of executor) Java process which handles requests. – zero323 Feb 29 '16 at 00:38
  • How to properly register jars at both driver and workers? Then make Python wrappers to jars to be properly called on driver? – Alexander Myltsev Mar 22 '16 at 06:23
  • 1
    @AlexanderMyltsev For the driver you need `driver-class-path` or similar solution. For workers `--jars` or `--packages` are a method of choice but distributing manually and adding to classpath should work as well. – zero323 Mar 22 '16 at 15:15
  • > If you want to use Py4J from the workers you'll have to start a separate gateways on each worker machine How? – scubbo Aug 12 '19 at 00:26
1

If input.map(f) has inputs as an RDD for example, this might work, since you can't access the JVM variable (attached to spark context) inside the executor for a map function of an RDD (and to my knowledge there is no equivalent for @transient lazy val in pyspark).

def pythonGatewayIterator(iterator):
    results = []
    jvm = py4j.java_gateway.JavaGateway().jvm
    mygw = jvm.myPackage.MyPythonGateway()
    for value in iterator:
        results.append(mygw.findMyNum(value))
    return results


inputs.mapPartitions(pythonGatewayIterator)
Marcus
  • 2,128
  • 20
  • 22
0

all you need to do is compile jar and add to pyspark classpath with --jars or --driver-class-path spark submit options. Then access class and method with below code-

sc._jvm.com.company.MyClass.func1()

where sc - spark context

Tested with Spark 2.3. Keep in mind, you can call JVM class method only from driver program and not executor.

akshat thakar
  • 1,445
  • 21
  • 29