First of all: credits. This code is based on the solution from here: Use Scala parser combinator to parse CSV files
The CSV files I want to parse can have comments, lines starting with #. And to avoid confusion: The CSV files are tabulator-separated. There are more constraints which would make the parser a lot easier, but since I am completly new to Scala I thought it would be best to stay as close to the (working) original as possible.
The problem I have is that I get a type mismatch. Obviously the regex for a comment does not yield a list. I was hoping that Scala would interpret a comment as a 1-element-list, but this is not the case.
So how would I need to modify my code that I can handle this comment lines? And closly related: Is there an elegant way to query the parser result so I can write in myfunc something like
if (isComment(a)) continue
So here is the actual code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.combinator._
object MyParser extends RegexParsers {
override val skipWhitespace = false // meaningful spaces in CSV
def COMMA = ","
def TAB = "\t"
def DQUOTE = "\""
def HASHTAG = "#"
def DQUOTE2 = "\"\"" ^^ { case _ => "\"" } // combine 2 dquotes into 1
def CRLF = "\r\n" | "\n"
def TXT = "[^\",\r\n]".r
def SPACES = "[ ]+".r
def file: Parser[List[List[String]]] = repsep((comment|record), CRLF) <~ (CRLF?)
def comment: Parser[List[String]] = HASHTAG<~TXT
def record: Parser[List[String]] = "[^#]".r<~repsep(field, TAB)
def field: Parser[String] = escaped|nonescaped
def escaped: Parser[String] = {
((SPACES?)~>DQUOTE~>((TXT|COMMA|CRLF|DQUOTE2)*)<~DQUOTE<~(SPACES?)) ^^ {
case ls => ls.mkString("")
}
}
def nonescaped: Parser[String] = (TXT*) ^^ { case ls => ls.mkString("") }
def applyParser(s: String) = parseAll(file, s) match {
case Success(res, _) => res
case e => throw new Exception(e.toString)
}
def myfunc( a: (String, String)) = {
val parserResult = applyParser(a._2)
println("APPLY PARSER FOR " + a._1)
for( a <- parserResult ){
a.foreach { println }
}
}
def main(args: Array[String]) {
val filesPath = "/home/user/test/*.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.wholeTextFiles(filesPath).cache()
logData.foreach( x => myfunc(x))
}
}