
We are interested in connecting a regular Flink Streaming application to a new Stateful Functions application, ideally using the Table API. The idea is to consult tables registered in Flink from StateFun. Is this possible, and what is the right way to do it?

My idea so far has been to initialize my table stream in some main function and register a stateful function provider to connect to the table:

@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // convert the DataStream to a Table and register it as a view
    Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
    tableEnv.createTemporaryView("my_table", myTable);

    TableFunctionProvider tableProvider = new TableFunctionProvider();
    binder.bindFunctionProvider(FnTableQuery.TYPE, tableProvider);

    //continue registering my other messages
    //...
  }
}

The stateful function provider would return an FnTableQuery, which simply queries the table whenever it receives a message:

public class TableFunctionProvider implements StatefulFunctionProvider {

  @Override
  public StatefulFunction functionOfType(FunctionType type) {
    return new FnTableQuery();
  }
}

The query function object would then operate as an actor for every established process, and simply query the table when invoked:

public class FnTableQuery extends StatefulMatchFunction {

  static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");

  private Table myTable;

  @Override
  public void configure(MatchBinder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    myTable = tableEnv.from("my_table");

    binder
        .otherwise(this::catchAll);
  }

  private void catchAll(Context context, Object message) {
    context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
  }
}

I apologize in advance if this approach doesn't make sense, because I don't know whether:

  1. Flink and StateFun applications can work together outside the realm of sources/sinks, especially since this particular function is stateless and the table is stateful

  2. We can query Flink tables like this; so far I have only used tables as intermediate objects that get converted and sent on to a sink or DataStream (see the sketch after this list for the pattern I mean)

  3. It makes sense to initialize things in Module.configure, and whether both the stateful function provider and its match function are instantiated once per parallel worker
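
For reference, the only way I have consumed a table so far is as an intermediate object that gets converted back into a DataStream and handed to a sink, roughly like the sketch below (class and field names are placeholders, my_table is assumed to be registered as in the Module above, and the import paths assume Flink 1.11+):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableAsIntermediateObject {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // "my_table" is assumed to have been registered, as in the Module above
    Table myTable = tableEnv.from("my_table");

    // the Table only lives as an intermediate object: it is converted
    // back into a DataStream and handed to a sink / further operators
    DataStream<Row> rows = tableEnv.toAppendStream(myTable, Row.class);
    rows.print();

    env.execute();
  }
}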


1 Answer


The Apache Flink community does intend to support Flink DataStreams as StateFun ingresses / egresses in the future.

That would mean you could take the result streams produced by the Flink Table API / Flink CEP / DataStream API etc. and invoke functions with the events in those streams.
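
Until that integration exists, the interop between a regular Flink job and StateFun goes through ordinary ingresses and egresses. As a rough sketch (not the planned API), a Table API result can be converted back into a DataStream and written to, say, a Kafka topic that a StateFun Kafka ingress, configured separately in the StateFun module, routes to functions. The topic name, servers, and query below are placeholders, and the import paths assume Flink 1.11+:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableToStateFunBridge {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // a Table API result computed in the regular Flink job,
    // e.g. an aggregation over the "my_table" view from the question
    Table maxAmounts = tableEnv.sqlQuery(
        "SELECT `user`, MAX(amount) AS max_amount FROM my_table GROUP BY `user`");

    // convert the updating table into a retract stream and keep only insertions
    DataStream<String> updates = tableEnv
        .toRetractStream(maxAmounts, Row.class)
        .filter(change -> change.f0)
        .map(change -> change.f1.toString());

    // hand the events to Stateful Functions through an ordinary
    // egress/ingress pair: here a Kafka topic that a StateFun
    // Kafka ingress (configured in the StateFun module) reads from
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    updates.addSink(new FlinkKafkaProducer<>(
        "enrichment-events", new SimpleStringSchema(), props));

    env.execute("table-to-statefun-bridge");
  }
}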

Gordon Tai