I have a stream that contains JSON messages that look like this:
{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}
This stream is handled as a DataStream<Row> that is registered as a TableSource.
I want to use this stream as a changelog stream to update the content of a Flink Table, but I can't find a way to do that.
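To make the expected behaviour concrete, here is a minimal sketch in plain Java (no Flink API involved) of what I mean by applying the changelog to a table. The class name ChangelogSketch and the apply helper are made up for illustration, and I am assuming that "id" is the key and that CREATE and UPDATE both behave as an upsert.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: models the table as a map keyed by "id".
class ChangelogSketch {

    // Applies one changelog message to the materialized table (id -> value).
    static void apply(Map<String, String> table, String operation, String id, String value) {
        switch (operation) {
            case "CREATE":
            case "UPDATE":
                // Assumption: both operations insert or overwrite the row for this id.
                table.put(id, value);
                break;
            case "DELETE":
                // The row for this id disappears from the table.
                table.remove(id);
                break;
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        apply(table, "CREATE", "id-1", "value-1");
        apply(table, "CREATE", "id-2", "value-2");
        apply(table, "DELETE", "id-1", null);
        apply(table, "UPDATE", "id-2", "value-3");
        System.out.println(table); // prints {id-2=value-3}
    }
}
```

In other words, after the four messages above I expect the Flink Table to contain a single row, (id-2, value-3).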
I have defined a StreamTableSource as:
public class MyTableSource implements StreamTableSource<Row>, ... {
    @Override
    public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
        DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream
            .keyBy([SOME KEY])                      // Aggregate by key
            .map(new MyMapFunction());              // Map the update message with the correct encoding ?
        return stream;
    }
    ...
}
And this TableSource is used in:
public void process(final StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.registerTableSource("MyTableSource", new MyTableSource());
    // The content of this table should be updated according to the operations described in the changelog stream.
    Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource");
    result.insertInto([SOME SINK]);
}
What is the right way to do this? (And more specifically, how can I use a stream to delete rows from a Table?)