Is there any way to access Java objects on worker nodes through PySpark?
Below is my working sample Scala code. I am trying to achieve the same thing in Python with PySpark.
import test.Transformation
import test.GeneralPreHook
sc.addJar("/navi/Transformer.jar")
GeneralPreHook.config(sc.hadoopConfiguration)
val params = sc.hadoopConfiguration.get("params")
val file = sc.textFile("adl://xxxxx.azuredatalakestore.net/navi/test.txt")
val output = file.flatMap(line => line.split("\n")).map { word =>
  val transformer = new Transformation(params)
  transformer.decryptString(word)
}
output.saveAsTextFile("adl://xxxxx.azuredatalakestore.net/navi/Out")
Here is the Python sample code that I am trying to run through PySpark. I open the PySpark shell with the following parameters:
pyspark --jars /navi/Transformer.jar --master yarn
from pyspark import SparkContext
# Add the required jar to the distributed cache so that worker nodes can use it
sc._jsc.addJar("/navi/Transformer.jar")
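# Mirror the Scala setup: run the pre-hook on the Hadoop configuration and read "params" from it
sc._jvm.test.GeneralPreHook.config(sc._jsc.hadoopConfiguration())
params = sc._jsc.hadoopConfiguration().get("params")
sc._jvm.test.Transformation.setParams(params)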
# Create an RDD from the input file.
file = sc.textFile("adl://xxxxx.azuredatalakestore.net/navi/test.txt")
# Split each line into tokens and transform each token in the map step
output=file.flatMap(lambda line: line.split(",")).map(lambda line : sc._jvm.test.Transformer.transformStatic(line))
# Save the results (the output directory must not already exist)
output.saveAsTextFile("adl://xxxxx.azuredatalakestore.net/navi/Out")
# Alternatively, display the results directly
output.collect()
When collecting the results or saving them to the output file, I get the following error:
pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
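The error is raised while PySpark serializes the lambda passed to `map`: the lambda refers to the global `sc`, so the serializer tries to pickle the SparkContext itself, which PySpark deliberately refuses to do. Even if it could be shipped, `sc._jvm` is a Py4J gateway into the driver's JVM, and the Python worker processes on the executors have no such gateway. The Scala version works because its closure runs inside the executor JVM, where `Transformation` is loaded from the jar. The stand-alone sketch below mimics the serialization failure with the standard `pickle` module; `DriverOnlyHandle` and `can_ship` are hypothetical names for illustration, not PySpark classes or functions.

```python
import pickle


class DriverOnlyHandle:
    """Hypothetical stand-in for SparkContext: usable locally, but refuses to be pickled."""

    def __reduce__(self):
        # PySpark's SparkContext similarly raises an exception when something tries to pickle it.
        raise RuntimeError("DriverOnlyHandle can only be used on the driver")


def can_ship(closure_state):
    """Return True if the captured state could be serialized and sent to a worker."""
    try:
        pickle.dumps(closure_state)
        return True
    except RuntimeError:
        return False


params = "some-params"
sc = DriverOnlyHandle()

# State that includes the driver-only handle cannot be serialized ...
print(can_ship((params, sc)))  # False
# ... while state made of plain data ships fine.
print(can_ship((params,)))  # True
```

The practical consequence is that the function given to `map` must not mention `sc` (or anything reached through `sc._jvm`), so the Java call has to happen on the JVM side instead of inside a Python lambda.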
I searched extensively but could not resolve this issue, even though similar code runs perfectly in Scala. Is there another way to use the Java function within the map?
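One commonly used workaround is to keep the Java call entirely on the JVM side by exposing it as a Spark SQL UDF and applying it through the DataFrame API, so that no Python worker ever needs the Java object. The sketch below assumes Spark 2.3 or later (where `spark.udf.registerJavaFunction` is available) and a hypothetical wrapper class `test.DecryptUDF`, added to Transformer.jar, that implements `org.apache.spark.sql.api.java.UDF1<String, String>` by calling `Transformation.decryptString` with the configured params. The class name, the SQL function name `decrypt`, and the helper `decrypt_with_java_udf` are illustrative assumptions, not part of the original jar.

```python
def decrypt_with_java_udf(spark, input_path, output_path):
    """Sketch: apply a (hypothetical) Java UDF to every line of a text file."""
    # Imported lazily so this helper can be defined without PySpark on the path.
    from pyspark.sql.functions import expr
    from pyspark.sql.types import StringType

    # Register the JVM class as a SQL function. "test.DecryptUDF" is hypothetical
    # and must implement org.apache.spark.sql.api.java.UDF1<String, String>.
    spark.udf.registerJavaFunction("decrypt", "test.DecryptUDF", StringType())

    # spark.read.text yields one row per line in a single string column named "value".
    lines = spark.read.text(input_path)

    # The UDF executes inside the executor JVMs, like the closure in the Scala version.
    decrypted = lines.select(expr("decrypt(value)").alias("value"))

    # DataFrameWriter.text requires exactly one string column.
    decrypted.write.text(output_path)
    return decrypted
```

In the PySpark shell, where `spark` is predefined, it would be called as `decrypt_with_java_udf(spark, "adl://xxxxx.azuredatalakestore.net/navi/test.txt", "adl://xxxxx.azuredatalakestore.net/navi/Out")`. Because the Java object is created and used only on the executors' JVMs, nothing JVM-related has to be pickled from Python.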
Any help on this would be greatly appreciated. Thanks in advance!