20

I have a big distributed file on HDFS, and each time I use sqlContext with the spark-csv package, it first loads the entire file, which takes quite some time.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

Now, as I just want to do a quick check at times, all I need is a few (any n) rows of the entire file.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

But all of these run after the file load is done. Can't I restrict the number of rows while reading the file itself? I am referring to a spark-csv equivalent of the pandas nrows parameter, like:

pd_df = pandas.read_csv("file_path", nrows=20)

Or it might be the case that Spark does not actually load the file in the first step. But in that case, why is my file load step taking so much time?

I want

df.count()

to give me only n and not the count of all rows. Is that possible?

Jacek Laskowski
Abhishek

7 Answers

21

You can use limit(n).

sqlContext.read.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

This will just load 20 rows.

eliasah
  • Would that preclude `inferSchema`, i.e. what if I used `inferSchema` enabled? – Jacek Laskowski May 31 '17 at 06:28
  • 1
    If you inferSchema, all the data will be scanned anyway. Scanned and not loaded. – eliasah May 31 '17 at 06:29
  • How is scanning different from loading? – Jacek Laskowski May 31 '17 at 06:30
  • Scanning as a passing over the data and loaded as in putting the data in memory... I'm not sure I don't get what you don't get about that @JacekLaskowski :-/ – eliasah May 31 '17 at 06:34
  • 2
    "Scanning as a passing over the data" == "loaded as in putting the data in memory" are the same things on Spark executors. – Jacek Laskowski May 31 '17 at 06:35
  • No man it's not. :) and by exploration, the OP is referring to the data exploration process in data analysis... – eliasah May 31 '17 at 06:36
  • I'm sorry but I don't agree with you on this one @JacekLaskowski – eliasah May 31 '17 at 06:37
  • 5
    @eliasah The provided solution loads the file first and then limits it. How is this going to help if I have a petabyte file? It will load all the rows anyway and then show only the limited n records. – Aditya Jun 18 '19 at 12:40
  • The solution is to read a part of the file, as it will be in a distributed environment: `sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path/part-00000*")` – Krishna Pavan Ayitha Mar 02 '23 at 10:55
12

My understanding is that reading just a few lines is not supported by the spark-csv module directly. As a workaround, you could read the file as a text file, take as many lines as you want, and save them to some temporary location. With the lines saved, you could use spark-csv to read them, including the inferSchema option (which you may want to use given you are in exploration mode).

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
Jacek Laskowski
  • +1, although wouldn't .read() be run by multiple executors, so the order of .limit() is not guaranteed? In other words, yes, you get `numberOfLines` lines, but those aren't necessarily the first lines of the original "myfile.csv", are they? – Tagar Sep 11 '17 at 00:38
  • I tried this but text().limit(2) seems to be reading the whole file first, which in my case is several GBs. – Vaibhav Mar 17 '20 at 20:44
  • @Vaibhav That seems your file is not split due to a large split size on HDFS and simply fits into one single split (?) Just guessing. You should ask a separate question and provide more information, e.g. query plan and HDFS configuration. – Jacek Laskowski Mar 18 '20 at 09:14
  • @Tagar My understanding is that the order is guaranteed. Upon uploading a CSV file onto HDFS, the splits will be in order and that's what Spark knows about the file. – Jacek Laskowski Mar 18 '20 at 09:15
  • 1
    I notice the csv reader has the option `samplingRatio – defines fraction of rows used for schema inferring. If None is set, it uses the default value, 1.0.` Perhaps that is a performant way to infer the schema whilst in exploration, and help you to explicitly specify schema for performance & repeatability in production. – Davos Nov 12 '20 at 11:21
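
A minimal sketch of the `samplingRatio` idea from the comment above, assuming a Spark version whose CSV reader supports that option. The function name `read_with_sampled_inference` and the 0.1 default are illustrative and not from the thread. As far as I understand, Spark still has to read through the file; the option only reduces how many rows are used to infer the column types:

```python
def read_with_sampled_inference(spark, path, ratio=0.1):
    """Read a CSV file, inferring the schema from a fraction of its rows.

    `spark` is an existing SparkSession; `ratio` is the fraction of rows
    handed to schema inference (hypothetical default of 0.1).
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("samplingRatio", ratio)  # fraction of rows used for inference
        .csv(path)
    )
```

Once the inferred schema looks right, it can be written down explicitly and passed with `.schema(...)` so later reads skip inference altogether.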
4

Not inferring the schema and using limit(n) worked for me in all respects.

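from pyspark.sql.types import StructType, StructField, LongType, IntegerType, DoubleType

f_schema = StructType([
    StructField("col1", LongType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", DoubleType(), True),
    # ... remaining columns
])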

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

Note: with inferschema='true' it takes just as long as before, presumably because the whole file is scanned again to infer the schema.

But if we don't know the schema, Jacek Laskowski's solution works well too. :)

Abhishek
  • It's always going to be better to explicitly provide the schema, if you have it. You could use `inferSchema` the first time you read it and then save & use the `df.schema` for later, but if the files are so huge that doing it even once is too much, then the workaround suggested is right. The correct answer is a combination of this and https://stackoverflow.com/a/44277219/1335793 – Davos Nov 12 '20 at 12:09
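
The "infer once, then reuse `df.schema`" idea from the comment above could be sketched like this. `save_schema`, `load_schema`, and the local JSON file are hypothetical names chosen for illustration; `df.schema.json()` and `StructType.fromJson` are the PySpark calls doing the actual work:

```python
import json


def save_schema(df, schema_path):
    """Write a DataFrame's (inferred) schema to a local JSON file."""
    with open(schema_path, "w") as f:
        f.write(df.schema.json())


def load_schema(schema_path):
    """Rebuild a StructType from the JSON written by save_schema."""
    # Imported lazily so this module loads even where PySpark is absent.
    from pyspark.sql.types import StructType

    with open(schema_path) as f:
        return StructType.fromJson(json.load(f))
```

On later reads, something like `spark.read.schema(load_schema("schema.json")).option("header", "true").csv(path).limit(n)` should then avoid the inference pass entirely.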
4

The solution given by Jacek Laskowski works well. Presenting an in-memory variation below.

I recently ran into this problem. I was using Databricks and had a huge CSV directory (200 files of 200MB each).

I originally had

val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

which took a lot of time (10+ minutes), but then I changed it to the code below and it ran instantly (2 seconds):

val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

Inferring a schema for text formats is hard, but it can be done this way for the CSV and JSON formats (though not for multi-line JSON).

prongs
3

Since PySpark 2.3 you can simply load data as text, limit, and apply csv reader on the result:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Scala counterpart is available since Spark 2.2:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

In Spark 3.0.0 or later one can also apply limit and use from_csv function, but it requires a schema, so it probably won't fit your requirements.
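
For the Spark 3.0+ route, a rough PySpark sketch, assuming the schema is known and written as a DDL string. The helper names `to_ddl` and `first_rows_with_schema` are made up for illustration. Note that `from_csv` parses every line it is given, so a header line is not skipped and would show up as a data row (typically with nulls where the types don't match):

```python
def to_ddl(columns):
    """Build a DDL schema string from (name, type) pairs (hypothetical helper)."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


def first_rows_with_schema(spark, path, n, schema_ddl):
    """Parse only the first n text lines of a CSV file with a known schema."""
    # Imported lazily so this module loads even where PySpark is absent.
    from pyspark.sql.functions import col, from_csv  # from_csv needs Spark 3.0+

    # spark.read.text yields one string column named "value", one row per line.
    lines = spark.read.text(path).limit(n)
    parsed = lines.select(from_csv(col("value"), schema_ddl).alias("row"))
    # Expand the parsed struct into top-level columns.
    return parsed.select("row.*")
```

A call might look like `first_rows_with_schema(spark, "/path/to/file", 20, to_ddl([("col1", "BIGINT"), ("col2", "INT"), ("col3", "DOUBLE")]))`, where the column names and types are placeholders.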

user10938362
  • The original question doesn't preclude knowing the schema. I think it's worthwhile adding the Spark 3.0.0 solution and mentioning that _if you know the schema_ then you should provide it and not use inferSchema. That would complete this as the correct answer. – Davos Nov 12 '20 at 12:14
  • Strangely, that does not show the header. Maybe limit does not preserve the order of rows since it is an RDD – Hanan Shteingart Dec 28 '20 at 17:26
0

Since I didn't see it among the answers, here is a pure SQL approach that works for me:

df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

If there is no header the columns will be named _c0, _c1, etc. No schema required.

Khris
0

Maybe this will be helpful for anyone working in Java. Applying limit to the CSV DataFrame will not reduce the time. Instead, read the file as text, collect the first n lines, and parse only those as CSV.

    DataFrameReader frameReader = spark
        .read()
        .format("csv")
        .option("inferSchema", "true");
    // set further reader options (delimiters etc.) here

    // Read as text, keep the first MAX_FILE_READ_SIZE lines, collect them to the driver
    List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
    // Parse only the collected lines as CSV
    return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
Faizan