I am kind of new to Spark and I have a requirement where I need to read from different part folders and then merge them all together to create a single df based on a passed schema. It is something like this:
/feed=abc -> contains multiple part folders based on date like below
/feed=abc/date=20221220
/feed=abc/date=20221221
.....
/feed=abc/date=20221231
Each part folder can have multiple part files. All the files are in parquet format, but the schema across two different part folders may vary, either in the number of cols or in the datatype. So my approach is:
1 - Create an empty final_df based on the schema passed.
2 - Iterate over the list of part folders using the below code:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(inp_fl_loc)

for f in fs.get(conf).listStatus(path):
    path2 = str(hadoop.fs.Path(str(f.getPath())))
    if f.isDirectory():
        path2 = path2 + "/"
        print("the inp_path is ", str(path2))
        # splitting the folder name to get the corresponding partition col name and value
        temp_path = path2.split("/")[-2]
        part_col, part_val = temp_path.split("=")[0], temp_path.split("=")[1]
    elif '_' in path2.split("/")[-1]:
        # skipping files with '_' in the name (e.g. _SUCCESS markers)
        continue

    # reading the part folder
    df = spark.read.format(inp_fl_frmt).option("mergeSchema", "true").load(str(path2))
    # other operations follow :-
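For reference, step 1 above amounts to roughly the following (a minimal sketch; final_schema stands for the StructType that gets passed in):

# step 1 (sketch): empty DataFrame carrying the target schema passed to the job
final_df = spark.createDataFrame([], schema=final_schema)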
3 - Once a particular part folder is read, compare the schema of the read_df with that of the final_df and select only the required cols, typecasting the required cols of the read_df based on the final_df schema where needed. Note that in this process I might have to typecast a sub-col within a struct type variable as well. For that I am actually expanding the struct variables into new cols, typecasting them and then converting them back into the original structure (a sketch of this step follows below).
4 - Union the typecasted read_df with the final_df.
5 - Repeat steps 3-4 for all the part folders, ultimately giving me the final final_df.
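To make steps 3-4 concrete, the per-folder alignment and union is roughly the pattern below (a sketch with illustrative names; a struct-to-struct cast only works when the sub-fields line up, which is why I otherwise expand and rebuild the struct as described above):

from pyspark.sql import functions as F

# sketch of steps 3-4: align the read df to final_df's schema, then union
aligned_cols = []
for field in final_df.schema.fields:
    if field.name in df.columns:
        # cast to the target type (for struct cols this only works when the field layouts match)
        aligned_cols.append(F.col(field.name).cast(field.dataType).alias(field.name))
    else:
        # col missing in this part folder: fill with nulls of the required type
        aligned_cols.append(F.lit(None).cast(field.dataType).alias(field.name))

final_df = final_df.unionByName(df.select(aligned_cols))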
The thing is, in the presence of large data (in one of my feeds I am reading 340 part folders totalling around 13000 files, close to 7 GB in total) the job runs for a very long time (7+ hours in the above case). Since I am working on a shared cluster I don't have the exact details of the number of nodes and cores, and we follow the standard configuration used by our team... but it seems that is not enough. Those details are not handy yet and I am trying to get them, but I am more concerned about whether any tuning is possible from the code side. A few questions that I have in mind:
- Since I am using a loop to read each part folder one by one, I think the reading is happening serially rather than in parallel. Is it possible to read the different part folders in parallel? I tried a reduce operation, but that isn't working properly (see the first sketch after this list for roughly what I tried).
- After the union of the read_df with the empty df, I am caching the empty_df so that in the next union operation the empty_df is not recalculated (the second sketch below shows the pattern). But that doesn't seem to help with performance. Shouldn't I cache the empty_df?
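For the first point, the reduce attempt was something along these lines (a sketch; part_folder_paths is meant to be the list of part folder paths collected in the loop above):

from functools import reduce
from pyspark.sql import DataFrame

# read each part folder separately, then fold the unions together with reduce
dfs = [spark.read.format(inp_fl_frmt).option("mergeSchema", "true").load(p)
       for p in part_folder_paths]
merged_df = reduce(DataFrame.unionByName, dfs)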
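And for the second point, the per-iteration caching is essentially this pattern (sketch):

# inside the loop, after aligning df to the final schema
final_df = final_df.unionByName(df)
final_df.cache()  # hoping the growing union is not recomputed on the next iteration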
Any help regarding this is much appreciated.