
I can successfully parse XML files dropped into a directory using Spark Streaming's fileStream method, and I can write the resulting RDDs out to a text file without problems:

val fStream = {
  ssc.fileStream[LongWritable, Text, XmlInputFormat](
    WATCHDIR, xmlFilter _, newFilesOnly = false, conf = hadoopConf)
}


fStream.foreachRDD(rdd =>
  if (rdd.count() == 0) {
    logger.info("No files..")
  })

val dStream = fStream.map{ case(x, y) =>
  logger.info("Hello from the dStream")
  logger.info(y.toString)
  scalaxb.fromXML[Music](scala.xml.XML.loadString(y.toString))
}

dStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///tmp/xmlout"))

The trouble starts when I try to convert the RDDs to DataFrames so that I can either register them as a temp table or call saveAsParquetFile.

This code:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
dStream.foreachRDD(rdd => rdd.distinct().toDF().printSchema())

Results in this error:

java.lang.UnsupportedOperationException: Schema for type scalaxb.DataRecord[scala.Any] is not supported

Since scalaxb generates case classes for my records, I would have thought it would be simple for Spark to infer the schema using reflection. I can see that this is what it's trying to do, but Spark doesn't support the scalaxb.DataRecord type. Are there any Spark or scalaxb experts who have ideas on how to make the case classes generated by scalaxb compatible with Spark?

BTW, here are the generated classes from scalaxb:

package generated

case class Song(attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
  lazy val length = attributes.get("@length") map { _.as[String] }
}

case class Album(song: Seq[generated.Song] = Nil,
  description: String,
  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
}

case class Artist(album: Seq[generated.Album] = Nil,
  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val name = attributes.get("@name") map { _.as[String] }
}

case class Music(artist: Seq[generated.Artist] = Nil)
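
One possible workaround, sketched here under assumptions rather than confirmed against this setup: since the unsupported type only appears in the attributes maps, the generated objects could be copied into a flat case class whose fields are all types that Spark's reflection-based inference handles (String and Option[String]). The names SongRow and fromMusic are hypothetical and not part of scalaxb or Spark; the code relies only on the generated classes shown above and their title, length, and name accessors.

```scala
import generated._

// Hypothetical flat record: one row per song, using only String and
// Option[String] fields so that no scalaxb.DataRecord is exposed to Spark.
// It should be defined at the top level (not inside a method) so that
// Spark can obtain a TypeTag for it.
case class SongRow(
  artist: Option[String],
  album: Option[String],
  description: String,
  title: Option[String],
  length: Option[String])

object SongRow {
  // Walks artist -> album -> song and emits one SongRow per song.
  // The attribute accessors on the generated classes already return
  // Option[String], so they can be passed through unchanged.
  def fromMusic(music: Music): Seq[SongRow] =
    for {
      artist <- music.artist
      album  <- artist.album
      song   <- album.song
    } yield SongRow(artist.name, album.title, album.description, song.title, song.length)
}
```

With import sqlContext.implicits._ in scope, something like dStream.flatMap(m => SongRow.fromMusic(m)).foreachRDD(rdd => rdd.toDF().printSchema()) would then hand Spark only supported field types. The trade-off is that the nested structure is flattened and any attributes not copied explicitly are dropped.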
Chris Matta
  • I know this was almost 4 years ago, but I'm dealing with the same issue and wonder if you ever got this to work, and how. – KipLove Aug 06 '19 at 22:21
  • @KipLove, if I ever got this working, it was so long ago that I don't really remember. You might want to ask a new question and reference this one. – Chris Matta Aug 08 '19 at 18:31

0 Answers