
I have a CSV string that comes from an RDD, and I need to convert it into a Spark DataFrame.

I will explain the problem from the beginning.

I have this directory structure:

 Csv_files (dir)
    |- A.csv
    |- B.csv
    |- C.csv

All I have access to is Csv_files.zip, which is stored in HDFS.

I could have read the files directly if each one were stored as A.gz, B.gz, etc., but instead I have files inside a directory that is compressed as a whole.

With the help of an answer on SO (How to open/stream .zip files through Spark?), I was able to convert this zip file into a dictionary (a rough sketch of that step follows the example below):

d = {
    'A.csv': 'A,B,C\n1,2,3\n4,5,6 ...',
    'B.csv': 'A,B,C\n7,8,9\n1,2,3 ...'
}
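
For reference, here is a minimal PySpark sketch of how such a dictionary can be built from the zip in HDFS, roughly following the linked answer (the HDFS path and the helper name are placeholders, not my exact code):

import io
import zipfile

def zip_to_entries(path_and_bytes):
    # path_and_bytes is a (file path, raw bytes) pair from sc.binaryFiles
    _, content = path_and_bytes
    zf = zipfile.ZipFile(io.BytesIO(content))
    return [(name, zf.read(name)) for name in zf.namelist()]

# collect() pulls everything to the driver, which is only reasonable
# for small archives like this one.
entries = sc.binaryFiles('hdfs:///path/to/Csv_files.zip').flatMap(zip_to_entries)
d = dict(entries.collect())  # {'A.csv': 'A,B,C\n1,2,3\n...', 'B.csv': ...}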

Now I need to convert such a CSV string, e.g. 'A,B,C\n1,2,3\n4,5,6 ...', into a DataFrame.

How can I efficiently convert this CSV string into a meaningful DataFrame?

My Spark version is 1.6.2 and my Python version is 2.6.6.

  • That would have been trivial with spark 2.x using DataFrameReader and SparkSession – Michel Lemay Aug 24 '17 at 12:05
  • I also don't understand why you use Spark 1.6 instead of 2.x. And the sentences *"But now A.csv I am reading from a zip file present in HDFS. So the content I extract is a string."* are totally confusing. – UninformedUser Aug 24 '17 at 12:06
  • Indeed, why not use the built-in support for reading compressed files in spark? Usually spark can read compressed gzip/bzip/etc files like if it were raw text files. So a simple sc.textFile('path to your file.csv.gzip') would yield a RDD[string] that you can use to create your dataframe. – Michel Lemay Aug 24 '17 at 12:10
  • @AKSW: I have improved the question. Please have look at it again. – GeekFactory Aug 28 '17 at 05:36
  • @Michel Lemay: The file is not compressed. File is present in a compressed directory.(Added directory structure) And unfortunately I cannot upgrade Spark. – GeekFactory Aug 28 '17 at 05:36
  • Why don't you unzip your directory first? Note that zip and gunzip are not distributed compressed format so it takes longer for spark to read from a zipped file than from a flat file. – MaFF Aug 28 '17 at 13:20

1 Answer


You first have to split your dict values according to CSV-compliant rules. For the example here, I will only split on newlines, but you should pay attention to newlines inside quoted values (Spark 2.2 supports multiline CSV records).

(Scala code)

// Original data as shown in the example: file name -> RDD of the raw CSV text
val d: Map[String, RDD[String]] = ...

// Flat-map each CSV string into individual lines and drop the header line
// (Map#map returns an Iterable, hence the .toList)
val newRDDs: List[RDD[String]] = d.values.map { csvRdd =>
    csvRdd.flatMap(_.split('\n').drop(1))
}.toList

// Beware, this can be extremely costly if you have too many rdds.
val unionAll: RDD[String] = sc.union(newRDDs)

// Finally, create df from rows.
// In spark 2.2, you would do something like spark.read.csv(spark.createDataset(unionAll))
// In spark < 2.x, you need to parse manually to classes (or Row) and then sqlContext.createDataFrame(parsedRows)

NB: The code above has not been compiled/tested and is here only to illustrate the idea.
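
For completeness, a rough, untested PySpark 1.6 sketch of the same idea; it assumes, as in the Scala sketch, that d maps file names to RDDs holding the raw CSV text, and that the header is A,B,C as in the example (adjust the column names to your real header):

from pyspark.sql import Row

def line_to_row(line, columns=('A', 'B', 'C')):
    # All values stay strings here; cast the columns afterwards if needed.
    return Row(**dict(zip(columns, line.split(','))))

# Split every CSV string into lines and drop the header line.
new_rdds = [rdd.flatMap(lambda text: text.split('\n')[1:]) for rdd in d.values()]

# Beware: unioning a large number of small RDDs can be costly.
union_all = sc.union(new_rdds).filter(lambda line: line.strip())

df = sqlContext.createDataFrame(union_all.map(line_to_row))
df.show()

If d holds plain strings instead of RDDs (as in the question), you can parse the lines on the driver and call sc.parallelize on the resulting list of Rows before createDataFrame.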

Michel Lemay