Here is what I came up with using the RDD API and Jackson. I chose the low-level Spark API because it is schemaless, and I'm not sure how the structured API would handle variable input event types. If the mentioned Gson supports polymorphic deserialisation, it could be used instead of Jackson; I picked Jackson simply because I'm more familiar with it.
The problem can be split into steps:

- Deserialize the input into objects by event type.
- Reduce by id and type. The reduce needs to behave differently for different types: for example, views are simply reduced to a sum, while the user name needs to be handled differently. In this example let's just assume the user name is unique within an `id` and pick the first one.
- Collect the reduced items by `id`.
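To make the shape of the computation concrete before bringing Spark in, the three steps can be sketched on plain Scala collections. The triples here are a simplified stand-in for the real event models defined below, not the actual implementation:

```scala
// (id, eventType, payload): counting events carry 1, the login event carries a username
val events = List(
  (123, "clicked", 1),
  (123, "viewed", 1),
  (123, "viewed", 1),
  (123, "login", "Bob"),
  (456, "login", "Sue"),
  (456, "viewed", 1)
)

// Step 2: reduce by (id, type); counting events are summed, login keeps the first value
val reduced = events
  .groupBy { case (id, tpe, _) => (id, tpe) }
  .map { case ((id, tpe), group) =>
    val value: Any =
      if (tpe == "login") group.head._3 // pick the first username
      else group.size                   // each counting event contributes 1
    (id, tpe, value)
  }

// Step 3: collect the reduced items by id
val byId = reduced
  .groupBy(_._1)
  .map { case (id, group) =>
    id -> group.map { case (_, tpe, v) => tpe -> v }.toMap
  }
```

The Spark version below follows the same outline, with the per-type reduce behaviour moved into dedicated classes.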
Step 2 needs the most attention, as there is no such functionality in the Spark API, and there needs to be some kind of runtime check that the deserialized events being combined are of compatible classes. To overcome this, let's introduce a generic trait `Reducible` that can encapsulate different types:
```scala
trait Reducible[T] {
  def reduce(that: Reducible[_]): this.type
  def value: T
}

// simply reduces to a sum
case class Sum(var value: Int) extends Reducible[Int] {
  override def reduce(that: Reducible[_]): this.type = that match {
    case Sum(thatValue) =>
      value += thatValue
      this
  }
}

// for picking the first element, e.g. the username
case class First(value: String) extends Reducible[String] {
  override def reduce(that: Reducible[_]): this.type = this
}
```
The runtime checking is handled in these classes; for example, `Sum` will fail with a MatchError if the right-hand object is not of the same type.
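As a quick sanity check of those semantics, the snippet below exercises both classes (the definitions from above are repeated so it runs standalone):

```scala
import scala.util.Try

// Reducible, Sum and First, repeated from above so this snippet is self-contained
trait Reducible[T] {
  def reduce(that: Reducible[_]): this.type
  def value: T
}

case class Sum(var value: Int) extends Reducible[Int] {
  override def reduce(that: Reducible[_]): this.type = that match {
    case Sum(thatValue) =>
      value += thatValue
      this
  }
}

case class First(value: String) extends Reducible[String] {
  override def reduce(that: Reducible[_]): this.type = this
}

// sums combine: 1 + 2 + 3
val total = Sum(1).reduce(Sum(2)).reduce(Sum(3))

// First keeps the left-hand value
val name = First("Bob").reduce(First("Alice"))

// mixing types fails at runtime with a MatchError
val mixed = Try(Sum(1).reduce(First("Bob")))
```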
Next, let's define the models for the events and tell Jackson how to handle polymorphism:
```scala
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "event", visible = true)
sealed trait Event[T] {
  val id: Int
  val event: String
  def value: Reducible[T]
}

abstract class CountingEvent extends Event[Int] {
  override def value: Reducible[Int] = Sum(1)
}

@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
  override def value: Reducible[String] = First(username)
}

object EventMapper {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  // the list of classes could be auto-generated, see
  // https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
  mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])

  def apply(json: String): Event[_] = mapper.readValue(json, classOf[Event[_]])
}
```
All events are expected to have the fields `id` and `event`. The latter is used to determine which class to deserialize into; Jackson needs to know all the candidate classes beforehand. `Event` is declared as a sealed trait, so all the implementing classes can be determined at compile time. I'm omitting this reflective step and simply hard-coding the list of classes; there is a good answer on how to do it automatically: Getting subclasses of a sealed trait.
Now we are ready to write the application logic. For the sake of simplicity, `sc.parallelize` is used to load the example data; Spark Streaming could be used as well.
```scala
val in = List(
  "{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
  "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
  "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
  "{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
  "{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
  "{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)
```
```scala
// partition (id, event) pairs only by id to minimize the shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {
  override def getPartition(key: Any): Int = key match {
    case (id: Int, _) => super.getPartition(id)
    case id: Int      => super.getPartition(id)
  }
}
```
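To see why this helps: Spark's `HashPartitioner` assigns a key to `key.hashCode` modulo the number of partitions, shifted to be non-negative. The standalone sketch below mimics that scheme without the Spark dependency (it is an illustration of the idea, not Spark's actual class), showing that an `(id, event)` pair lands on the same partition as its bare `id`:

```scala
// non-negative modulo, mirroring how HashPartitioner maps hash codes to partitions
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// same dispatch as the getPartition override above: pairs are hashed by id only
def partitionOf(key: Any, numPartitions: Int = 10): Int = key match {
  case (id: Int, _) => nonNegativeMod(id.hashCode, numPartitions)
  case id: Int      => nonNegativeMod(id.hashCode, numPartitions)
}

val a = partitionOf((123, "clicked"))
val b = partitionOf((123, "viewed"))
val c = partitionOf(123)
```

Because `reduceByKey` and the later `groupByKey` use the same partitioner, the records for one id never need to move between partitions in the second shuffle.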
```scala
sc.parallelize(in)
  .map(EventMapper.apply)
  .keyBy(e => (e.id, e.event))
  .mapValues(_.value)
  .reduceByKey(partitioner, (left, right) => left.reduce(right))
  .map { case ((id, key), wrapper) => (id, (key, wrapper.value)) }
  .groupByKey(partitioner)
  .mapValues(_.toMap)
  .foreach(println)
```
Output:
(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))