
Is there any way to get the Hadoop FileSystem from a Spark executor when performing a mapPartitions operation over a Spark DataFrame? If not, is there at least a way to get the Hadoop configuration in order to build a new Hadoop FileSystem?

Take into account that the HDFS is Kerberized.

The use-case would be something like (pseudo-code):

spark.sql("SELECT * FROM cities").mapPartitions{ iter =>
    iter.groupedBy(some-variable).foreach{ rows =>
        hadoopFS.write(rows)
    }
    TaskContext.getPartitionId
}
  • There is a way... But why don't you just call the write method on the DataFrame object? – OneCricketeer Jun 09 '18 at 20:41
  • Because I want to create a kind of dynamic partitioning from the executors based on some variables. So... could you tell me that way, please? – miguel0afd Jun 10 '18 at 01:29
  • I've modified the pseudo-code a little bit to note that calling the write method is not suitable here – miguel0afd Jun 10 '18 at 02:32
  • 1
    See https://stackoverflow.com/a/27027071/2308683 – OneCricketeer Jun 10 '18 at 17:08
  • Given that SparkContext is not available in the executors... I don't think that the solution proposed by @cricket_007 is valid – miguel0afd Jun 26 '18 at 07:50
  • I tried to follow the example in the suggested link but I get the following exception: org.apache.spark.SparkException: Task not serializable; Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration; Serialization stack: - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) – miguel0afd Jun 28 '18 at 07:01
  • I tried to serialize the Hadoop Configuration as a json string and use it inside the mapPartitions but, even so, Hadoop Configuration couldn't be deserialized. Any idea? – miguel0afd Jul 05 '18 at 00:19
  • You should be able to get a configuration object within each partition action using the standard `new Configuration()` constructor. That'll pull the configuration from the classpath on its own. That way, you're not deserializing around the closure – OneCricketeer Jul 05 '18 at 05:24
  • @cricket_007 Your suggestion didn't work... It takes the default parameters – miguel0afd Jul 06 '18 at 09:04

1 Answer


I found the solution. Spark's utils package contains a very simple way of serializing the Hadoop configuration: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
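For reference, a minimal sketch of how this can be wired into the use-case from the question. The wrapper below mirrors Spark's SerializableConfiguration pattern (the Spark class itself is package-private, so user code typically carries its own copy), and names such as confBroadcast are only illustrative:

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

// Copy of the SerializableConfiguration pattern: a Hadoop Configuration is not
// Serializable itself, so it is written/read through its Writable interface.
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
import spark.implicits._

// Broadcast the driver-side Hadoop configuration so each executor can rebuild a FileSystem.
val confBroadcast =
  spark.sparkContext.broadcast(new SerializableConfiguration(spark.sparkContext.hadoopConfiguration))

spark.sql("SELECT * FROM cities").mapPartitions { iter =>
  val fs = FileSystem.get(confBroadcast.value.value) // FileSystem created on the executor
  // ... group the partition's rows and write them with `fs` ...
  Iterator(TaskContext.getPartitionId())
}

On a Kerberized cluster the executors rely on the delegation tokens that Spark ships with each task, so FileSystem.get with the broadcast configuration should normally work as long as the application was submitted with a valid ticket or keytab.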
