
We are running Spark 2.3.0 on AWS EMR. The following DataFrame `df` is non-empty and of modest size:

scala> df.count
res0: Long = 4067

The following code works fine for writing df to HDFS:

scala> df.repartition(1).write.mode("overwrite").parquet("/tmp/topVendors")

scala> val hdf = spark.read.parquet("/tmp/topVendors")
hdf: org.apache.spark.sql.DataFrame = [displayName: string, cnt: bigint]

scala> hdf.count
res4: Long = 4067

However, using the same code to write to a local Parquet or CSV file ends up with empty results:

df.repartition(1).write.mode("overwrite").parquet("file:///tmp/topVendors")

scala> val locdf = spark.read.parquet("file:///tmp/topVendors")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
  at scala.Option.getOrElse(Option.scala:121)

We can see why it fails:

 ls -l /tmp/topVendors
total 0
-rw-r--r-- 1 hadoop hadoop 0 Jul 30 22:38 _SUCCESS

So no Parquet file is being written at all.

I have tried this maybe twenty times, for both CSV and Parquet, and on two different EMR servers: the same behavior is exhibited in every case.

Is this an EMR-specific bug? A more general EC2 bug? Something else? The same code works with Spark on macOS.

In case it matters - here is the versioning info:

Release label: emr-5.13.0
Hadoop distribution: Amazon 2.8.3
Applications: Spark 2.3.0, Hive 2.3.2, Zeppelin 0.7.3
Yongfeng

2 Answers

This is not a bug; it is the expected behavior. Spark does not really support writes to non-distributed storage (it works in local mode only because you happen to have a shared file system).

A local path is not interpreted (only) as a path on the driver (that would require collecting the data) but as a local path on each executor. Therefore each executor writes its own chunk to its own local file system.

Not only is the output not readable back (to load the data, each executor and the driver have to see the same state of the file system), but, depending on the commit algorithm, it might not even be finalized (moved out of the temporary directory).
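
A minimal sketch of the usual workaround, reusing the question's paths: write to storage that every node shares (HDFS here), then copy the committed result down to the driver's local disk with the Hadoop FileSystem API.

import org.apache.hadoop.fs.{FileSystem, Path}

// Write to HDFS first, so every executor commits to storage that all
// nodes can see.
df.repartition(1).write.mode("overwrite").parquet("/tmp/topVendors")

// Then copy the committed result to the driver's local file system.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.copyToLocalFile(new Path("/tmp/topVendors"), new Path("file:///tmp/topVendors"))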

zero323
  • I see. Funny, I had thought I had done this before on a cluster. Maybe it worked in standalone but not in YARN? – WestCoastProjects Jul 31 '18 at 01:00
  • I have double-checked, and on our internal non-AWS/EMR clusters we are able to write to `file://` – WestCoastProjects Jul 31 '18 at 03:42
  • AFAIK the case where it might seem to work is when the driver is by chance co-located with an executor. This, however, like local mode, is a detail of resource allocation, not a rule. – zero323 Jul 31 '18 at 15:16
  • When running on the non-AWS cluster we have up to 100 executors, yet we are still able to save the local file. It seems that the results from each (remote) executor are sent back to the driver. – WestCoastProjects Jul 31 '18 at 15:29
  • btw `Holden Karau`, who is a top committer on `spark`, has an answer stating that local saves are possible: https://stackoverflow.com/a/31240494/1056563 `saveAsTextFile is able to take in local file system paths (e.g. file:///tmp/magic/...)` – WestCoastProjects Jul 31 '18 at 15:30
  • I don't see a real conflict between my answer here and Holden's answer. Holden clearly states that _if you're running on a distributed cluster, you most likely want to `collect()` the data back to the cluster and then save it with standard file operations_, and as far as I can tell, does not claim that _is able to take in local file system paths_ means the data is automagically transferred to the driver. – zero323 Jul 31 '18 at 19:44
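
Following that thread, a minimal sketch of the `collect()` route from Holden's linked answer, assuming the result fits in driver memory (the question's df is only about 4k rows); the CSV output path is illustrative:

import java.io.PrintWriter

// Collect the modest result to the driver, then write it with plain
// JVM file I/O; the local path then only needs to exist on the driver.
// "/tmp/topVendors.csv" is an illustrative output path.
val out = new PrintWriter("/tmp/topVendors.csv")
try df.collect().foreach(row => out.println(row.mkString(",")))
finally out.close()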

This error usually occurs when you try to read an empty directory as Parquet. You could check: 1. whether the DataFrame is empty, with `outcome.rdd.isEmpty()`, before writing it; 2. whether the path you are giving is correct.

Also, in what mode are you running your application? Try running it in client mode if you are running in cluster mode.
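
A sketch of that first check, adapted to the question's `df` (note the question already shows df.count = 4067, so this guard only rules out the empty-input case):

// Only attempt the write when there is actually data to commit; an
// empty DataFrame would still produce a directory containing only _SUCCESS.
if (df.rdd.isEmpty()) {
  println("DataFrame is empty; nothing to write")
} else {
  df.repartition(1).write.mode("overwrite").parquet("file:///tmp/topVendors")
}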

Gangadhar Kadam