A local directory contains thousands of log files generated by another application (which runs once a day). Using Scala, I am able to pick the latest files (the ones generated on the current day) and move them to HDFS. To do that, I have come up with the following code:
import java.io.File
import java.text.SimpleDateFormat
import java.time.{Duration, Instant}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("yarn").appName("AutoCheck").enableHiveSupport().getOrCreate()
import spark.implicits._

val t = (x: Long) => new SimpleDateFormat("yyyy-MM-dd").format(x)
def getFileTree(f: File): Stream[File] =
  f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
         else Stream.empty)
val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
val currDate = simpDate.format(new java.util.Date())
val now = Instant.now // Current instant, e.g. 2017-12-13T09:40:29.920Z
val today = now.toEpochMilli
val yesterday = now.minus(Duration.ofDays(1))
val yesterdayMilliSec = yesterday.toEpochMilli
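// Today's and yesterday's timestamps formatted as yyyy-MM-dd (yesterdaySimpDate is not used further down in this snippet)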
val todaySimpDate = t(today)
val yesterdaySimpDate = t(yesterdayMilliSec)
val local:String = "file://"
val folders = getFileTree(new File("/tmp/hive_audits/")).filterNot(_.getName.endsWith(".log")) // Entries that are not .log files (the folders)
val folderCrtDateDesc = folders.toList.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFolder = folderCrtDateDesc.map(y=>(y._1,t(y._2)))
val folderToday = latestFolder.filter(y => y._2==todaySimpDate)
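// Now the .log files themselves: keep only the ones last modified today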
val localFiles = getFileTree(new File("/tmp/hive_audits/")).filter(_.getName.endsWith(".log"))
val fileCrtDateDesc = localFiles.toList.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFiles = fileCrtDateDesc.toList.map(y => (y._1,t(y._2)))
val filesToday = latestFiles.filter(y => y._2==todaySimpDate)
val localFileNames = filesToday.map(y => local+y._1)
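// Copy one of today's files (the third one here) from the local file system into the HDFS daily-logs directory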
val fileName = localFileNames(2).split("/")(6)
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)
val localPath = new Path(localFileNames(2))
val hdfsPath = new Path(s"hdfs://devusr/user/etllogs/dailylogs/${fileName}")
hdfs.copyFromLocalFile(localPath,hdfsPath)
val fileDF = spark.read.text("/user/etllogs/dailylogs")
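// fileDF has a single string column named "value", with one row per log line across all the files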
Using the above code, I can copy the files from local to HDFS. Each file in the directory contains a status message, and there are three types of status: "Error", "Failure", "Success". I need to open each log file, read its content, and take further action. After loading the files into HDFS, I read the directory "/user/etllogs/dailylogs" into Spark as a single dataframe containing all the files. Sample data for the three statuses can be seen below:
JobID: 454
[Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
[Wed Dec 27 05:38:49 UTC 2017] INFO:
Completed Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 455
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
[Wed Dec 27 05:38:20 UTC 2017] INFO: Success
Completed Auditing for : baseTable2
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 547
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
[Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
Completed Auditing for : baseTable3
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
Each file in the dataframe starts with a JobID line and ends with the line INFO: Updating the job keeper...
I now have the data of all the files in a single dataframe (fileDF), but I cannot come up with a way to process it from there.
Is there a way to split the dataframe into three different dataframes (one each for 'Error', 'Success' and 'Failure'), or is there a better way to create three dataframes, one per status, without loading everything into a single dataframe first? Another option would be to read the files with plain Scala, prepare files with the corresponding content per status, and then load those into Spark for further action.
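For the first option, the only thing I could come up with so far is a naive line-level filter like the sketch below (the keywords I match on are just my guess at reliable markers, and input_file_name() only tells me which file a line came from); the problem is that it returns the individual matching lines, not the whole JobID block they belong to:

import org.apache.spark.sql.functions.{col, input_file_name}

val linesDF   = spark.read.text("/user/etllogs/dailylogs").withColumn("fileName", input_file_name())
val errorDF   = linesDF.filter(col("value").contains("Error while"))   // lines such as "SEVERE: Error while compiling statement"
val successDF = linesDF.filter(col("value").contains("INFO: Success")) // lines such as "INFO: Success"
val failureDF = linesDF.filter(col("value").contains("Failed"))        // lines such as "INFO: Failed. Invalid data found."

So even if this works, I would still have to somehow pull in the remaining lines of each job, which is where I am stuck.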
Could anyone let me know an efficient way of dealing with this situation?