1

Is there any way out to access java objects on worker nodes through pyspark ?

Below is my working sample scala code. I am trying to achieve same with the help of python on pyspark.

import test.Transformation
import test.GeneralPreHook

sc.addJar("/navi/Transformer.jar")
GeneralPreHook.config(sc.hadoopConfiguration)
val params = sc.hadoopConfiguration.get("params")
val file = sc.textFile("adl://xxxxx.azuredatalakestore.net/navi/test.txt")

val output = file.flatMap(line => line.split("\n")).map{word =>
    val transformer = new Transformation(params)
    transformer.decryptString(word)
    }

output.saveAsTextFile("adl://xxxxx.azuredatalakestore.net/navi/Out")

Here is the python sample code, which I am trying to execute through pyspark.

Open pyspark shell with below parameters.

pyspark --jars /navi/Transformer.jar --master yarn

from pyspark import SparkContext

#Add required jar files to distributed cache which can be used by slave nodes
sc._jsc.addJar("/navi/Transformer.jar")
sc._jvm.test.Transformation.setParams(params)
# Create a RDD of the input file. 
file = sc.textFile("adl://xxxxx.azuredatalakestore.net/navi/test.txt")

#5. Split each line to tokens, transform input tokens in map class
output=file.flatMap(lambda line: line.split(",")).map(lambda line : sc._jvm.test.Transformer.transformStatic(line))

#Save results(the task directory should be cleaned first)
output.saveAsTextFile("adl://xxxxx.azuredatalakestore.net/navi/Out")

#Alternatively, we can display the results directly
output.collect()

While collecting or saving the results to the output file, I am getting below errors :-

pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I googled it a lot but didn't get any luck to resolve this issue. However, similar code is executed perfectly with scala. Any other way out to use the java function within the map ?

Any help on this would be greatly appreciated. Thanks in advance!

java_dev
  • 992
  • 1
  • 8
  • 11
  • may be this will help - https://stackoverflow.com/questions/31684842/calling-java-scala-function-from-a-task – Pushkr Sep 25 '18 at 00:32
  • Thanks for the reply @Pushkr , I looked into the provided solutions. In general Spark doesn't use Py4j for anything else than some basic RPC calls on the driver and doesn't start Py4j gateway on any other machine. So there isn't any direct solution for using the java code inside the transformations in case of Yarn-Cluster mode. – java_dev Sep 25 '18 at 12:37

0 Answers0