
What is the correct Spark/Scala technique to parse multi-line logfile entries? The SQL trace textfile:

# createStatement call (thread 132053, con-id 422996) at 2015-07-24 12:39:47.076339
# con info [con-id 422996, tx-id 47, cl-pid 50593, cl-ip 10.32.50.24, user: SAPABA, schema: SAPABA]
cursor_140481797152768_c22996 = con_c22996.cursor()

# begin PreparedStatement_execute (thread 132053, con-id 422996) at 2015-07-24 12:39:47.076422
# con info [con-id 422996, tx-id 47, cl-pid 50593, cl-ip 10.32.50.24, user: SAPABA, schema: SAPABA]
cursor_140481797152768_c22996.execute("SELECT DISTINCT  blah blah blah")
# end PreparedStatement_execute (thread 132053, con-id 422996) at 2015-07-24 12:39:47.077706

Each record consists of three lines; the attributes for each record type (for example, createStatement and PreparedStatement) differ. I want to read the file line by line, determine the record type, and then create a DataFrame row for each record:

Example:

insert into prepared_statements values (132053,422996, '2015-07-24 12:39:47.076422','SELECT DISTINCT  blah blah blah')

To achieve this I need to examine each line to determine which record type it is, and then read the next two lines to get the attributes for that record type. In addition, the line formats differ depending on the record, so I need to conditionally examine the start of each three-line block to determine the record type (the regex sketch below shows the kind of classification I have in mind). Is there a Spark technique to parse multi-line records?
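For illustration, classifying a block's header line might look something like this (the pattern and the field names are hypothetical, just to show the per-line matching involved):

val header = """# (begin |end )?(\w+)(?: call)? \(thread (\d+), con-id (\d+)\) at (.+)""".r

"# begin PreparedStatement_execute (thread 132053, con-id 422996) at 2015-07-24 12:39:47.076422" match {
  case header(_, recordType, thread, conId, timestamp) =>
    println(s"$recordType: thread=$thread, con-id=$conId, at=$timestamp")
  case _ => // not a block header
}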


1 Answer


Here's a working solution, based on mapping each line to the index of the next empty line, and then grouping by that index so that the lines of each "logical record" end up together. Assuming the input is in rdd (an RDD[String]):

import org.apache.spark.rdd.RDD

val indexedRows: RDD[(String, Long)] = rdd.zipWithIndex().cache()

// indices of the empty lines separating the logical records
val emptyRowIndices: Array[Long] = indexedRows.filter(_._1.isEmpty).values.collect().sorted

val withIndexOfNextGap: RDD[(String, Long)] = indexedRows
  .filter(!_._1.isEmpty)
  .mapValues(i => emptyRowIndices.find(_ > i).getOrElse(Long.MaxValue)) // lowest index of an empty line greater than the current line's index; Long.MaxValue when the file doesn't end with an empty line

val logicalRecords: RDD[Iterable[String]] = withIndexOfNextGap.map(_.swap).groupByKey().values

logicalRecords.map(f) // f maps each Iterable[String] into whatever you need
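For example, f could pattern-match each line of a block to build a row for the prepared_statements table from the question. This is a minimal sketch; the case class and regexes are hypothetical and would need adjusting to the real record formats:

case class PreparedStatementRecord(thread: Long, conId: Long, at: String, sql: String)

val beginLine = """# begin PreparedStatement_execute \(thread (\d+), con-id (\d+)\) at (.+)""".r
val execLine = """.*\.execute\("(.*)"\).*""".r

// returns None for blocks that aren't PreparedStatement records
def f(lines: Iterable[String]): Option[PreparedStatementRecord] = {
  val header = lines.collectFirst { case beginLine(thread, conId, at) => (thread.toLong, conId.toLong, at) }
  val sql = lines.collectFirst { case execLine(stmt) => stmt }
  for ((thread, conId, at) <- header; s <- sql) yield PreparedStatementRecord(thread, conId, at, s)
}

val records = logicalRecords.flatMap(f(_)) // flatMap drops the Nones

Matching lines by pattern rather than by position also sidesteps the fact that groupByKey doesn't guarantee the order of the lines within each group.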

NOTE that this solution has a few caveats:

  • It assumes that the number of "logical records" (multi-line entries) isn't too great to collect their indices to driver memory
  • It's not super efficient as we'll scan these indices per line (a binary-search variant is sketched below)
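Since emptyRowIndices is already sorted, the per-line scan can be swapped for a binary search; a minimal sketch of that variant:

import java.util.Arrays

val withIndexOfNextGap: RDD[(String, Long)] = indexedRows
  .filter(!_._1.isEmpty)
  .mapValues { i =>
    // i is never an empty line's index, so binarySearch always returns
    // -(insertionPoint) - 1, where insertionPoint is the first index > i
    val insertion = -Arrays.binarySearch(emptyRowIndices, i) - 1
    if (insertion < emptyRowIndices.length) emptyRowIndices(insertion) else Long.MaxValue
  }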