2

I want to join a big table, impossible to be contained in TM memory and a stream (kakfa). I successfully joined both on my tests, mixing table-api with datastream api. I did the following:

val stream: DataStream[MyEvent] = env.addSource(...)
stream
   .timeWindowAll(...)
   .trigger(...)
   .process(new ProcessAllWindowFunction[MyEvent, MyEvent, TimeWindow] {
        
        var tableEnv: StreamTableEnvironment = _
        
        override def open(parameters: Configuration): Unit = {
          //init table env
        }

        override def process(context: Context, elements: Iterable[MyEvent], out: Collector[MyEvent]): Unit = {
          val table = tableEnv.sqlQuery(...)
          elements.map(e => {
            //do process
            out.collect(...)
          })
        }
      })

It is working, but I have never seen anywhere this type of implementation. Is it ok ? what would be the drawback ?

Fray
  • 173
  • 6

1 Answers1

1

One should not use StreamExecutionEnvironment or TableEnvironment within a Flink function. An environment is used to construct a pipeline that is submitted to the cluster.

Your example submits a job to the cluster within a cluster's job.

This might work for certain use cases but is generally discouraged. Imagine your outer stream contains thousands of events and your function would create a job for every event, it could potentially DDoS your cluster.

twalthr
  • 2,584
  • 16
  • 15