
Let's suppose we have 2 files, file#1 created at 12:55 and file#2 created at 12:58. While reading these two files I want to add a new column "creation_time". Rows belonging to file#1 should have 12:55 in the "creation_time" column, and rows belonging to file#2 should have 12:58.

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

I'm using the above code snippet to read the files in the "input" directory.

Abdul Haseeb

2 Answers


Use the input_file_name() function to get the filename, then use the HDFS FileSystem API to get each file's modification timestamp, and finally join both dataframes on the filename.

Example:

from pyspark.sql.types import *
from pyspark.sql.functions import *

# access the Hadoop FileSystem API through the JVM gateway
URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())

# list the files in the directory along with their status (size, timestamps, ...)
status = fs.listStatus(Path('<hdfs_directory>'))

# build a (filename, modified_time) dataframe; getModificationTime() returns
# milliseconds since epoch, so divide by 1000 before converting to a timestamp
filestatus_df = spark.createDataFrame(
    [[str(i.getPath()), i.getModificationTime()/1000] for i in status],
    ["filename", "modified_time"]
).withColumn("modified_time", to_timestamp(col("modified_time")))

# tag every row with the file it came from
input_df = spark.read.csv("<hdfs_directory>").\
    withColumn("filename", input_file_name())

# join both dataframes on filename to get the file timestamp
df = input_df.join(filestatus_df, ['filename'], "left")
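Since the question reads from s3://, the same approach should carry over: the Hadoop FileSystem API also works against S3 when the s3a connector is configured on the cluster (an assumption here), and S3 only exposes a last-modified time, which for objects written once is effectively their creation time. A minimal sketch, reusing the bucket path from the question:

# sketch for S3 instead of HDFS (assumes the s3a connector is configured)
fs = FileSystem.get(URI("s3a://bucket7838-1"), sc._jsc.hadoopConfiguration())
status = fs.listStatus(Path("s3a://bucket7838-1/input"))

filestatus_df = spark.createDataFrame(
    [[str(i.getPath()), i.getModificationTime()/1000] for i in status],
    ["filename", "creation_time"]
).withColumn("creation_time", to_timestamp(col("creation_time")))

input_df = spark.read.option("header", "true").csv("s3a://bucket7838-1/input").\
    withColumn("filename", input_file_name())

# every row now carries the creation_time of the file it came from
df = input_df.join(filestatus_df, ["filename"], "left").drop("filename")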
notNull

Here are the steps:

  1. Use sparkContext.wholeTextFiles("/path/to/folder/containing/all/files")
  2. The above returns an RDD where the key is the path of the file and the value is the content of the file
  3. rdd.map(lambda x: x[1]) - this gives you an RDD with only the file contents
  4. rdd.map(lambda x: customFunctionToProcessFileContent(x))
  5. Since the map function works in parallel, any operations you do will be faster and not sequential - as long as your tasks don't depend on each other, which is the main criterion for parallelism
import os
import time

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# reading all the files to create PairRDD 
input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*",2)

#convert RDD to DF

input_df=spark.createDataFrame(input_rdd)

input_df.show(truncate=False)
'''
+---------------------------------------+------------+
|_1                                     |_2          |
+---------------------------------------+------------+
|file:/home/user/datatest/test.txt      |1,2,3  1,2,3|
|file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|
+---------------------------------------+------------+
'''
input_df.select("_2").take(2)
#[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]


# function to get the last-modified time of a file (used here as the creation time)
def time_conversion(filename):
    # strip the "file:" scheme prefix before calling os.path.getmtime
    return time.ctime(os.path.getmtime(filename.split(":")[1]))

# udf registration
time_conversion_udf = udf(time_conversion, StringType())

# apply the udf over the DF
final_df = input_df.withColumn("created_time", time_conversion_udf(input_df['_1']))

final_df.show(2,truncate=False)
'''
+---------------------------------------+------------+------------------------+
|_1                                     |_2          |created_time            |
+---------------------------------------+------------+------------------------+
|file:/home/user/datatest/test.txt      |1,2,3  1,2,3|Sat Jul 11 18:31:03 2020|
|file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|Sat Jul 11 18:32:43 2020|
+---------------------------------------+------------+------------------------+
'''
# proceed with the next steps for the implementation

The above works with the default partitioning, though, so you might not get an output file count equal to the input file count (the number of output files equals the number of partitions).

You can repartition the RDD based on the file count or any other unique value in your data, so you end up with an output file count equal to the input count. This approach still gives you parallelism, but not the performance you would get with an optimal number of partitions.
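For example, a minimal sketch of matching the output file count to the input file count (the output path below is a placeholder, not from the question):

# each wholeTextFiles record corresponds to one input file
num_files = input_rdd.count()

# repartition so the number of output files equals the number of input files
final_df.repartition(num_files).write.mode("overwrite").csv("file:///home/user/dataout")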

sathya
  • Thanks, I will look into this solution. Is there any possible solution using dataframes? – Abdul Haseeb Jul 11 '20 at 09:57
  • And in the above solution, where am I getting the creation time of the file? – Abdul Haseeb Jul 11 '20 at 10:02
  • Write a UDF to get the creation time for each row you process. You can process the files one by one, pass them to the UDF, and have Spark load them too. https://stackoverflow.com/questions/9679344/how-can-i-get-last-modified-datetime-of-s3-objects-with-boto Or the code below reads it from NAS: ```import os path = '/home/user/datatest/' files = [] # r=root, d=directories, f = files for r, d, f in os.walk(path): for file in f: if '.txt' in file: files.append(os.path.join(r, file)) for f in files: print(f) time.ctime(os.path.getmtime(f)) ``` – sathya Jul 11 '20 at 10:03
  • spark.read.option("header", "true").csv("s3://bucket7838-1/input") reads all the files from the directory, so how will I know which row belongs to which file? – Abdul Haseeb Jul 11 '20 at 10:06
  • You want me to read files one by one and get their metadata using boto3? – Abdul Haseeb Jul 11 '20 at 10:07
  • Yes Abdul, if your data size is small then you can use this one-by-one read approach (I used to do it for near-real-time Spark streaming jobs), but if you have a huge volume of data then you can use the approach in the above answer. – sathya Jul 11 '20 at 10:10
  • Actually I have data in TBs to process – Abdul Haseeb Jul 11 '20 at 10:11
  • Thanks sathiyara, can you provide some links that can help me implement the above solution? – Abdul Haseeb Jul 11 '20 at 10:13
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/217665/discussion-between-abdul-haseeb-and-sathiyarajan). – Abdul Haseeb Jul 11 '20 at 18:45
  • At the end we have three columns: one is the text file, the 2nd is the data, and the 3rd is creation_date. I want output like this: column_1, column_2, column_3, creation_date; 1, 2, 3, Sat Jul 11 18:31:03 2020; 1, 2, 3, Sat Jul 11 18:31:03 2020; 4, 5, 6, Sat Jul 11 18:32:43 2020; 4, 5, 6, Sat Jul 11 18:32:43 2020 – Abdul Haseeb Jul 11 '20 at 18:48