58
response = "mi_or_chd_5"

outcome = sqlc.sql("""select eid,{response} as response
from outcomes
where {response} IS NOT NULL""".format(response=response))
outcome.write.parquet(response, mode="overwrite") # Success
print outcome.schema
StructType(List(StructField(eid,IntegerType,true),StructField(response,ShortType,true)))

But then:

outcome2 = sqlc.read.parquet(response)  # fail

fails with:

AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

in

/usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)

The documentation for parquet says the format is self describing, and the full schema was available when the parquet file was saved. What gives?

Using Spark 2.1.1. Also fails in 2.2.0.

I found a bug report for this, but it was fixed in 2.0.1 and 2.1.0.

UPDATE: This works when connected with master="local", and fails when connected to master="mysparkcluster".

user48956

18 Answers

86

This error usually occurs when you try to read an empty directory as Parquet. Your outcome DataFrame is probably empty.

You could check if the DataFrame is empty with outcome.rdd.isEmpty() before writing it.
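A minimal sketch of such a guard, assuming `df` is a PySpark DataFrame. The helper name `write_if_nonempty` is hypothetical; it uses `df.head(1)`, which fetches at most one row, as an alternative to `rdd.isEmpty()`:

```python
def write_if_nonempty(df, path):
    """Write df to path as Parquet only if it has at least one row.

    Returns True if the data was written, False if df was empty.
    The name and return convention are illustrative assumptions.
    """
    # head(1) returns a list with at most one Row, so the check stays cheap.
    if len(df.head(1)) == 0:
        return False
    df.write.parquet(path, mode="overwrite")
    return True
```

With the question's names this would be called as `write_if_nonempty(outcome, response)`; a `False` result means nothing was written, so a later read of that path should be skipped.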

Javier Montón
  • The dataframe is not empty. I believe the issue happens because the filename `response` can't be written to on the cluster. Works fine in local mode. – user48956 Aug 16 '17 at 20:01
  • Then maybe you could try changing the username. In PySpark: `os.environ["HADOOP_USER_NAME"] = "hdfs"` or in Scala: `System.setProperty("HADOOP_USER_NAME","hdfs")` – Javier Montón Aug 31 '17 at 10:20
  • I'm not sure we're making use of Hadoop. Is it a requirement for Spark, and does it need to be configured with user profiles when the Spark cluster is installed? (All of our data is sourced from relational DBs and loaded into Spark on demand.) In any case, wouldn't I need to prefix the filename with "hdfs://"? If I use a filename such as "/my/nfs/network_directory/filename", saving works, which also makes me think that the path refers to the worker-local filesystem. (sorry -- spark n00b) – user48956 Aug 31 '17 at 15:44
  • 1
    Sorry, I assumed you were using Hadoop. You can run Spark in Local[], Standalone (cluster with Spark only) or YARN (cluster with Hadoop) mode. In YARN mode, all paths are assumed to be on HDFS by default, so it's not necessary to add `hdfs://`; in fact, if you want to use local files you should use `file://`. If, for example, you submit an application to the cluster from your computer, the application will use your username, which probably doesn't have access to the HDFS files. With HADOOP_USER_NAME you can change it. In Spark Standalone I don't know exactly how files and permissions work. Hope this helps! – Javier Montón Sep 04 '17 at 16:23
  • I'll check, but as I remember, if I specified a valid path to a network drive mounted on the local filesystem, starting with "/mnt/...", all was fine. When '/...' is specified, does it look to Hadoop/YARN first and then to the local file system if not found? – user48956 Sep 04 '17 at 16:29
  • 3
    It's never a good practice to use isEmpty() method. Please avoid if you can - it 'can' bring the entire data into driver memory - refer RDD class code in Spark. – Kumar Vaibhav Jul 17 '18 at 15:11
18

In my case, the error occurred because I was trying to read a Parquet file whose name started with an underscore (e.g. _lots_of_data.parquet). I'm not sure why this was an issue, but removing the leading underscore solved the problem.

ostrokach
  • 6
    Spark treats all files that begin with `_` as metadata and not data. – Sim Nov 03 '18 at 23:50
  • 3
    "Spark 2.0 ignores the path names starting with underscore or dot; `_` or `.` " as discussed by Spark developers here: https://issues.apache.org/jira/browse/SPARK-16975?focusedCommentId=15415903&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15415903 – Davos Aug 19 '19 at 02:28
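To see which files in a directory fall under the underscore/dot rule quoted in the comments above, a small check like the following can help. It is a sketch that assumes the directory is reachable through the local filesystem (not HDFS or S3), and `split_visible_and_ignored` is a hypothetical helper name:

```python
import os


def split_visible_and_ignored(directory):
    """Return (visible, ignored) lists of file names in directory.

    A file counts as ignored when its name starts with '_' or '.',
    following the rule quoted from the Spark developers above.
    """
    visible, ignored = [], []
    for name in sorted(os.listdir(directory)):
        if not os.path.isfile(os.path.join(directory, name)):
            continue  # only plain files are classified here
        if name.startswith(("_", ".")):
            ignored.append(name)
        else:
            visible.append(name)
    return visible, ignored
```

If `visible` comes back empty, the reader has no data files to infer a schema from, which matches the error in the question.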
8

This case occurs when you try to read a table that is empty. If the table had correctly inserted data, there should be no problem.

This is not specific to Parquet; the same thing happens with ORC.

Anxo P
8

I'm using AWS Glue, and I received this error while reading data from a Data Catalog table (location: an S3 bucket). After a bit of analysis I realised that this was because the file was not available in the file location (in my case, the S3 bucket path).

Glue was trying to apply the Data Catalog table schema to a file that didn't exist.

After copying the file into the S3 bucket location, the issue was resolved.

Hope this helps someone who encounters this error in AWS Glue.

Ash
  • 3
    Also with AWS Glue, if the job bookmark filter results in there being no data and you attempt to write then it says "After final job bookmarks filter, processing 0.00% of 0 files in partition" which then leads to "Unable to infer schema for Parquet. It must be specified manually." because the frame being written is empty. – Davos Aug 19 '19 at 02:22
4

I see there are already many answers, but the issue I faced was that my Spark job was trying to read a file that was being overwritten by another Spark job started earlier. It sounds bad, but I made that mistake.

Pavan_Obj
4

Just to emphasize @Davos's answer in a comment: you will encounter this exact exception if your file name starts with a dot . or an underscore _

val df = spark.read.format("csv").option("delimiter", "|").option("header", "false")
         .load("/Users/myuser/_HEADER_0")

org.apache.spark.sql.AnalysisException: 
Unable to infer schema for CSV. It must be specified manually.;

The solution is to rename the file and try again (e.g. rename _HEADER to HEADER).

ibaralf
3

This happened to me with a Parquet file that was still in the process of being written. I just needed to wait for it to be completely written.
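One way to tell whether a write has finished is to look for the empty `_SUCCESS` marker file that Spark's default output committer places in the output directory when a job commits. The sketch below assumes that this default has not been disabled (Hadoop setting `mapreduce.fileoutputcommitter.marksuccessfuljobs`) and that the output directory is visible on the local filesystem; `write_looks_complete` is a hypothetical helper name:

```python
import os


def write_looks_complete(output_dir):
    """Return True if output_dir contains a _SUCCESS marker file.

    Assumption: the writing job uses the default committer behaviour
    of creating _SUCCESS once the whole job has committed.
    """
    return os.path.isfile(os.path.join(output_dir, "_SUCCESS"))
```

A reader job could poll this before calling `read.parquet` on the directory instead of failing on a half-written output.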

Bruno Degomme
2

In my case, the error occurred because the filename contained underscores. Rewriting / reading the file without underscores (hyphens were OK) solved the problem...

meeh
2

For me this happened when I thought I was loading the correct file path but had instead pointed to an incorrect folder.

mani_nz
1

I ran into a similar problem when reading a CSV:

spark.read.csv("s3a://bucket/spark/csv_dir/.")

gave an error of:

org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually.;

I found that if I removed the trailing . it works, i.e.:

spark.read.csv("s3a://bucket/spark/csv_dir/")

I tested this for Parquet as well: adding a trailing . gives this error:

org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
lockwobr
  • 3
    "Spark 2.0 ignores the path (file) names starting with underscore or dot; `_` or `.` " as discussed by Spark developers here: https://issues.apache.org/jira/browse/SPARK-16975?focusedCommentId=15415903&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15415903 – Davos Aug 19 '19 at 02:40
1

I just encountered the same problem, but none of the solutions here worked for me. I was trying to merge the row groups of my Parquet files on HDFS by first reading them and then writing them to another place using:

df = spark.read.parquet('somewhere')
df.write.parquet('somewhere else')

But later when I query it with

spark.sql('SELECT sth FROM parquet.`hdfs://host:port/parquetfolder/` WHERE .. ')

it shows the same problem. I finally solved this by using pyarrow:

import pyarrow as pa
import pyarrow.parquet as pq

df = spark.read.parquet('somewhere')
pdf = df.toPandas()               # collects all rows onto the driver
adf = pa.Table.from_pandas(pdf)   # convert the pandas frame to an Arrow table
fs = pa.hdfs.connect()
fw = fs.open(path, 'wb')          # path: destination file on HDFS
pq.write_table(adf, fw)
fw.close()
cloudray
1

Check whether .parquet files are available at the response path. I assume that either the files don't exist or they sit in some internal (partitioned) folders. If the files are nested under several levels of folders, append /* for each folder level.

In my case the .parquet files were 3 folders below base_path, so I gave the path as base_path/*/*/*
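The number of `/*` segments can be worked out by walking the tree. This is a sketch for a path on the local filesystem only (not HDFS or S3), and `glob_for_parquet_depth` is a hypothetical helper name:

```python
import os


def glob_for_parquet_depth(base_path):
    """Return base_path followed by one '/*' per folder level above the .parquet files.

    Raises ValueError if no .parquet files are found or if they sit
    at more than one depth below base_path.
    """
    depths = set()
    for root, _dirs, files in os.walk(base_path):
        if any(name.endswith(".parquet") for name in files):
            rel = os.path.relpath(root, base_path)
            depths.add(0 if rel == "." else len(rel.split(os.sep)))
    if len(depths) != 1:
        raise ValueError(
            "expected .parquet files at exactly one depth, found: %s" % sorted(depths)
        )
    return base_path + "/*" * depths.pop()
```

For files stored three folders below `base_path`, this returns `base_path/*/*/*`, the same pattern as in the answer above.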

Shams
1

This issue seems to have many possible causes, so here is another scenario:

By default, the Spark Parquet source uses partition inference, which means it expects the file paths to be partitioned into Key=Value directories, with the load happening at the root.

To avoid this, if we can be sure that all the leaf files have an identical schema, we can use

df = spark.read.format("parquet")\
          .option("recursiveFileLookup", "true")\
          .load(base_path)  # base_path: placeholder for the root folder

to disable partition inference manually (the option is available from Spark 3.0). It basically loads the files one by one using the parquet's logical_type.

QPeiran
0

As others mentioned, in my case this error appeared when I was reading S3 keys that did not exist. A solution is to keep only the keys that do exist:

import com.amazonaws.services.s3.AmazonS3URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import java.net.URI

def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = {
  val uri = new URI(url)
  val hostWithEndpoint = uri.getHost + "." + domain
  new URI(uri.getScheme, uri.getUserInfo, hostWithEndpoint, uri.getPort, uri.getPath, uri.getQuery, uri.getFragment).toString
}

def createS3URI(url: String): AmazonS3URI = {
  try {
    // try to instantiate AmazonS3URI with url
    new AmazonS3URI(url)
  } catch {
    case e: IllegalArgumentException if e.getMessage.
      startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => {
      new AmazonS3URI(addEndpointToUrl(url))
    }
  }
}

def s3FileExists(spark: SparkSession, url: String): Boolean = {
  val amazonS3Uri: AmazonS3URI = createS3URI(url)
  val s3BucketUri = new URI(s"${amazonS3Uri.getURI().getScheme}://${amazonS3Uri.getBucket}")

  FileSystem
    .get(s3BucketUri, spark.sparkContext.hadoopConfiguration)
    .exists(new Path(url))
}

and you can use it as:

val partitions = List(yesterday, today, tomorrow)
  .map(f => somepath + "/date=" + f)
  .filter(f => s3FileExists(spark, f))

val df = spark.read.parquet(partitions: _*)

For this solution I took some code from the spark-redshift project.

Kyr
0

You are just loading a Parquet file, and of course the Parquet file had a valid schema; otherwise it would not have been saved as Parquet. This error means one of the following:

  • The Parquet file does not exist. (In 99.99% of cases this is the issue. Spark error messages are often less than obvious.)
  • The Parquet file somehow got corrupted, or it's not a Parquet file at all.
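Both cases can be checked cheaply for a single file on the local filesystem. A Parquet file starts and ends with the 4-byte magic number `PAR1`, so the sketch below tests for existence and for those bytes. It is only a sanity check, not a full validation, and `looks_like_parquet` is a hypothetical helper name:

```python
import os

PARQUET_MAGIC = b"PAR1"  # magic number at both ends of a Parquet file


def looks_like_parquet(path):
    """Return True if path exists and carries the Parquet magic bytes."""
    if not os.path.isfile(path):
        return False  # missing file, or a directory
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False  # too short to hold both magic numbers
    with open(path, "rb") as f:
        head = f.read(len(PARQUET_MAGIC))
        f.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        tail = f.read(len(PARQUET_MAGIC))
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```

Running it over the `part-*` files of an output directory quickly shows whether the path holds real Parquet data or something else, such as CSV files or empty placeholders.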
sapy
  • 1
    Yes. In retrospect, that may be obvious to someone who knows how to interpret Spark exception messages. – user48956 Sep 26 '20 at 20:19
0

I ran into this issue because of a folder nested inside a folder.

For example, folderA.parquet was supposed to contain the partitions, but instead it contained folderB.parquet, which in turn contained the partitions.

The resolution was to move the files up to the parent folder and delete the subfolder.

Rishabh Agarwal
0

You can read it by appending /* to the path:

outcome2 = sqlc.read.parquet(f"{response}/*")  # works for me
0

My problem was that I had moved the files to another location without changing the paths recorded in the files in the _spark_metadata folder.

To fix it, do something like:

sed -i "s|/old/path|/new/path|g" **  .*

Jean Carlo Machado