
I was unable to find this problem among the numerous similar-sounding Stack Overflow questions of the form "how to read csv into a pyspark dataframe?" (see the list of similar but different questions at the end).

The CSV file in question resides in the /tmp directory of the cluster's driver; note that this CSV file is intentionally NOT in Databricks DBFS cloud storage. Using DBFS will not work for the use case that led to this question.

Note that I am trying to get this working on Databricks Runtime 10.3 with Spark 3.2.1 and Scala 2.12.

First, create a small test CSV file in the driver's /tmp directory:

import csv

# build a tiny two-row test dataset
y_header = ['fruit','color','size','note']
y = [('apple','red','medium','juicy')]
y.append(('grape','purple','small','fresh'))

# write it to the driver's local /tmp directory
with open('/tmp/test.csv','w') as f:
  w = csv.writer(f)
  w.writerow(y_header)
  w.writerows(y)

Then use the Python os module to verify the file was created:

import os
list(filter(lambda f: f == 'test.csv',os.listdir('/tmp/')))

Now verify that the Databricks Spark API can see the file; note that you have to use the file:/// prefix:

dbutils.fs.ls('file:///tmp/test.csv')

Now, as an optional step, specify a dataframe schema for Spark to apply to the CSV file:

from pyspark.sql.types import StructType, StructField, StringType

csv_schema = StructType([
  StructField('fruit', StringType()),
  StructField('color', StringType()),
  StructField('size', StringType()),
  StructField('note', StringType())
])

Now define the PySpark dataframe:

x = spark.read.csv('file:///tmp/test.csv',header=True,schema=csv_schema)

The above line runs with no errors, but remember: due to lazy execution, the Spark engine still has not read the file. So next we will give Spark a command that forces it to execute the dataframe:

display(x)
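
Any other Spark action would trigger the same read; for instance, these standard PySpark actions (not in the original post) hit exactly the same error in this setup:

x.show(5)    # printing a few rows also forces the CSV scan
x.count()    # counting rows likewise forces Spark to read the file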

And the error is: FileReadException: Error while reading file file:/tmp/test.csv. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. If Delta cache is stale or the underlying files have been removed, you can invalidate Delta cache manually by restarting the cluster. Caused by: FileNotFoundException: File file:/tmp/test.csv does not exist. . .

Digging into the error I found this: java.io.FileNotFoundException: File file:/tmp/test.csv does not exist. I already tried restarting the cluster; the restart did not clear the error.

But I can prove the file does exist; for some reason Spark and Java are simply unable to access it, because I can read the same file with pandas with no problem:

import pandas as pd
pd.read_csv('/tmp/test.csv')   # pandas reads the driver-local file with no issue

So how do I get Spark to read this CSV file?

appendix - list of similar spark read csv questions I searched through that did not answer my question: 1 2 3 4 5 6 7 8

  • why file couldn't be put on DBFS? – Alex Ott Mar 24 '22 at 18:08
  • `spark.read.csv('/tmp/test.csv')`? Spark should read the local file. – Emma Mar 24 '22 at 18:31
  • Can you try with spark.read.csv("dbfs:/tmp/test.csv")? – greenie Mar 24 '22 at 19:32
  • Alex this is for a simulation. The CSV file in the driver tmp directory will be temporarily caching the results of hundreds of millions of simulated transactions. Theoretically this could be written to DBFS on cloud, however, consider that there will be hundreds of instances of this simulator running various simulations with different parameters at the same time. So I think it's best to write to /tmp/ on cluster. – Nathan T Alexander Mar 24 '22 at 20:18
  • Emma the error output of spark.read.csv('/tmp/test.csv') is "AnalysisException: Path does not exist: dbfs:/tmp/test.csv" – Nathan T Alexander Mar 24 '22 at 20:19
  • Greenie the error output of your suggestion, spark.read.csv("dbfs:/tmp/test.csv"), is AnalysisException: Path does not exist: dbfs:/tmp/test.csv, because dbfs: is Spark's path to the cloud storage, not to the local cluster. – Nathan T Alexander Mar 24 '22 at 20:21
  • @NathanTAlexander Can you try using `/dbfs/tmp/test.csv` instead? – Dipanjan Mallick Mar 25 '22 at 12:17
  • My bad, I missed the line about DBFS. You do NOT have it in DBFS and the file exists on the driver. Are you running this in cluster mode? I am guessing the issue can come up if the file does not exist on the executors. If you are running in local mode, I am not sure why you have an issue. – Emma Mar 25 '22 at 14:39
  • DKNY, Python on my Databricks cannot write to the path you suggested: ...with (open('/dfbs/tmp/test.csv','w')) as f:... errors with "FileNotFoundError: [Errno 2] No such file or directory: '/dfbs/tmp/test.csv'". Again, the question is how to get Spark/Databricks to read a CSV file from a tmp directory on the driver. Yes, I could hack together some other, less optimal solution, and I probably will have to unless this is solved. – Nathan T Alexander Mar 25 '22 at 14:39
  • Emma thanks, I'm not sure what you mean by cluster vs local mode. I'm using Azure Databricks, so it's a cloud thingy. And correct, the tmp file only exists on the driver, not on the executors; is this why it errors out? If so, is there a parameter or setting in Spark so I can tell it to only look on the driver? – Nathan T Alexander Mar 25 '22 at 14:41
  • btw I just read another reason to get this working in Spark the way I want: according to https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option Spark is supposed to be able to read in a directory of CSV files as well. I'm gonna try this on the driver fs and also on DBFS – Nathan T Alexander Mar 25 '22 at 14:48
  • Spark can run in local or cluster mode. Local means you run a job in a single instance. _Looks like_ Databricks can run only in cluster mode. And the error can happen if the file only exists on the driver. – Emma Mar 25 '22 at 14:53
  • I also found additional docs here (https://learn.microsoft.com/en-us/azure/databricks/data/databricks-file-system?msclkid=48c5b198ac4c11ec98e959a23d438502). Seems like Databricks just wants me to use DBFS; if that's so, I'll have to have each running instance of my notebook read/write to a randomly generated folder name on DBFS and clean up afterwards. I'd really rather just use /tmp/ if I can get it working. – Nathan T Alexander Mar 25 '22 at 15:05
  • Emma your clues are correct! I created a single node cluster and Spark is able to read the csv from tmp and execute the dataframe! So the root cause of this must be that Spark is searching on nodes for the same file or directory, not finding them, and throwing an error! – Nathan T Alexander Mar 25 '22 at 15:14

3 Answers

I guess the Databricks file loader doesn't recognize the absolute path /tmp/. You can try the following workaround:

  1. Read the file from its local path into a pandas DataFrame
  2. Pass the pandas DataFrame to Spark using the createDataFrame function

Code:

import pandas as pd

# read the driver-local file with pandas, then hand the result to Spark
df_pd = pd.read_csv('/tmp/test.csv')
sparkDF = spark.createDataFrame(df_pd)
sparkDF.display()

Anand Vidvat
  • Thanks, I tried this; for a small CSV file this could work. The problem is that when it's time to read the file back into a Spark dataframe, it will have 200M+ rows, which could crash pandas. So I'm still looking for an answer that has the Spark API read the CSV straight into a Spark dataframe. – Nathan T Alexander Mar 25 '22 at 12:51
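
A possible middle ground for larger files (a sketch, not part of the original answer): copy the driver-local file into DBFS with dbutils.fs.cp so that every node can reach it, then point the Spark CSV reader at the dbfs: path. The dbfs:/tmp/test.csv target path is just an illustrative choice, and csv_schema is the schema defined in the question.

# copy the file from the driver's local filesystem into DBFS
dbutils.fs.cp('file:/tmp/test.csv', 'dbfs:/tmp/test.csv')

# every executor can now see the file, so the Spark reader succeeds
x = spark.read.csv('dbfs:/tmp/test.csv', header=True, schema=csv_schema)
display(x)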
In my case, the .csv file I was trying to read was on an external storage (company storage). Copying the file to the internal storage solved it for me. The other solutions didn't work.

I made email contact with a Databricks architect, who confirmed that Databricks can only read locally (from the cluster) in a single-node setup.

So DBFS is the only option for random writing/reading of text data files in a typical cluster that contains more than one node.
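
A minimal sketch of that DBFS route, assuming the /dbfs FUSE mount is available on the cluster and reusing y_header, y, and csv_schema from the question; the uuid-based per-run folder name is an illustrative choice (following the per-run-folder idea mentioned in the comments), not something prescribed by this answer:

import csv
import uuid

# unique DBFS folder per simulator run (hypothetical naming scheme)
run_dir = f'/tmp/sim_{uuid.uuid4()}'
dbutils.fs.mkdirs(f'dbfs:{run_dir}')

# write the CSV through the /dbfs FUSE mount, so plain Python file APIs work
with open(f'/dbfs{run_dir}/test.csv', 'w') as f:
  w = csv.writer(f)
  w.writerow(y_header)
  w.writerows(y)

# every node can reach the dbfs: path, so the Spark reader succeeds
x = spark.read.csv(f'dbfs:{run_dir}/test.csv', header=True, schema=csv_schema)
display(x)

# clean up the per-run folder when the simulation is finished
dbutils.fs.rm(f'dbfs:{run_dir}', True)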