
A local directory contains thousands of log files generated by another application (which runs once a day). Using Scala, I am able to pick the latest files (the ones generated on the current day) and move them to HDFS. To do that, I have come up with the following code:

    import java.io.File
    import java.text.SimpleDateFormat
    import java.time.{Duration, Instant}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("yarn").appName("AutoCheck").enableHiveSupport().getOrCreate()
    import spark.implicits._

    // Formats an epoch-millisecond timestamp as yyyy-MM-dd
    val t = (x: Long) => new SimpleDateFormat("yyyy-MM-dd").format(x)

    // Recursively lists a directory as a lazy stream of files
    def getFileTree(f: File): Stream[File] =
      f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
             else Stream.empty)

    val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val currDate = simpDate.format(new java.util.Date())
    val now = Instant.now                          // current instant, e.g. 2017-12-13T09:40:29.920Z
    val today = now.toEpochMilli
    val yesterday = now.minus(Duration.ofDays(1))
    val yesterdayMilliSec = yesterday.toEpochMilli
    val todaySimpDate = t(today)
    val yesterdaySimpDate = t(yesterdayMilliSec)

    val local: String = "file://"

    // Folders under the audit directory (everything that is not a .log file), newest first
    val folders = getFileTree(new File("/tmp/hive_audits/")).filterNot(_.getName.endsWith(".log"))
    val folderCrtDateDesc = folders.toList.map(y => (y, y.lastModified)).sortBy(-_._2)
    val latestFolder = folderCrtDateDesc.map(y => (y._1, t(y._2)))
    val folderToday = latestFolder.filter(y => y._2 == todaySimpDate)

    // Log files modified today
    val localFiles = getFileTree(new File("/tmp/hive_audits/")).filter(_.getName.endsWith(".log"))
    val fileCrtDateDesc = localFiles.toList.map(y => (y, y.lastModified)).sortBy(-_._2)
    val latestFiles = fileCrtDateDesc.map(y => (y._1, t(y._2)))
    val filesToday = latestFiles.filter(y => y._2 == todaySimpDate)

    val localFileNames = filesToday.map(y => local + y._1)
    val fileName = localFileNames(2).split("/")(6)

    // Copy one of today's files to HDFS and read the whole directory back as a DataFrame
    val hadoopConf = new Configuration()
    val hdfs = FileSystem.get(hadoopConf)
    val localPath = new Path(localFileNames(2))
    val hdfsPath = new Path(s"hdfs://devusr/user/etllogs/dailylogs/${fileName}")
    hdfs.copyFromLocalFile(localPath, hdfsPath)

    val fileDF = spark.read.text("/user/fdlhdpetl/dailylogs")
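
For completeness, a minimal sketch of looping over every file from today instead of picking a single index (it assumes the same hdfs handle as above and that the target directory already exists):

    // Copy each of today's log files into the HDFS landing directory.
    filesToday.foreach { case (file, _) =>
      val src = new Path(file.getAbsolutePath)
      val dst = new Path(s"hdfs://devusr/user/etllogs/dailylogs/${file.getName}")
      hdfs.copyFromLocalFile(src, dst)
    }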

Using the above code, I can copy all the files from local to HDFS. Each file in the directory contains a status message, and there are three types of status: "Error", "Failure" and "Success". I need to open each log file, read its content and take further action. After loading the files into HDFS, I loaded the directory "user/etllogs/dailylogs" into Spark to get a single dataframe that contains all the files. Sample data for the three statuses can be seen below:

    JobID: 454
    [Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
    [Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
    [Wed Dec 27 05:38:49 UTC 2017] INFO:
    Completed Auditing for : baseTable1
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
    JobID: 455
    [Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
    [Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Success
    Completed Auditing for : baseTable2
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
    JobID: 547
    [Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
    [Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
    [Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
    Completed Auditing for : baseTable3
    [Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

Each file in the dataframe starts with the line: JobID and ends with: INFO: Updating the job keeper...

I now have the data of all the files in a single dataframe (fileDF), but I am unable to come up with a way to process the data from it.

Is there a way to split the dataframe into three different dataframes (one each for 'Error', 'Success' and 'Failure'), or is there a better way to create three dataframes, one per status, without loading everything into a single dataframe first? Another option would be to read the files with plain Scala, prepare files with the corresponding content, and then load those into Spark for further action.

Could anyone let me know an efficient way of dealing with this situation?


1 Answer


Once you have created a dataframe (in your case fileDF), it is difficult to separate each group of log lines from JobID until INFO: Updating the job keeper... just by looking for strings like Success, Failed or Error, because every line of your log files becomes its own row in the dataframe.
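
A quick way to see this (just an illustrative check, assuming fileDF was built with spark.read.text as in the question):

    // text() yields a single string column named "value", one row per physical
    // line, so the JobID-to-"Updating the job keeper" grouping is already lost.
    fileDF.printSchema()
    // root
    //  |-- value: string (nullable = true)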

I would suggest using sparkContext's wholeTextFiles API instead: read the files as an RDD and manipulate it so that each log block from JobID until INFO: Updating the job keeper... is reduced to a single element:

    // Read each file as a (path, content) pair, protect the newlines with a
    // placeholder, then split the content into one element per job block.
    val rdd = spark.sparkContext.wholeTextFiles("/user/fdlhdpetl/dailylogs")
      .flatMap(x => x._2.replace("\n", "#[#").split("JobID:"))
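
Before going further you can sanity-check the blocks (a small optional sketch):

    // Each non-empty element should now be one whole job block, with "#[#"
    // standing in for the original newlines; the split also produces empty
    // strings, which the later filters simply never match.
    rdd.filter(_.trim.nonEmpty).take(2).foreach(println)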

The next step is to separate the blocks into three RDDs using filter:

    val successRdd = rdd.filter(x => x.contains("Success"))
    val failedRdd = rdd.filter(x => x.contains("Failed."))
    val errorRdd = rdd.filter(x => x.contains("Error"))

Then create a schema for the dataframes, convert each filtered RDD to an RDD[Row], and use the schema to build the three dataframes:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val simpleSchema = StructType(Seq(StructField("column", StringType)))
    // Restore the newlines that were hidden behind "#[#" and put the "JobID:" prefix back.
    val toRow = (x: String) => Row.fromSeq(Seq("JobID:" + x.replace("...#[#", "...").replace("#[#", "\n")))
    val successDF = spark.createDataFrame(successRdd.map(toRow), simpleSchema)
    val failedDF = spark.createDataFrame(failedRdd.map(toRow), simpleSchema)
    val errorDF = spark.createDataFrame(errorRdd.map(toRow), simpleSchema)
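
As an optional check (a sketch, using the dataframes created above), you can look at the counts per status and at one reconstructed block:

    println(s"success: ${successDF.count}, failed: ${failedDF.count}, error: ${errorDF.count}")
    errorDF.show(1, truncate = false)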

Thus you have three dataframes. I hope the answer is helpful.

Ramesh Maharjan