
How can I check programmatically whether to use

spark.read.csv(path) 

or

spark.read.parquet(path)

without the user saying whether the path contains Parquet or text files? The path is on HDFS.

Haha TTpro
  • Whoa, this is a cool question. I mean, it's complicated; you would first need to inspect the file format. At least some scanning or similar is needed to tell you which format the file is in. – Kenry Sanchez Oct 06 '20 at 04:51
  • Your question is highly similar to https://stackoverflow.com/questions/33394884/spark-scala-list-folders-in-directory – Nick Oct 06 '20 at 09:15
  • @Nick, the solution proposed in the link you posted is a very good approach – Chema Oct 06 '20 at 09:58

2 Answers


I would leverage Scala's Try and attempt to read the file in each format one by one using the orElse function, instead of checking the extension programmatically:

import scala.util.Try
import org.apache.spark.sql.DataFrame

// Each reader wraps its spark.read call in a Try so a failure falls through to the next format
def readCsv(): Try[DataFrame] = ???
def readParquet(): Try[DataFrame] = ???

val dfTry: Try[DataFrame] = readCsv().orElse(readParquet())

You can put the readParquet() call first if most of your reads are Parquet.
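
For illustration, a minimal sketch of how the two readers could be implemented; it assumes an existing SparkSession named spark, and the path value is only a hypothetical example:

import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().getOrCreate()
val path = "hdfs:///user/cloudera/data" // hypothetical HDFS directory

// Wrap each read in a Try; orElse only evaluates the fallback if the first read throws.
// Note: Try only catches failures raised while the DataFrame is being constructed
// (e.g. missing path or unreadable Parquet footer), not at action time.
def readCsv(p: String): Try[DataFrame] = Try(spark.read.option("header", "true").csv(p))
def readParquet(p: String): Try[DataFrame] = Try(spark.read.parquet(p))

val dfTry: Try[DataFrame] = readCsv(path).orElse(readParquet(path))

Note that a CSV read over Parquet files may still succeed and produce garbage rows, so trying Parquet first can be the safer order.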

Ishan

One way could be to run the hdfs dfs -ls command and check its output to see whether the input directory contains CSV or Parquet files.

As an example

// These two imports are necessary to run shell commands from Scala
import scala.sys.process._
import scala.language.postfixOps 

// As a little example of how it could be
def getExtension(s: String): String = {
    if(s.contains(".parquet")) "parquet"
    else if(s.contains(".csv")) "csv"
    else if (s.contains(".txt")) "txt"
    else "unknown"
}

val inputDirCsv = "hdfs://quickstart.cloudera:8020/user/cloudera/csv"

val inputDirParquet = "hdfs://quickstart.cloudera:8020/user/cloudera/parquet"

val lsCommand = Seq("hdfs", "dfs", "-ls", inputDirCsv).!!
println(lsCommand)
/*
Found 4 items
-rw-r--r--   1 cloudera supergroup        109 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/EmployeeManager.csv
-rw-r--r--   1 cloudera supergroup       8754 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/amigos.csv
-rw-r--r--   1 cloudera supergroup        142 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/updated_departments.csv
-rw-r--r--   1 cloudera supergroup         79 2020-10-06 06:40 hdfs://quickstart.cloudera:8020/user/cloudera/csv/user.csv
 */
println(getExtension(lsCommand)) // csv

val lsCommand1 = Seq("hdfs", "dfs", "-ls", inputDirParquet).!!
println(lsCommand1)
/*
Found 5 items
-rw-r--r--   3 cloudera supergroup          0 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/_SUCCESS
-rw-r--r--   3 cloudera supergroup        599 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00000-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00001-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        586 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00002-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
-rw-r--r--   3 cloudera supergroup        645 2020-04-24 22:28 hdfs://quickstart.cloudera:8020/user/cloudera/parquet/part-00003-ad9007ac-c3a8-45b1-bad3-fb608c759303-c000.snappy.parquet
 */
println(getExtension(lsCommand1)) // parquet

Chema
  • Well, this is a cool way, but my concern is whether he can run that command from Scala – Kenry Sanchez Oct 06 '20 at 06:05
  • What if the server's version of the hdfs CLI changed, and the format of the output changed with it? It is much better to use a Scala library to do this rather than rely on the command line. – Nick Oct 06 '20 at 09:15
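
Along the lines of that last comment, a minimal sketch of the same check using the Hadoop FileSystem API bundled with Spark, instead of shelling out, could look like this (the SparkSession and the inputDir value are assumptions, not part of the original answer):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().getOrCreate()
val inputDir = "hdfs:///user/cloudera/data" // hypothetical HDFS directory

// Resolve the filesystem for the path from Spark's Hadoop configuration (no CLI involved)
val fs: FileSystem = new Path(inputDir).getFileSystem(spark.sparkContext.hadoopConfiguration)

// Collect the file names in the directory and inspect their extensions
val fileNames = fs.listStatus(new Path(inputDir)).filter(_.isFile).map(_.getPath.getName)

val df: DataFrame =
  if (fileNames.exists(_.endsWith(".parquet"))) spark.read.parquet(inputDir)
  else if (fileNames.exists(_.endsWith(".csv"))) spark.read.csv(inputDir)
  else throw new IllegalArgumentException(s"No CSV or Parquet files found in $inputDir")

This avoids parsing hdfs dfs -ls output, so it is not affected by changes in the CLI's output format.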