val tmpDataFS = FileSystem.get(new java.net.URI(tmpDataBasePath),Config.sc.hadoopConfiguration)
val rawEventsFS = FileSystem.get(new java.net.URI(basePath),Config.sc.hadoopConfiguration)
This is the equivalent scala code, which I need to convert to python. Tried getting Filesystem from JVM and pass it though serializable configuration, but getting error when I do that.
I tried this :
hadoopConf = sc._jsc.hadoopConfiguration()
serializableHadoopConf = sc._gateway.jvm.org.apache.spark.util.SerializableConfiguration(hadoopConf)
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
URI = sc._gateway.jvm.java.net.URI
def blurFunction(row):
fs = FileSystem.get(URI("s3a://test-dwh/sessions/test.jpg"),serializableHadoopConf.value())
df.rdd.repartition(6).map(blurFunction).take(10)
But got the following error:
py4j.protocol.Py4JError: An error occurred while calling o974.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-6373543827769130136.py", line 277, in <module>
exec(code)
File "<stdin>", line 16, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1343, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 992, in runJob
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2455, in _jrdd
self._jrdd_deserializer, profiler)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2388, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/lib/spark/python/pyspark/serializers.py", line 460, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 704, in dumps
cp.dump(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 162, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o974.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)