1
// Hadoop FileSystem handle for the URI built from tmpDataBasePath, obtained with
// the Hadoop configuration of the SparkContext held in Config.sc.
val tmpDataFS = FileSystem.get(new java.net.URI(tmpDataBasePath),Config.sc.hadoopConfiguration)

// Same lookup for the URI built from basePath.
val rawEventsFS = FileSystem.get(new java.net.URI(basePath),Config.sc.hadoopConfiguration)

The above is the Scala code that I need to convert to Python. I tried getting the FileSystem from the JVM and passing it through a serializable configuration, but I get an error when I do that.

I tried this :

# Py4J handles created through the SparkContext `sc`: the Hadoop configuration
# of the JVM SparkContext, a SerializableConfiguration wrapping it, and the JVM
# FileSystem and URI classes.
hadoopConf = sc._jsc.hadoopConfiguration()
serializableHadoopConf = sc._gateway.jvm.org.apache.spark.util.SerializableConfiguration(hadoopConf)
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
URI           = sc._gateway.jvm.java.net.URI

# NOTE(review): blurFunction refers to the Py4J objects above as globals, so
# pickling the function for the map also tries to pickle those objects; the
# PicklingError on `__getstate__` in the traceback below is raised at that step.
# Presumably these JVM handles are only usable where `sc` lives -- confirm
# against the PySpark documentation.
def blurFunction(row): 
     # Requests the FileSystem for a fixed s3a URI. `row` is not used and nothing
     # is returned, so every mapped element would be None.
     fs = FileSystem.get(URI("s3a://test-dwh/sessions/test.jpg"),serializableHadoopConf.value())

# take(10) runs the job; per the traceback, this is where blurFunction is serialized.
df.rdd.repartition(6).map(blurFunction).take(10)

But got the following error:

py4j.protocol.Py4JError: An error occurred while calling o974.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-6373543827769130136.py", line 277, in <module>
    exec(code)
  File "<stdin>", line 16, in <module>
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1343, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 992, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2455, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2388, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 460, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 704, in dumps
    cp.dump(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 162, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o974.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
  • What is the blurFunction even doing? You're mapping rows, but do nothing with them. Even if this ran, you'd get a dataframe of 10 null rows – OneCricketeer Dec 15 '17 at 14:37

0 Answers