I can successfully parse XML data dropped into a directory using the Spark Streaming fileStream
method, and I can write the resulting RDDs out to a text file just fine:
val fStream = {
  ssc.fileStream[LongWritable, Text, XmlInputFormat](
    WATCHDIR, xmlFilter _, newFilesOnly = false, conf = hadoopConf)
}
fStream.foreachRDD(rdd =>
  if (rdd.count() == 0) {
    logger.info("No files..")
  })
val dStream = fStream.map{ case(x, y) =>
  logger.info("Hello from the dStream")
  logger.info(y.toString)
  scalaxb.fromXML[Music](scala.xml.XML.loadString(y.toString))
}
dStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///tmp/xmlout"))
The trouble starts when I try to convert the RDDs to DataFrames so that I can either register them as a temp table or call saveAsParquetFile.
This code:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
dStream.foreachRDD(rdd => rdd.distinct().toDF().printSchema())
Results in this error:
java.lang.UnsupportedOperationException: Schema for type scalaxb.DataRecord[scala.Any] is not supported
I would have thought that, since scalaxb generates case classes for my records, it would be simple for Spark to infer the schema using reflection. I can see that this is what it is trying to do, but Spark doesn't support the scalaxb.DataRecord type. Are there any Spark or scalaxb experts who have ideas on how to make the case classes generated by scalaxb compatible with Spark?
BTW, here are the generated classes from scalaxb:
package generated

case class Song(attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
  lazy val length = attributes.get("@length") map { _.as[String] }
}

case class Album(song: Seq[generated.Song] = Nil,
                 description: String,
                 attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
}

case class Artist(album: Seq[generated.Album] = Nil,
                  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val name = attributes.get("@name") map { _.as[String] }
}

case class Music(artist: Seq[generated.Artist] = Nil)
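
One possible workaround, sketched here under assumptions and not tested against Spark, is to copy the generated objects into plain case classes whose fields are only String and Option[String] before calling toDF(). The names SongRow, Flatten and toRows are hypothetical. The DataRecord class below is only a stand-in so that the sketch compiles without scalaxb (it assumes the real type exposes as[A] the way the generated code uses it), and the Song, Album, Artist and Music definitions are copies of the generated classes rewritten against that stand-in.

```scala
// Stand-in for scalaxb.DataRecord so this sketch compiles on its own.
// Assumption: the real type offers `as[A]`, as the generated code uses it.
final case class DataRecord[+A](value: A) {
  def as[B]: B = value.asInstanceOf[B]
}

// Copies of the generated classes, written against the stand-in above.
case class Song(attributes: Map[String, DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
  lazy val length = attributes.get("@length") map { _.as[String] }
}
case class Album(song: Seq[Song] = Nil,
                 description: String,
                 attributes: Map[String, DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
}
case class Artist(album: Seq[Album] = Nil,
                  attributes: Map[String, DataRecord[Any]] = Map()) {
  lazy val name = attributes.get("@name") map { _.as[String] }
}
case class Music(artist: Seq[Artist] = Nil)

// Hypothetical flat row: one record per song, with no DataRecord fields.
case class SongRow(artist: Option[String],
                   album: Option[String],
                   description: String,
                   title: Option[String],
                   length: Option[String])

object Flatten {
  // Walk artist -> album -> song and emit one SongRow per song.
  def toRows(music: Music): Seq[SongRow] =
    for {
      artist <- music.artist
      album  <- artist.album
      song   <- album.song
    } yield SongRow(artist.name, album.title, album.description, song.title, song.length)
}
```

With the real generated classes, the stream could then be flattened with something like dStream.flatMap(Flatten.toRows) before the foreachRDD call that invokes toDF(). The idea is that reflection then only sees String and Option[String] fields, at the cost of denormalizing the artist and album data onto every song row.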