Here are the steps:
- Use sc.wholeTextFiles("/path/to/folder/containing/all/files"), where sc is your SparkContext
- This returns an RDD of pairs where the key is the path of a file and the value is the content of that file
- rdd.map(lambda x: x[1]) - this gives you an RDD with only the file contents
- Map again over that result: contents.map(lambda x: customFunctionToProcessFileContent(x)), where contents is the RDD from the previous step
- Since map runs in parallel across partitions, the files are processed concurrently rather than one after another, which is usually faster. This holds as long as your tasks don't depend on each other, which is the main criterion for parallelism
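The steps above can be sketched as follows. This is only a minimal sketch: it assumes an existing SparkContext is passed in as `sc`, the names `process_file_content` and `process_folder` are hypothetical, and the per-file logic (counting non-empty lines) is just a placeholder for your own processing.

```python
def process_file_content(content):
    """Placeholder per-file logic (an assumption): count non-empty lines."""
    return sum(1 for line in content.splitlines() if line.strip())


def process_folder(sc, folder):
    """Apply process_file_content to every file under `folder`.

    `sc` is assumed to be an existing SparkContext.
    """
    # (path, content) pairs, one per file
    files = sc.wholeTextFiles(folder)
    # keep only the contents, dropping the paths
    contents = files.map(lambda kv: kv[1])
    # each file is processed independently, so Spark can run these in parallel
    return contents.map(process_file_content)
```

For example, `process_folder(sc, "/path/to/folder/containing/all/files").collect()` would return one count per file. Since map is lazy, nothing is read or processed until an action such as collect() is called.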
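import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# reuse the running session (as in the pyspark shell) or start one
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# read all the files into a pair RDD of (path, content), with at least 2 partitions
input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*", 2)
# convert the RDD to a DataFrame; the columns are named _1 (path) and _2 (content)
input_df = spark.createDataFrame(input_rdd)
input_df.show(truncate=False)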
'''
+---------------------------------------+------------+
|_1                                     |_2          |
+---------------------------------------+------------+
|file:/home/user/datatest/test.txt      |1,2,3 1,2,3 |
|file:/home/user/datatest/test.txt1     |4,5,6 6,7,6 |
+---------------------------------------+------------+
'''
input_df.select("_2").take(2)
#[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]
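# function to get the last-modified time of a file
# (os.path.getmtime returns the modification time, not the creation time)
def time_conversion(filename):
    # "file:/home/user/datatest/test.txt" -> "/home/user/datatest/test.txt"
    return time.ctime(os.path.getmtime(filename.split(":")[1]))
# register the function as a UDF that returns a string
time_conversion_udf = udf(time_conversion, StringType())
# apply the UDF to the path column of the DataFrame
final_df = input_df.withColumn("created_time", time_conversion_udf(input_df['_1']))
final_df.show(2, truncate=False)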
'''
+---------------------------------------+------------+------------------------+
|_1                                     |_2          |created_time            |
+---------------------------------------+------------+------------------------+
|file:/home/user/datatest/test.txt      |1,2,3 1,2,3 |Sat Jul 11 18:31:03 2020|
|file:/home/user/datatest/test.txt1     |4,5,6 6,7,6 |Sat Jul 11 18:32:43 2020|
+---------------------------------------+------------+------------------------+
'''
# proceed with the next steps for the implementation
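The `split(":")[1]` in the UDF above relies on the path having the form `file:/...` with no other colon in it. A slightly more defensive variant might look like the sketch below, which uses only the standard library. The helper names `local_path_from_uri` and `modified_time` are hypothetical, and the sketch assumes the files live on a local filesystem that the Python worker can read.

```python
import os
import time


def local_path_from_uri(uri):
    """Return the local path for 'file:/a/b', 'file:///a/b' or a plain '/a/b'."""
    if uri.startswith("/"):
        return uri  # already a plain absolute path
    scheme, sep, rest = uri.partition(":")
    if not sep or scheme != "file":
        raise ValueError("not a local file URI: " + uri)
    # collapse the leading slashes of 'file:///a/b' and 'file:/a/b' to one
    return "/" + rest.lstrip("/")


def modified_time(uri):
    """Last-modified time of the file behind `uri`, formatted by time.ctime."""
    return time.ctime(os.path.getmtime(local_path_from_uri(uri)))
```

Raising on any other scheme (for example an HDFS path) makes the failure explicit, since os.path.getmtime can only inspect the local filesystem.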
The above uses the partitioning chosen when the files are read, though. Spark writes one output file per partition, so the number of output files may not equal the number of input files.
You can repartition the RDD by the file count, or by any value that is unique per file in your data, so that the number of output files equals the number of input files. This approach keeps the processing parallel, but it may not match the performance you would get with an optimal number of partitions.
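One way to get exactly one output file per input file is to give every file path its own partition. The sketch below assumes an existing SparkContext `sc` and a list of paths small enough to collect to the driver; `build_partition_index` and `write_one_output_per_input` are hypothetical names, not part of Spark.

```python
def build_partition_index(paths):
    """Map each distinct file path to its own partition number (0..n-1)."""
    return {path: i for i, path in enumerate(sorted(set(paths)))}


def write_one_output_per_input(sc, input_glob, output_dir):
    """Write the content of each input file as its own part file.

    `sc` is assumed to be an existing SparkContext.
    """
    files = sc.wholeTextFiles(input_glob)  # (path, content) pairs
    index = build_partition_index(files.keys().collect())
    # partitionBy places each key in partition index[key] % numPartitions,
    # so every file ends up alone in its own partition
    one_per_partition = files.partitionBy(len(index), lambda path: index[path])
    # saveAsTextFile writes one part-* file per partition
    one_per_partition.values().saveAsTextFile(output_dir)
```

Collecting the keys is an extra pass over the input, and one partition per file can mean many small tasks, which is the performance trade-off mentioned above.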