I want to join a large table, too big to fit in TaskManager (TM) memory, with a stream (Kafka). In my tests I successfully joined the two by mixing the Table API with the DataStream API. I did the following:
val stream: DataStream[MyEvent] = env.addSource(...)
stream
  .timeWindowAll(...)
  .trigger(...)
  .process(new ProcessAllWindowFunction[MyEvent, MyEvent, TimeWindow] {
    var tableEnv: StreamTableEnvironment = _
    override def open(parameters: Configuration): Unit = {
      //init table env
    }
    override def process(context: Context, elements: Iterable[MyEvent], out: Collector[MyEvent]): Unit = {
      val table = tableEnv.sqlQuery(...)
      elements.map(e => {
        //do process
        out.collect(...)
      })
    }
  })
It works, but I have never seen this kind of implementation anywhere. Is it OK? What would the drawbacks be?