I am using the Flink Table API to calculate a few aggregations. I have a stream of data coming from Kafka that is transformed into a stream of rows, and from these rows I create a dynamic table. For example, consider the three records below, where the primary key is "id":
DataStream<Row> stream = ...; // elements like: Row.ofKind(RowKind.UPSERT_AFTER, 1, 123, 10), Row.ofKind(RowKind.UPSERT_AFTER, 2, 123, 20), Row.ofKind(RowKind.UPSERT_AFTER, 1, 123, 30), ...
(Not exactly like this, but treat each element as a Row with id, globalId, and demand fields.)
When I create a table from the above stream,
Table streamTable = tableEnv.fromChangelogStream(stream, Schema.newBuilder().primaryKey("id").build());
I see the output below:
rowKind | id | globalId | demand
---|---|---|---
+I | 1 | 123 | 10
+I | 2 | 123 | 20
-U | 1 | 123 | 10 (retracts the old row for id 1; new entry below)
+U | 1 | 123 | 30
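If I understand it correctly, those -U/+U pairs come from Flink normalizing the keyed upsert stream into a retract changelog: for each upsert on an existing primary key, it first emits an UPDATE_BEFORE for the old row and then an UPDATE_AFTER for the new one. A minimal plain-Java sketch of that normalization (no Flink; class and field names are mine, chosen to match the example):

```java
import java.util.*;

public class ChangelogNormalize {
    record Upsert(int id, int globalId, int demand) {}

    // Normalize an upsert stream keyed by "id" into +I / -U / +U changelog entries.
    static List<String> normalize(List<Upsert> upserts) {
        Map<Integer, Upsert> state = new HashMap<>(); // last row per primary key
        List<String> out = new ArrayList<>();
        for (Upsert u : upserts) {
            Upsert old = state.put(u.id(), u);
            if (old == null) {
                out.add("+I id=" + u.id() + " demand=" + u.demand());
            } else {
                out.add("-U id=" + old.id() + " demand=" + old.demand()); // retract the old row
                out.add("+U id=" + u.id() + " demand=" + u.demand());     // then insert the new one
            }
        }
        return out;
    }

    public static void main(String[] args) {
        normalize(List.of(
                new Upsert(1, 123, 10),
                new Upsert(2, 123, 20),
                new Upsert(1, 123, 30)))
            .forEach(System.out::println);
    }
}
```

Running this reproduces exactly the four changelog rows in the table above: +I, +I, -U, +U.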
I am using this table to calculate "sum of demand grouped by globalId"
Table demandSum = tableEnv.sqlQuery("SELECT globalId, SUM(demand) FROM " + streamTable + " GROUP BY globalId");
DataStream<Row> result = tableEnv.toChangelogStream(demandSum);
result.print();
I get the output below, which contains intermediate values because the aggregation has to process the "-U" rows from streamTable. I see a subtracted value in between, but I am only interested in the final value.
+I, 123, 10
-U, 123, 10
+U, 123, 30 -- correct till this point
-U, 123, 30
+U, 123, 20 -- this dip happens because the aggregation first subtracts the old demand 10 of id 1 and then adds 30. But I don't want this in my output stream.
-U, 123, 20
+U, 123, 50 -- correct value again at the end
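The sequence above follows mechanically from applying each changelog row to the running sum: +I/+U rows add, -U (and -D) rows subtract, and every change to the sum is emitted downstream. A plain-Java sketch of that retract aggregation (no Flink; the input is the changelog of streamTable for globalId 123):

```java
import java.util.*;

public class RetractSum {
    record Change(String kind, int demand) {} // kind: "+I", "+U", "-U", "-D"

    // Apply a retract changelog to a running SUM(demand), recording every emitted value.
    static List<Integer> aggregate(List<Change> changes) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (Change c : changes) {
            // retraction rows ("-U"/"-D") subtract; "+I"/"+U" add
            sum += c.kind().startsWith("-") ? -c.demand() : c.demand();
            emitted.add(sum); // each intermediate sum is pushed downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> sums = aggregate(List.of(
                new Change("+I", 10),
                new Change("+I", 20),
                new Change("-U", 10),
                new Change("+U", 30)));
        System.out.println(sums); // [10, 30, 20, 50]
    }
}
```

This shows why the dip to 20 appears: the -U and the following +U are two separate changes to the sum, and each one is emitted on its own.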
How can I handle this case in the Table API?
I tried using upsert mode, so I only see +I/+U rows in streamTable. But the final table needs the -U rows to compute the value correctly: in upsert mode the two demand values were simply added, so the final result was 60 instead of 50.
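To make the 60-vs-50 concrete: with only +I/+U rows, a plain sum adds every row, whereas the correct value is the sum over the latest row per primary key. A plain-Java sketch of both (no Flink; helper names are mine):

```java
import java.util.*;

public class UpsertSum {
    record Upsert(int id, int demand) {}

    // Naive aggregation over an upsert-only changelog: every row is added,
    // so the old value of an updated key is never retracted.
    static int naiveSum(List<Upsert> rows) {
        int sum = 0;
        for (Upsert r : rows) sum += r.demand();
        return sum;
    }

    // Correct result: keep only the latest row per primary key, then sum.
    static int upsertAwareSum(List<Upsert> rows) {
        Map<Integer, Integer> latest = new LinkedHashMap<>();
        for (Upsert r : rows) latest.put(r.id(), r.demand());
        return latest.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Upsert> rows = List.of(new Upsert(1, 10), new Upsert(2, 20), new Upsert(1, 30));
        System.out.println(naiveSum(rows));       // 60 -- what I observed in upsert mode
        System.out.println(upsertAwareSum(rows)); // 50 -- the value I actually want
    }
}
```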
I also tried a window on the final table to take only its last value. This is a simple example, but I have use cases where a single -U in the first table can generate many intermediate values in the final table. The window can close on any of those wrong intermediate values, and I have no field that tells me whether a value is right or wrong.
I have already implemented this use case with Flink's DataStream API, but the Table API is much easier to write and more developer friendly, so I would like to cover these use cases with the Table API.