I have the following use case:
I want to aggregate data for a specific period of time and then forward it downstream. Since the built-in suppress feature does not support wall-clock time, I have to implement this myself using a transformer.
After the time window has closed, I forward the aggregated data downstream and delete it from the state store. I tested the behaviour with a limited amount of data, i.e. after all data has been processed the state store should be empty again and the memory usage should decrease. Unfortunately, the memory usage always stays at the same level.
SuppressTransformer.scala
/**
 * Transformer that never emits from `transform` itself. Instead, a wall-clock
 * punctuator periodically scans the session store named `stateStoreName`,
 * forwards every session whose start lies more than `windowDuration` in the
 * past, and then deletes that session from the store.
 *
 * @param stateStoreName name of the session store to read from and delete from;
 *                       it must be connected to this transformer
 * @param windowDuration minimum age (measured from the session start, in whole
 *                       seconds) before a session is forwarded and removed
 * @tparam T value type of the records passing through `transform`
 */
class SuppressTransformer[T](stateStoreName: String, windowDuration: Duration) extends Transformer[String, T, KeyValue[String, T]] {

  /** Wall-clock interval between two runs of the flush punctuator. */
  val scheduleInterval: Duration = Duration.ofSeconds(180)

  // Keys observed by `transform` that may still have sessions in the store.
  // This set lives only in memory; it is not restored from the store.
  private val keySet = mutable.HashSet.empty[String]

  var context: ProcessorContext = _

  // NOTE(review): the value type is declared as Array[T], while the visible
  // topology materializes the store with the same value type it passes as T.
  // The declaration is kept as-is for interface compatibility -- confirm and
  // change to SessionStore[String, T] if no external code relies on it.
  var store: SessionStore[String, Array[T]] = _

  /** Looks up the session store and registers the wall-clock flush punctuator. */
  override def init(context: ProcessorContext): Unit = {
    this.context = context
    this.store = context.getStateStore(stateStoreName).asInstanceOf[SessionStore[String, Array[T]]]
    this.context.schedule(
      scheduleInterval,
      PunctuationType.WALL_CLOCK_TIME,
      _ => flushExpiredSessions()
    )
  }

  /**
   * Remembers the key so the punctuator will inspect its sessions later.
   *
   * @return always `null`, i.e. nothing is forwarded from here; output is
   *         produced exclusively by the punctuator
   */
  override def transform(key: String, value: T): KeyValue[String, T] = {
    keySet += key // adding to a set is already idempotent
    null
  }

  override def close(): Unit = {}

  /** Forwards and deletes every expired session of every tracked key. */
  private def flushExpiredSessions(): Unit = {
    val now = Instant.now()
    var forwardedAny = false

    // Iterate over a snapshot: keySet is modified inside the loop, and mutating
    // a mutable.HashSet while iterating over it is not safe.
    for (key <- keySet.toList) {
      val expired = mutable.ArrayBuffer.empty[KeyValue[Windowed[String], Array[T]]]
      var stillOpen = 0

      val sessions = store.fetch(key)
      try {
        while (sessions.hasNext) {
          val entry = sessions.next()
          if (isExpired(entry.key, now)) expired += entry else stillOpen += 1
        }
      } finally {
        sessions.close() // always release the iterator, even if reading fails
      }

      // Forward and delete only after the iterator is closed, so the store is
      // never modified underneath an open iterator.
      for (entry <- expired) {
        context.forward(key, entry.value, To.all().withTimestamp(now.toEpochMilli))
        store.remove(entry.key)
        forwardedAny = true
      }

      // Stop tracking the key only when the store holds nothing more for it.
      // Dropping it while sessions remain would leave them in the store
      // until another record for the same key happens to arrive.
      if (stillOpen == 0) keySet -= key
    }

    // One commit request per punctuation is enough.
    if (forwardedAny) context.commit()
  }

  /** True when the session has a positive start and is older than `windowDuration`. */
  private def isExpired(sessionKey: Windowed[String], now: Instant): Boolean = {
    val window = sessionKey.window()
    window.start() > 0 &&
      ChronoUnit.SECONDS.between(window.startTime(), now) > windowDuration.toSeconds
  }
}
/**
 * Supplier that hands out [[SuppressTransformer]] instances.
 *
 * @param stateStoreName passed through to each created transformer
 * @param windowDuration passed through to each created transformer
 * @tparam T value type handled by the created transformers
 */
class SuppressTransformerSupplier[T](stateStoreName: String, windowDuration: Duration)
    extends TransformerSupplier[String, T, KeyValue[String, T]] {

  /** Builds a new transformer on every call; instances are never shared. */
  override def get(): SuppressTransformer[T] = {
    val transformer = new SuppressTransformer[T](stateStoreName, windowDuration)
    transformer
  }
}
Topology.scala
// Age threshold handed to the SuppressTransformer below.
val windowDuration = Duration.ofMinutes(5)
// Materialization for the session aggregation: a store named `stateStoreName`
// created through RocksDbSessionBytesStoreSupplier with `stateStoreRetention`
// (in milliseconds) as its second argument.
// NOTE(review): key/value serdes are not set here -- presumably resolved
// implicitly or from the default config; confirm.
val stateStore: Materialized[String, util.ArrayList[Bytes], ByteArraySessionStore] =
Materialized
.as[String, util.ArrayList[Bytes]](
new RocksDbSessionBytesStoreSupplier(stateStoreName,
stateStoreRetention.toMillis)
)
// Subscribes by pattern: `topic` itself plus names with a "-<digits>" suffix.
// `topic` is concatenated into the regex unescaped.
builder.stream[String, Bytes](Pattern.compile(topic + "(-\\d+)?"))
// Drop records that have no key.
.filter((k, _) => k != null)
.groupByKey
// Session windows built from `sessionWindowMinDuration` with
// `sessionGracePeriodDuration` as grace period.
.windowedBy(SessionWindows `with` sessionWindowMinDuration `grace` sessionGracePeriodDuration)
// Each session starts from an empty list ...
.aggregate(initializer = {
new util.ArrayList[Bytes]()
}
// ... every record value is appended to the list in place ...
)(aggregator = (_: String, instance: Bytes, agg: util.ArrayList[Bytes]) => {
agg.add(instance)
agg
// ... and when two sessions merge, state2's elements are appended to state1.
}, merger = (_: String, state1: util.ArrayList[Bytes], state2: util.ArrayList[Bytes]) => {
state1.addAll(state2)
state1
}
)(stateStore)
.toStream
// Replace the windowed key with the plain record key; the window bounds are dropped.
.map((k, v) => (k.key(), v))
// The transformer is connected to the same store the aggregation above writes to.
.transform(new SuppressTransformerSupplier[util.ArrayList[Bytes]](stateStoreName, windowDuration), stateStoreName)
// NOTE(review): unsetRepartitioningRequired() is not defined in this snippet --
// presumably a project extension that suppresses the repartition after map(); confirm.
.unsetRepartitioningRequired()
// Output goes to "<topic>-aggregated".
.to(f"$topic-aggregated")