
I have a large number of parquet files in a directory that represent different tables of the same data schema, and I want to merge them together into one big RDD. Ideally, I would like to do a map-reduce where the mapper emits small RDDs and the reducer merges them. However, I couldn't figure out how to emit RDDs within a mapper. Any ideas?

The first statement below generates the list of files in the directory, and the last statement should generate the complete RDD. However, it gives an unable-to-serialize error, since I don't think you can create an RDD inside a map call.

# Build [path, start, end] entries from the file names in the directory
arr = map(lambda x: ["/mnt/s3/rds/27jul2017-parquet/%s-%s-%s.parquet" % (x[0], x[1], x[2]), x[1].zfill(10), x[2].zfill(10)],
          map(lambda x: x.name.split('.')[0].split('-'), dbutils.fs.ls('/mnt/s3/rds/27jul2017-parquet/')))
result = sorted(arr, key=lambda x: x[1])
# Fails with a serialization error: spark.read.parquet cannot run inside map()
sc.parallelize(arr).map(lambda x: (1, spark.read.parquet(x[0]))).reduceByKey(lambda x, y: x.unionAll(y))
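
For reference, a minimal driver-side sketch (assuming the arr list built above, and keeping the question's unionAll, which newer Spark versions replace with union) that merges the files without creating DataFrames inside a mapper:

from functools import reduce

# Read each file into a DataFrame on the driver, then union them pairwise;
# DataFrames cannot be constructed inside executor-side map() calls.
dfs = [spark.read.parquet(x[0]) for x in arr]
merged = reduce(lambda a, b: a.unionAll(b), dfs)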
SriK
  • https://stackoverflow.com/a/37257709/647053 – Ram Ghadiyaram Sep 06 '17 at 15:20
  • Possible duplicate of [Reading parquet files from multiple directories in Pyspark](https://stackoverflow.com/questions/37257111/reading-parquet-files-from-multiple-directories-in-pyspark) – Ram Ghadiyaram Sep 06 '17 at 15:21
  • also look at https://stackoverflow.com/questions/37257111/reading-parquet-files-from-multiple-directories-in-pyspark – Ram Ghadiyaram Sep 06 '17 at 15:30
  • Can multiple parquet files generated manually from different tables of the same schema be read in like that? I will try – SriK Sep 06 '17 at 17:38
  • Yes, that worked. It looks like multiple parquet files in one directory can be merged into a Spark DataFrame as long as they share the same schema – SriK Sep 07 '17 at 01:05

1 Answer


Instead of specifying a single file in spark.read.parquet, specify the directory; you'll get a DataFrame (not an RDD) containing all the data:

df = spark.read.parquet("/mnt/s3/rds/27jul2017-parquet/")

map iterates over the rows of your RDD to apply transformations; it cannot load files. If it could, you'd end up with an RDD whose rows are DataFrames...
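
If you only need a subset of the files rather than the whole directory, spark.read.parquet also accepts several paths at once; a small sketch, assuming the arr list of [path, start, end] entries built in the question:

paths = [x[0] for x in arr]   # just the file paths
df = spark.read.parquet(*paths)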

MaFF