
I can't figure out how to simply list the contents of an S3 bucket on EMR from within a Spark job. I wanted to do the following:

Configuration conf = spark.sparkContext().hadoopConfiguration();
FileSystem s3 = S3FileSystem.get(conf);
List<LocatedFileStatus> list = toList(s3.listFiles(new Path("s3://mybucket"), false));

This always fails with the following error

java.lang.IllegalArgumentException: Wrong FS: s3://*********/, expected: hdfs://**********.eu-central-1.compute.internal:8020

In the hadoopConfiguration, fs.defaultFS is set to hdfs://**********.eu-central-1.compute.internal:8020.

The way I understand it, if I don't use a scheme (just /myfolder/myfile instead of e.g. hdfs://myfolder/myfile) it will fall back to fs.defaultFS. But I would expect that if I explicitly specify s3://mybucket/, the fs.defaultFS should not matter.

How does one access the directory information? spark.read.parquet("s3://mybucket/*.parquet") works just fine, but for this task I need to check the existence of some files and would also like to delete some. I assumed org.apache.hadoop.fs.FileSystem would be the right tool.

PS: I also don't understand how logging works. If I use deploy-mode cluster (I want to deploy jars from S3, which does not work in client mode), then I can only find my logs in s3://logbucket/j-.../containers/application.../container...0001. There is quite a long delay before those show up in S3. How do I find them via ssh on the master? Or is there some faster/better way to check Spark application logs? UPDATE: Just found them under /mnt/var/log/hadoop-yarn/containers, however they are owned by yarn:yarn and as the hadoop user I cannot read them. :( Ideas?

samst

3 Answers


In my case I needed to read a parquet file that was generated by prior EMR jobs. I was looking for the list of files under a given S3 prefix, but the nice thing is we don't need to do all that; we can simply do this: spark.read.parquet(bucket + prefix_directory)
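For example, assuming spark is the SparkSession from the question and the bucket/prefix below are just placeholders, the whole listing step collapses to a single read:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Spark enumerates the parquet files under the prefix itself; no FileSystem listing needed.
Dataset<Row> df = spark.read().parquet("s3://mybucket/some/prefix/");
df.printSchema();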

Al Kannan

URI.create() should be used to point it at the correct FileSystem.

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Bind a FileSystem to the scheme of the S3 path instead of fs.defaultFS, then list it
val dirPaths = FileSystem.get(URI.create("<s3-path>"), fs.getConf).listStatus(new Path("<s3-path>"))

I don't think you are picking up the FS right; just use the static FileSystem.get() method, or Path.getFileSystem().

Try something like:

Path p = new Path("s3://bucket/subdir");
FileSystem fs = p.getFileSystem(conf);
FileStatus[] status = fs.listStatus(p);
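Since the question also asks about checking whether particular files exist and deleting some, here is a minimal sketch along the same lines; the object key below is a placeholder and conf is the hadoopConfiguration from the question:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Path report = new Path("s3://bucket/subdir/part-00000.parquet"); // hypothetical key
FileSystem s3 = report.getFileSystem(conf); // bound to the s3 scheme, not fs.defaultFS
if (s3.exists(report)) {
    s3.delete(report, false); // false = non-recursive, since this is a single file
}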

Regarding logs, the YARN UI should let you get at them via the node managers.

stevel