
I have four sets of CSV files that are basically lists of double values (one double value per row) and are split per month:

A: aJan.csv, aFeb.csv, aMarch.csv
B: bJan.csv, bFeb.csv, bMarch.csv
C: cJan.csv, cFeb.csv, cMarch.csv 
D: DJan.csv, DFeb.csv, DMarch.csv

I want to calculate all pairwise Pearson correlations among A, B, C and D. PySpark has a correlation method:

import numpy as np
from pyspark.mllib.stat import Statistics

data = sc.parallelize(
    np.array([np.arange(10000), np.arange(10000, 20000), np.arange(20000, 30000)]).transpose())
print(Statistics.corr(data, method="pearson"))

My question is: how could I make one RDD from the three files aJan.csv, aFeb.csv and aMarch.csv, and then do the same for the other sets? I know I could do something like what is described in How to read multiple text files into a single RDD?, but I want a single view in month-wise append order, i.e. the data from Jan first, then Feb.csv appended, then March.csv.

Jacek Laskowski
Kumar Roshan Mehta

2 Answers


how could I make one RDD from three files

Please don't. Given your question, it seems you've just started your journey into Spark and are about to use the low-level RDD API, which...is...not...for...you (sorry for the pauses, but I wanted to express how strongly I feel about it).

If you insist...

I think you should start with the SparkContext.wholeTextFiles operator.

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

That will give you the content of each CSV file together with its path. With that, just transform the RDD as you want, order it, and...you're done.
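
For instance, a minimal sketch of that approach for set A (the data/ directory layout and the month parsing below are my assumptions, not something given in the question):

import org.apache.spark.rdd.RDD

// assumed layout: aJan.csv, aFeb.csv and aMarch.csv all live under data/
val monthOrder = Seq("Jan", "Feb", "March")

val a: RDD[Double] = spark.sparkContext
  .wholeTextFiles("data/a*.csv")                 // RDD[(path, file content)]
  .sortBy { case (path, _) => monthOrder.indexWhere(m => path.contains(m)) }
  .flatMap { case (_, content) => content.split("\n") }
  .filter(_.trim.nonEmpty)
  .map(_.trim.toDouble)                          // one double per line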


Please consider using Spark SQL's Dataset API instead; it gives you spark.read.csv, orderBy and far more. Please do yourself a favour.
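
A minimal sketch of what that could look like for set A (the paths, the derived monthNo column and the column names are assumptions):

import org.apache.spark.sql.functions.{col, input_file_name, when}

val a = spark.read
  .csv("data/aJan.csv", "data/aFeb.csv", "data/aMarch.csv")   // one string column, _c0
  .select(col("_c0").cast("double").as("value"),
          input_file_name().as("file"))
  .withColumn("monthNo",
    when(col("file").contains("Jan"), 1)
      .when(col("file").contains("Feb"), 2)
      .otherwise(3))
  .orderBy("monthNo")                                         // Jan, then Feb, then March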

Jacek Laskowski
  • I used to solve it this way: map the collection of CSVs onto DataFrames, append month information, then use SparkContext.union to merge them, then take the schema of one of the DataFrames and apply it to the merged RDD to regain a DataFrame. – Rick Moritz Jul 20 '17 at 18:14
  • If your solution is any better than mine (which it sounds like it really is), please answer the question with your solution. Much appreciated. – Jacek Laskowski Jul 20 '17 at 19:26
  • @RickMoritz Could you explain a little more in a separate answer? It sounds like you know exactly what I wanted to do. – Kumar Roshan Mehta Jul 20 '17 at 19:53
  • @JacekLaskowski done - I just wanted to bounce the general idea around before committing to writing a fully fledged answer :) – Rick Moritz Jul 21 '17 at 08:40

I propose the following approach:

First, obtain a parallel collection (for optimized scheduling, in case you do anything else with the data before the union below) of your initial data, containing an explicit or implicit mapping of month -> file_for_month.csv

i.e.:

val files = Set(("January", "aJan.csv"), ("February", "aFeb.csv")).par

Then you can generate a set of DataFrames like this:

import org.apache.spark.sql.functions.lit

val monthDfs = files.map(month =>
  spark.read.csv(month._2)                 // read one month's file
    .withColumn("month", lit(month._1))    // tag every row with its month
)

To combine them into a single DataFrame:

spark.createDataFrame(
  spark.sparkContext.union(monthDfs.map(_.rdd).seq.toSeq),  // merge the per-month row RDDs
  monthDfs.head.schema                                      // reuse the schema of one DataFrame
)

This is a bit hacky, since it uses .rdd. I have had .rdd inexplicably fail at runtime before; I could fix it by assigning it to a variable outside the scope of the final mapping. YMMV.

But voilà: you have a single DataFrame with a "month" column containing all your data. If you're scared of .rdd (and you should be), and the number of files isn't in the tens of thousands, then you can also simply use something like this:

monthDfs.reduce((a, b) => a.union(b))

These union operations are part of the execution graph though, and will grow it by the number of elements in monthDfs -- eventually causing a slowdown or even crashes, observed somewhere in the ~1000-element range. See SPARK-15326 ("Not a Problem") and the non-linear analysis cost discussed there.

Rick Moritz
  • Interesting...where did you get `files` from?! I'd call it... *cheating* :) You *could* assume to have it, but it was not given explicitly. You don't even know if it's doable to have the list (think big data). – Jacek Laskowski Jul 21 '17 at 08:42
  • @JacekLaskowski Well, you get the files from HDFS: ```FileSystem#listStatus```. Then you can parse the filenames to extract the month. But that's mostly housekeeping. – Rick Moritz Jul 21 '17 at 08:59
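
A rough sketch of the housekeeping mentioned in the last comment, assuming all monthly files sit in a single HDFS directory (the /data/monthly path and the month parsing are assumptions):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val months = Seq("Jan", "Feb", "March")

// list the CSV files and pair each one with the month parsed from its file name,
// yielding the same (month, path) shape as `files` in the answer above
val files = fs.listStatus(new Path("/data/monthly"))
  .map(_.getPath.toString)
  .filter(_.endsWith(".csv"))
  .flatMap(path => months.find(m => path.contains(m)).map(m => (m, path)))
  .toSet
  .par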