
I have a single stream containing multiple types of JSON messages. There are 65 JSON event types in total, all with different schemas, but all of them share a user id.

{'id': 123, 'event': 'clicked', 'target': 'my_button'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz1...'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz2...'}
{'id': 123, 'event': 'login', 'username': 'Bob'}
{'id': 456, 'event': 'viewed', 'website': 'http://xyz3...'}
{'id': 456, 'event': 'login', 'username': 'Susy'}

I would like to process all event types, each with custom fields, and then aggregate everything by user across all event types.

{'id': 123, 'page_view_cnt': 100, 'user': 'Bob', 'click_cnt': 20}
{'id': 456, 'page_view_cnt': 14, 'user': 'Susy'}

Does anyone know an efficient way to do this? Here is my current thought process:

  • Start with a stream of lines
  • Use Gson to parse the JSON, instead of the built-in JSON parser, which may try to infer types.
  • Create 65 filter statements, one per event type. The JSON will have event=xyz that I can differentiate on (see the sketch after this list).
  • Aggregate the custom properties on each filter into a mapping of user id -> properties
  • Merge all maps from all filters
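
For example, the per-type dispatch might look something like this (a rough sketch using Gson's JsonParser; parseString needs Gson 2.8.6+, and the handler bodies are placeholders):

import com.google.gson.JsonParser

// hypothetical dispatch on the "event" field before routing to per-type logic
val line = """{"id": 123, "event": "clicked", "target": "my_button"}"""
val obj = JsonParser.parseString(line).getAsJsonObject
obj.get("event").getAsString match {
    case "clicked" => // handle click fields, e.g. obj.get("target").getAsString
    case "viewed"  => // handle view fields
    case other     => // ... remaining event types
}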

Does that sound sane or is there a better way to do this?

ForeverConfused
  • could you clarify the apparent contradiction of “I have a single stream” and “process these streams and merge back”? – Christophe Mar 20 '19 at 08:34
  • Thanks, I'll update question. The 65 json types come in a single stream, but with many different types of events. I meant process all these events from the same stream, then merge them back to single user ids. – ForeverConfused Mar 20 '19 at 08:51
  • Is it possible to have multiple users with same event id? In the output example both aggregations have `id=123` and users `Bob` and `Sue`. If that is the case, how to know if this is Bob or Sue `{'id': 123, 'event': 'viewed', 'website': 'http://xyz2...'}`? – ollik1 Mar 21 '19 at 08:04

1 Answer


Here is what I came up with using the RDD API and Jackson. I chose the low-level Spark API because it is schemaless, and I'm not sure how the structured API would fit the variable input event types. If the mentioned Gson supports polymorphic deserialisation, it could be used instead of Jackson; I simply picked Jackson because I'm more familiar with it.

The problem can be split into steps:

  1. Deserialize input into objects by the event type.
  2. Reduce by id and type. The reduce needs to behave differently for different types: for example, views are simply reduced to a sum, while the user name needs to be handled in a different way. In this example, let's assume the user name is unique within an id and pick the first one.
  3. Collect the reduced items by id.

Step 2 needs the most attention, as there is no such functionality in the Spark API and we need some kind of runtime check that the deserialized events being reduced are of the same class. To overcome this, let's introduce a generic trait Reducible that can encapsulate different types:

trait Reducible[T] {
    def reduce(that: Reducible[_]): this.type

    def value: T
}

// simply reduces to a sum
case class Sum(var value: Int) extends Reducible[Int] {
    override def reduce(that: Reducible[_]): this.type = that match {
        case Sum(thatValue) =>
            value += thatValue
            this
    }
}

// for picking the first element, i.e. the username
case class First(value: String) extends Reducible[String] {
    override def reduce(that: Reducible[_]): this.type = this
}

The runtime checking is handled in these classes; for example, Sum will fail with a MatchError if the right-hand object is not of the same type.
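
A quick illustration of the reduce behaviour (hypothetical REPL session):

Sum(1).reduce(Sum(2))               // Sum(3): values are summed in place
First("Bob").reduce(First("Sue"))   // First(Bob): the left-hand value wins
Sum(1).reduce(First("Bob"))         // scala.MatchError: mixing types is a bug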

Next, let's define the models for the events and tell Jackson how to handle polymorphism:

import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "event", visible = true)
sealed trait Event[T] {
    val id: Int
    val event: String

    def value: Reducible[T]
}

abstract class CountingEvent extends Event[Int] {
    override def value: Reducible[Int] = Sum(1)
}

@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
    override def value: Reducible[String] = First(username)
}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object EventMapper {
    private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // the list of classes could be auto-generated, see
    // https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
    mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])

    def apply(v1: String): Event[_] = mapper.readValue(v1, classOf[Event[_]])
}

All events are expected to have the fields id and event. The latter is used to determine which class to deserialize into; Jackson needs to know all the candidate classes beforehand. Event is declared as a sealed trait so that all the implementing classes can be determined at compile time. I'm omitting this reflective step and simply hard-coding the list of classes; there is a good answer on how to do it automatically: Getting subclasses of a sealed trait
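
To illustrate, deserializing one of the sample lines would go as follows (hypothetical REPL check; visible=true is what makes Jackson populate the event field as well):

EventMapper("""{"id": 123, "event": "clicked", "target": "my_button"}""")
// Click(123,clicked,my_button), whose value is Sum(1)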

Now we are ready to write the application logic. For the sake of simplicity, sc.parallelize is used to load the example data; Spark Streaming could be used as well.
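
For completeness, a minimal local setup for the sc used below (the app name and master setting are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// illustrative local context; in a real job this would come from the deployment
val conf = new SparkConf().setAppName("event-aggregation").setMaster("local[*]")
val sc = new SparkContext(conf)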

val in = List(
    "{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
    "{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
    "{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)

import org.apache.spark.HashPartitioner

// partition (id, event) pairs only by id to minimize shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {

    override def getPartition(key: Any): Int = key match {
        case (id: Int, _) => super.getPartition(id) // (id, event) keys from reduceByKey
        case id: Int => super.getPartition(id)      // plain id keys from groupByKey
    }
}

sc.parallelize(in)
    .map(EventMapper.apply)
    .keyBy(e => (e.id, e.event))
    .mapValues(_.value)
    .reduceByKey(partitioner, (left, right) => left.reduce(right))
    .map {
        case ((id, key), wrapper) => (id, (key, wrapper.value))
    }
    .groupByKey(partitioner)
    .mapValues(_.toMap)
    .foreach(println)

Output:

(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))
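
The keys in the output maps are still the raw event names. If the result should match the field names in the question (page_view_cnt, click_cnt, user), a final renaming stage could be appended before the foreach(println); the mapping below is an assumption based on the question's example output:

// hypothetical renaming of event types to the question's field names
.map { case (id, props) =>
    Map("id" -> id) ++ props.map {
        case ("viewed", n)  => "page_view_cnt" -> n
        case ("clicked", n) => "click_cnt" -> n
        case ("login", u)   => "user" -> u
        case other          => other
    }
}
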
ollik1