
I am trying to perform some file operations on HDFS directly within a PySpark script. In particular, I want to:

  1. check whether a path or a file exists (org.apache.hadoop.fs.FileSystem) --> ok
  2. create/delete folders (org.apache.hadoop.fs.FileSystem) --> ok
  3. move files from one path to another (org.apache.hadoop.fs.FileUtil) --> FAILS - WHY?

So my issue is only with the use of the last class. My code:

# get all the jvm Objects
spark = SparkSession.builder.getOrCreate()

hadoopPath = spark._jvm.org.apache.hadoop.fs.Path
hadoopConfiguration = spark._jsc.hadoopConfiguration()
hadoopFs   = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoopConfiguration)

# this import does not throw an error so far
hadoopFu   = spark._jvm.org.apache.hadoop.fs.FileUtil

The following functions work like a charm:

path1="hdfs://mycluster/somefolder"
path2="hdfs://mycluster/newfolder"

# check if the folder exists and contains parquet files (returns a list with or without files)
hadoopFs.globStatus(hadoopPath(path1 + "/*.parquet"))

# alternatively, check only whether the path exists (returns true or false)
hadoopFs.exists(hadoopPath(path1))

# create folder on hdfs
hadoopFs.mkdirs(hadoopPath(path2))

But as soon as I try to call methods from FileUtil, they are not found. I do not understand why, since they should be part of the standard Hadoop/Spark libraries: https://hadoop.apache.org/docs/r3.3.5/api/org/apache/hadoop/fs/FileUtil.html The following commands fail:

hadoopFu.list(path1)

Py4JError: An error occurred while calling z:org.apache.hadoop.fs.FileUtil.list. Trace:
py4j.Py4JException: Method list([class java.lang.String]) does not exist

also

hadoopFu.copy(path1,path2)

Py4JError: An error occurred while calling z:org.apache.hadoop.fs.FileUtil.copy. Trace:
py4j.Py4JException: Method copy([class java.lang.String, class java.lang.String]) does not exist

Why are they not found? I don't understand what I am doing wrong. Importantly, I want to do this from within the Python/PySpark wrapper. Thanks, Alex

Koedlt
Alex Ortner
  • I'd suggest using pyarrow or pyhdfs if you're not actually using Spark functions – OneCricketeer May 30 '23 at 11:03
  • I also thought about using pyarrow on the driver only, but then the work would not be distributed across the Hadoop cluster and it would take some time to process all files. Distributing pyarrow via Dask on a Hadoop cluster also seems quite challenging, in particular without admin access to the infrastructure – Alex Ortner May 30 '23 at 13:04
  • You can use an Oozie shell action to distribute the job via YARN. You could also use `subprocess.call(['hdfs', 'dfs', '-ls'])`, for example, or use `distcp` for copying – OneCricketeer May 30 '23 at 15:20

1 Answer


For this answer, I'm looking at the Hadoop version that is used in the latest Spark version (v3.4.0 at the time of this post): version 3.3.4 (commit hash a585a73c3e0).

If you look at the function signature of FileUtil.list:

public static String[] list(File dir) throws IOException {

and of FileUtil.copy (there are multiple ones, you'll have to choose):

public static boolean copy(FileSystem srcFS, Path src,
                           FileSystem dstFS, Path dst,
                           boolean deleteSource,
                           Configuration conf) throws IOException {

or

public static boolean copy(FileSystem srcFS, Path[] srcs,
                           FileSystem dstFS, Path dst,
                           boolean deleteSource,
                           boolean overwrite, Configuration conf)
                           throws IOException {

or a few others.

You can start to see what is going wrong: you are not passing arguments of the correct types. FileUtil.list, for example, needs one parameter of type File, but you are passing a String (path1 is a String).

I'm not very familiar with creating Java objects inside PySpark through the underlying _jvm object, but try creating a java.io.File and passing that to FileUtil.list, something like this maybe?

hadoopFu.list(spark._jvm.java.io.File(path1))
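
The same reasoning applies to FileUtil.copy: Py4J only finds an overload when every argument has the matching Java type. Below is a minimal sketch of calling the six-argument overload shown above from PySpark. It assumes an existing SparkSession is passed in, the helper name copy_with_fileutil is hypothetical, and it has not been run against a real cluster.

```python
def copy_with_fileutil(spark, src, dst, delete_source=False):
    """Copy src to dst via FileUtil.copy(FileSystem, Path, FileSystem, Path, boolean, Configuration).

    copy_with_fileutil is a hypothetical helper name; `spark` is assumed to be
    an existing SparkSession. src and dst are plain Python strings.
    """
    jvm = spark._jvm
    conf = spark._jsc.hadoopConfiguration()
    Path = jvm.org.apache.hadoop.fs.Path

    # Wrap the strings in Hadoop Path objects so the Java types match the signature
    src_path = Path(src)
    dst_path = Path(dst)

    # Resolve the FileSystem that belongs to each path's URI
    src_fs = src_path.getFileSystem(conf)
    dst_fs = dst_path.getFileSystem(conf)

    # All six arguments now match one of the FileUtil.copy overloads
    return jvm.org.apache.hadoop.fs.FileUtil.copy(
        src_fs, src_path, dst_fs, dst_path, delete_source, conf
    )
```

With this, copy_with_fileutil(spark, path1, path2) would be the typed equivalent of the failing hadoopFu.copy(path1, path2). Passing delete_source=True asks Hadoop to delete the source after copying.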
Koedlt
  • `java.io.File` is incorrect. Hadoop has its own Path objects. Also, Spark doesn't use trunk – OneCricketeer May 30 '23 at 11:01
  • Great remark about Spark not using trunk, I will adapt my answer to contain a relevant Hadoop version! – Koedlt May 30 '23 at 12:11
  • About `java.io.File` being incorrect, I must be missing something. If you look at the [function signature](https://github.com/apache/hadoop/blob/a585a73c3e02ac62350c136643a5e7f6095a3dbb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java#L1443-L1453C59) and the comments right above it, it seems to be using `java.io.File`, right? I must admit that I'm not very familiar with `org.apache.hadoop.fs.FileUtil`, but I'm interested to see what your take on this is. – Koedlt May 30 '23 at 12:24
  • `path1` is a string, but I transform it using `spark._jvm.org.apache.hadoop.fs.Path(path1)`. From the documentation I thought this is the way to go – Alex Ortner May 30 '23 at 13:05
  • So you mean the `method does not exist` error might be a misleading message: the method was not matched because the different overloads are identified by their parameter types. But the `list` function has only one signature, so this function at least should work – Alex Ortner May 30 '23 at 13:09
  • Indeed, when you see a `method does not exist` output that means that the exact way you wrote that method (including the input parameters and their types) does not exist. It does not necessarily mean that there is no method with that exact name. – Koedlt May 30 '23 at 13:21
  • But from the code that you shared here, it seems like `path1` is a `String`. I'm seeing `path1="hdfs://mycluster/somefolder"` and then `hadoopFu.list(path1)`. – Koedlt May 30 '23 at 13:22
  • `java.io.File` might be "correct" for that API, but it is "not correct" for accessing/listing HDFS `Path` resources, since that class is generally used for the local FS only. So I don't think `FileUtil` is the correct class to use here – OneCricketeer May 30 '23 at 15:22
  • Fair enough, I agree on that point. Thanks for your insight! – Koedlt May 30 '23 at 15:58
  • Ok, this was a stupid copy-paste error when posting to Stack Overflow. I actually call `hadoopFu.list(hadoopPath(path))`, or in detail `hadoopFu.list(spark._jvm.org.apache.hadoop.fs.Path(path))`, but it still does not work. – Alex Ortner May 30 '23 at 20:48
  • Yeah, so as @OneCricketeer and I were discussing, using a `Path` object won't work here: you'll really need a `java.io.File`. But more importantly, it seems like `FileUtil.list` might not really be the API you want to use. Something like [this](https://stackoverflow.com/a/40258750/15405732) might be more what you want. – Koedlt May 31 '23 at 07:58
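
For HDFS paths, the FileSystem object that already works in the question can also list and move entries, so FileUtil is not strictly needed. Below is a rough sketch, assuming an existing SparkSession is passed in. The helper names list_hdfs_paths and move_hdfs_path are hypothetical, while listStatus and rename are FileSystem methods.

```python
def list_hdfs_paths(spark, directory):
    """Return the entries directly under `directory` as a list of path strings.

    list_hdfs_paths is a hypothetical helper; `spark` is assumed to be an
    existing SparkSession.
    """
    conf = spark._jsc.hadoopConfiguration()
    dir_path = spark._jvm.org.apache.hadoop.fs.Path(directory)
    fs = dir_path.getFileSystem(conf)
    # listStatus returns FileStatus objects; getPath() yields each entry's Path
    return [status.getPath().toString() for status in fs.listStatus(dir_path)]


def move_hdfs_path(spark, src, dst):
    """Move (rename) src to dst within one file system; returns the Java boolean result.

    move_hdfs_path is a hypothetical helper built on FileSystem.rename.
    """
    conf = spark._jsc.hadoopConfiguration()
    Path = spark._jvm.org.apache.hadoop.fs.Path
    src_path = Path(src)
    fs = src_path.getFileSystem(conf)
    # Both arguments are Hadoop Path objects, matching rename(Path, Path)
    return fs.rename(src_path, Path(dst))
```

Since rename reports success through its boolean return value and can return false instead of raising, it is worth checking the result before assuming the move happened.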