
I have complex, deeply nested XML data that can be very large (more than 15 GB). Because of its size we need to process it as a stream, and the new Alpakka library is our first choice since it looks like a promising solution.

There are older threads on scala-xml serialization and other Scala libraries, but we need to process huge amounts of XML as a stream of events.

To keep things simple, let's consider a PurchaseOrder (the XML comes from this page):

<?xml version="1.0"?>  
<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">  
  <Address Type="Shipping">  
    <Name>Ellen Adams</Name>  
    <Street>123 Maple Street</Street>  
    <City>Mill Valley</City>  
    <State>CA</State>  
    <Zip>10999</Zip>  
    <Country>USA</Country>  
  </Address>  
  <Address Type="Billing">  
    <Name>Tai Yee</Name>  
    <Street>8 Oak Avenue</Street>  
    <City>Old Town</City>  
    <State>PA</State>  
    <Zip>95819</Zip>  
    <Country>USA</Country>  
  </Address>  
  <DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>  
  <Items>  
    <Item PartNumber="872-AA">  
      <ProductName>Lawnmower</ProductName>  
      <Quantity>1</Quantity>  
      <USPrice>148.95</USPrice>  
      <Comment>Confirm this is electric</Comment>  
    </Item>  
    <Item PartNumber="926-AA">  
      <ProductName>Baby Monitor</ProductName>  
      <Quantity>2</Quantity>  
      <USPrice>39.98</USPrice>  
      <ShipDate>1999-05-21</ShipDate>  
    </Item>  
  </Items>  
</PurchaseOrder>  

I am trying to stream all Items from the XML and deserialize them. Note that the same tags may appear at different levels, and the elements/attributes inside an Item can appear in arbitrary order. The approach I see (mostly based on Alpakka's XmlProcessingTest; can anyone suggest better references?) might look like this:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.alpakka.xml.{Characters, EndElement, ParseEvent, StartElement}
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

trait Builder[T] {
  def build(): T
}

case class Item(partNumber: String)

object Item {
  def apply(builder: ItemBuilder,
            path: mutable.Stack[String]): PartialFunction[ParseEvent, Unit] = {
    case elem @ StartElement("Item", _, _, _, _) =>
      val partNumber = elem.findAttribute("PartNumber").map(_.value).getOrElse("")
      path.push(s"Item")
      builder.partNumber = partNumber
    case EndElement("Item") =>
      path.pop()
  }
}

class ItemBuilder() extends Builder[Item] {
  var partNumber = ""

  override def build(): Item =
    Item(
      partNumber = partNumber
    )

  def reset(): Unit = {
    partNumber = ""
  }
}


implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

val path: mutable.Stack[String] = new mutable.Stack[String]()

val xml =
  """<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">
    |<Address Type="Shipping">
    |  <Name>Ellen Adams</Name>
    |  <Street>123 Maple Street</Street>
    |  <City>Mill Valley</City>
    |  <State>CA</State>
    |  <Zip>10999</Zip>
    |  <Country>USA</Country>
    |</Address>
    |<Address Type="Billing">
    |  <Name>Tai Yee</Name>
    |  <Street>8 Oak Avenue</Street>
    |  <City>Old Town</City>
    |  <State>PA</State>
    |  <Zip>95819</Zip>
    |  <Country>USA</Country>
    |</Address>
    |<DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
    |<Items>
    |  <Item PartNumber="872-AA">
    |    <ProductName>Lawnmower</ProductName>
    |    <Quantity>1</Quantity>
    |    <USPrice>148.95</USPrice>
    |    <Comment>Confirm this is electric</Comment>
    |  </Item>
    |  <Item PartNumber="926-AA">
    |    <ProductName>Baby Monitor</ProductName>
    |    <Quantity>2</Quantity>
    |    <USPrice>39.98</USPrice>
    |    <ShipDate>1999-05-21</ShipDate>
    |  </Item>
    |</Items>
    |</PurchaseOrder>""".stripMargin

val documentStream = Source.single(xml)

val builder = new ItemBuilder()

val default: PartialFunction[ParseEvent, Unit] = {
  case Characters(any) =>
  case StartElement(localName, _, _, _, _) =>
    path.push(localName)
  case EndElement(localName) =>
    path.pop()
  case any =>
}

val handle: PartialFunction[ParseEvent, Unit] = Item(builder, path) orElse
  default

val source: Source[Item, akka.NotUsed] = documentStream
  .map(ByteString(_))
  .via(XmlParsing.parser)
  .splitWhen(_ match {
    case StartElement("Item", _, _, _, _) =>
      true
    case _ =>
      false
  })
  .fold(builder) { (acc, parseEvent) =>
    handle(parseEvent) // mutates the shared `builder`; the fold only threads it through
    acc
  }
  .map { builder: ItemBuilder =>
    val item = builder.build()
    builder.reset()
    item
  }
  .concatSubstreams
  .filterNot(_.partNumber.isEmpty)

val resultFuture: Future[Seq[Item]] = source
  .runWith(Sink.seq)

val result: Seq[Item] = Await.result(resultFuture, 5.seconds)

println("items : " + result)
println("END")

The example is posted on Scastie.

This approach requires a separate handler for every tag (combined in val handle: PartialFunction), which is error-prone and fragile.

I am wondering how to handle ParseEvents more concisely and combine them into the required Item objects. Is there a way to avoid the boilerplate, or a more concise pattern for writing such deserializers?

sergiusz.kierat
A streaming selective parser might help (I still have to check it): https://github.com/Tradeshift/ts-reaktive/blob/master/ts-reaktive-marshal/src/test/java/com/tradeshift/reaktive/xml/XMLProtocolSpec.java#L48 – sergiusz.kierat Feb 20 '18 at 10:07

1 Answer


I have cleaned up your code a bit and extended the builder to illustrate how it can build objects from a series of sub-element events. Maybe others can improve on my version.

I suggest that you split the stream after the Item is parsed. That way, you can keep using the singleton builder and the simple path stack.

You can add similar handlers and builders for any complex subtrees of Item; the only difference is that the result of SubElement.build() is assigned to an ItemBuilder attribute instead of being returned.
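As a sketch of that nested case, suppose (hypothetically; the PurchaseOrder sample has no such element) that each Item contains a <Dimensions> subtree with <Width> and <Height>. A DimensionsBuilder is filled while that subtree is open, its build() result is stored in the ItemBuilder at </Dimensions>, and only </Item> emits an object. The event classes are simplified stand-ins for Alpakka's (not the real API), and the builders are plain classes rather than the singleton ItemBuilder object used in this answer's code.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for Alpakka's parse events.
sealed trait ParseEvent
final case class StartElement(localName: String, attributes: Map[String, String] = Map.empty) extends ParseEvent
final case class EndElement(localName: String) extends ParseEvent
final case class Characters(text: String) extends ParseEvent

// Hypothetical nested subtree: <Item><Dimensions><Width/><Height/></Dimensions></Item>
final case class Dimensions(width: String, height: String)
final case class Item(partNumber: String, dimensions: Option[Dimensions])

final class DimensionsBuilder {
  var width = ""
  var height = ""
  def reset(): Unit = { width = ""; height = "" }
  def build(): Dimensions = Dimensions(width, height)
}

final class ItemBuilder {
  var partNumber = ""
  var dimensions: Option[Dimensions] = None
  def reset(): Unit = { partNumber = ""; dimensions = None }
  def build(): Item = Item(partNumber, dimensions)
}

object NestedParser {
  def parse(events: Seq[ParseEvent]): List[Item] = {
    val path = new mutable.Stack[String]()
    val item = new ItemBuilder
    val dims = new DimensionsBuilder
    val out = List.newBuilder[Item]
    events.foreach {
      case StartElement("Item", attrs) =>
        path.push("Item")
        item.reset()
        item.partNumber = attrs.getOrElse("PartNumber", "")
      case StartElement("Dimensions", _) =>
        path.push("Dimensions")
        dims.reset()
      case StartElement(name, _) =>
        path.push(name)
      case Characters(text) if path.nonEmpty =>
        path.top match {
          case "Width"  => dims.width = text
          case "Height" => dims.height = text
          case _        => ()
        }
      case EndElement("Dimensions") =>
        path.pop()
        item.dimensions = Some(dims.build()) // sub-builder result is stored, not emitted
      case EndElement("Item") =>
        path.pop()
        out += item.build() // only the top-level object is emitted
      case EndElement(_) =>
        path.pop()
      case _ => ()
    }
    out.result()
  }
}
```

An Item without a <Dimensions> child simply ends up with dimensions = None, because reset() clears it at every <Item>.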

If you need to address sub-elements with the same name, you can either look deeper into the path stack, or add additional state tracking.
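To look deeper into the path, one option is to keep it root-first in a buffer and match on its last few segments instead of only the top. The sketch below uses simplified stand-in event classes (not Alpakka's real ones) and a hypothetical Item/Name element, which does not occur in the PurchaseOrder sample, to show two Name tags at different levels being routed differently; PathRouter and the field labels are illustrative names.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for Alpakka's parse events.
sealed trait ParseEvent
final case class StartElement(localName: String, attributes: Map[String, String] = Map.empty) extends ParseEvent
final case class EndElement(localName: String) extends ParseEvent
final case class Characters(text: String) extends ParseEvent

object PathRouter {
  /** Returns (field, text) pairs, choosing the field from the last two path segments. */
  def route(events: Seq[ParseEvent]): List[(String, String)] = {
    val path = mutable.ArrayBuffer.empty[String] // root element first, current element last
    val out = List.newBuilder[(String, String)]
    events.foreach {
      case StartElement(name, _) =>
        path += name
      case EndElement(_) =>
        path.remove(path.length - 1)
      case Characters(text) if text.trim.nonEmpty =>
        path.takeRight(2).toList match {
          case List("Address", "Name") => out += ("addressName" -> text.trim)
          case List("Item", "Name")    => out += ("itemName" -> text.trim)
          case _                       => ()
        }
      case _ => () // whitespace-only text between elements
    }
    out.result()
  }
}
```

Matching on a longer suffix (or the full path) works the same way when two levels are not enough to tell the elements apart.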

import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.xml.{Characters, EndElement, ParseEvent, StartElement}

import scala.collection.mutable

case class Item(partNumber: String, productName: String)

object Item {
  def apply(path: mutable.Stack[String]): PartialFunction[ParseEvent, Any] = {
    case elem @ StartElement("Item", _, _, _, _) =>
      path.push("Item")
      ItemBuilder.reset()
      ItemBuilder.partNumber = elem.findAttribute("PartNumber").map(_.value).getOrElse("")
    case Characters(text) =>
      path.top match {
        case "ProductName" => ItemBuilder.productName = text
        case _ => ()
      }
    case EndElement("Item") =>
      path.pop()
      ItemBuilder.build()
  }
}

object ItemBuilder {
  var partNumber = ""
  var productName = ""

  def build(): Item =
    Item(
      partNumber = partNumber,
      productName = productName)

  def reset(): Unit = {
    partNumber = ""
    productName = ""
  }
}

object AlpakkaDemo extends App {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val path: mutable.Stack[String] = new mutable.Stack[String]()

  val xml =
    """<PurchaseOrder PurchaseOrderNumber="99503" OrderDate="1999-10-20">
      |<Address Type="Shipping">
      |  <Name>Ellen Adams</Name>
      |  <Street>123 Maple Street</Street>
      |  <City>Mill Valley</City>
      |  <State>CA</State>
      |  <Zip>10999</Zip>
      |  <Country>USA</Country>
      |</Address>
      |<DeliveryNotes>Please leave packages in shed by driveway.</DeliveryNotes>
      |<Items>
      |  <Item PartNumber="872-AA">
      |    <ProductName>Lawnmower</ProductName>
      |    <Quantity>1</Quantity>
      |    <USPrice>148.95</USPrice>
      |    <Comment>Confirm this is electric</Comment>
      |  </Item>
      |  <Item PartNumber="926-AA">
      |    <ProductName>Baby Monitor</ProductName>
      |    <Quantity>2</Quantity>
      |    <USPrice>39.98</USPrice>
      |    <ShipDate>1999-05-21</ShipDate>
      |  </Item>
      |</Items>
      |</PurchaseOrder>""".stripMargin

  val defaultElementHandler: PartialFunction[ParseEvent, Any] = {
    case StartElement(localName, _, _, _, _) => path.push(localName)
    case EndElement(localName) => path.pop()
    case _ => ()
  }

  val handlersChain: PartialFunction[ParseEvent, Any] =
    Item(path) orElse
    defaultElementHandler

  val source: Source[Item, akka.NotUsed] = Source.single(xml)
    .map(ByteString(_))
    .via(XmlParsing.parser)
    .map(handlersChain)
    .collect {
      case item: Item => item
    }
    .splitWhen(_ => true) // also consider mapAsyncUnordered()
    .map {
      identity  // placeholder for any subsequent heavy Item processing
    }
    .concatSubstreams

  val resultFuture: Future[Seq[Item]] = source.runWith(Sink.seq)

  val result: Seq[Item] = Await.result(resultFuture, 5.seconds)

  println("items : " + result)
  println("END")
  system.terminate()
}
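A possible variation that removes most per-tag handlers is to collect generically: while inside an Item, record its attributes and the text of each direct child element in a Map, then turn the Map into an Item in one place. Since fields are looked up by name, their order in the XML does not matter. This is a plain-Scala sketch against simplified stand-in event classes (not Alpakka's real ones, which carry more fields); FieldCollector, ItemDeserializer and the "@" prefix for attributes are illustrative choices, not library API.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for Alpakka's parse events.
sealed trait ParseEvent
final case class StartElement(localName: String, attributes: Map[String, String] = Map.empty) extends ParseEvent
final case class EndElement(localName: String) extends ParseEvent
final case class Characters(text: String) extends ParseEvent

final case class Item(partNumber: String, productName: String, quantity: Int, usPrice: BigDecimal)

// Gathers attributes ("@" prefix) and direct-child text of each <target> element.
final class FieldCollector(target: String) {
  private var depth = 0 // 0 = outside target, 1 = on target, 2 = on a direct child
  private val fields = mutable.Map.empty[String, String]
  private var currentChild = ""
  private val text = new StringBuilder

  /** Feeds one event; returns Some(fields) when the target element closes. */
  def feed(event: ParseEvent): Option[Map[String, String]] = event match {
    case StartElement(name, attrs) if depth == 0 && name == target =>
      depth = 1
      fields.clear()
      attrs.foreach { case (k, v) => fields("@" + k) = v }
      None
    case StartElement(name, _) if depth > 0 =>
      depth += 1
      if (depth == 2) { currentChild = name; text.clear() }
      None
    case Characters(t) if depth == 2 =>
      text.append(t)
      None
    case EndElement(name) if depth == 1 && name == target =>
      depth = 0
      Some(fields.toMap) // immutable snapshot
    case EndElement(_) if depth > 0 =>
      if (depth == 2) fields(currentChild) = text.toString.trim
      depth -= 1
      None
    case _ => None
  }
}

object ItemDeserializer {
  // The only type-specific code: map collected fields onto the case class.
  def toItem(f: Map[String, String]): Item =
    Item(
      partNumber = f.getOrElse("@PartNumber", ""),
      productName = f.getOrElse("ProductName", ""),
      quantity = f.get("Quantity").map(_.toInt).getOrElse(0),
      usPrice = f.get("USPrice").map(BigDecimal(_)).getOrElse(BigDecimal(0)))

  def parse(events: Seq[ParseEvent]): List[Item] = {
    val collector = new FieldCollector("Item")
    events.toList.flatMap(e => collector.feed(e).toList).map(toItem)
  }
}
```

After adapting the patterns to Alpakka's real event classes, the collector could be plugged into the stream with a fresh instance per materialization, for example .statefulMapConcat { () => val c = new FieldCollector("Item"); e => c.feed(e).toList }.map(ItemDeserializer.toItem). That would also avoid sharing the global ItemBuilder object between stream runs.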

Here are also a few (very remotely) related articles that may inspire some more ideas for organizing your event-based parsing.

http://www.ficksworkshop.com/blog/post/design-pattern-for-event-based-parsing-of-hierarchical-data

https://www.xml.com/pub/a/2003/09/17/stax.html

https://www.infoq.com/articles/HIgh-Performance-Parsers-in-Java-V2