
I am using the Flink Table API to calculate a few aggregations. I have a stream of data coming from Kafka, which is transformed into a stream of rows, and I create a dynamic table from these rows. For example, consider the three records below, where the primary key is "id":

DataStream<Row> stream =
    Row.ofKind(RowKind.UPDATE_AFTER, {"id": 1, "globalId": 123, "demand": 10}),
    Row.ofKind(RowKind.UPDATE_AFTER, {"id": 2, "globalId": 123, "demand": 20}),
    Row.ofKind(RowKind.UPDATE_AFTER, {"id": 1, "globalId": 123, "demand": 30}), .....

(Not exactly like this, but consider these as Row of id, globalId, demand fields)

When I create a table from the above stream,

Table streamTable = tableEnv.fromChangelogStream(stream, Schema.newBuilder().primaryKey("id").build());

I see the following output:

rowKind  id  globalId  demand
+I       1   123       10
+I       2   123       20
-U       1   123       10   (retracts the old entry for id 1; the new entry follows)
+U       1   123       30

I am using this table to calculate the sum of demand grouped by globalId:

Table demandSum = tableEnv.sqlQuery("select globalId, sum(demand) from " + streamTable + " group by globalId");
DataStream<Row> result = tableEnv.toChangelogStream(demandSum);
result.print();

I get the output below, which contains intermediate values because the aggregation has to process the "-U" row from streamTable. A subtracted value shows up in between, but I am only interested in the end value.

+I, 123, 10

-U, 123, 10

+U, 123, 30 -- correct till this point

-U, 123, 30

+U, 123, 20 -- this dip appears because the old demand 10 of id 1 is subtracted first, before the new demand 30 is added. I don't want this value in my output stream.

-U, 123, 20

+U, 123, 50 -- again correct value at end
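
To make the source of the dip concrete, here is a minimal plain-Java sketch that replays the four changelog rows of streamTable through a retracting sum. It is only a simplified model of the behaviour described above, not Flink's actual implementation; the class `RetractingSumDemo` and its helpers are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractingSumDemo {

    /** One changelog entry of the input table; kind is "+I", "-U" or "+U". */
    static final class Change {
        final String kind;
        final int id;
        final int globalId;
        final int demand;

        Change(String kind, int id, int globalId, int demand) {
            this.kind = kind;
            this.id = id;
            this.globalId = globalId;
            this.demand = demand;
        }
    }

    /** The four rows shown for streamTable in the question. */
    static List<Change> sampleInput() {
        List<Change> input = new ArrayList<>();
        input.add(new Change("+I", 1, 123, 10));
        input.add(new Change("+I", 2, 123, 20));
        input.add(new Change("-U", 1, 123, 10));
        input.add(new Change("+U", 1, 123, 30));
        return input;
    }

    /** Replays the input through a retracting SUM grouped by globalId. */
    static List<String> aggregate(List<Change> input) {
        Map<Integer, Integer> sums = new HashMap<>(); // globalId -> current sum
        List<String> out = new ArrayList<>();
        for (Change c : input) {
            // "-U" retracts the old value; "+I" and "+U" accumulate the new one.
            int delta = c.kind.equals("-U") ? -c.demand : c.demand;
            Integer previous = sums.get(c.globalId);
            int current = (previous == null ? 0 : previous) + delta;
            sums.put(c.globalId, current);
            if (previous == null) {
                // First result for this group.
                out.add("+I, " + c.globalId + ", " + current);
            } else {
                // Every later input row replaces the previous result.
                out.add("-U, " + c.globalId + ", " + previous);
                out.add("+U, " + c.globalId + ", " + current);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        aggregate(sampleInput()).forEach(System.out::println);
    }
}
```

In this model each input row is handled on its own, so the "-U" row for id 1 produces the intermediate sum 20 before the matching "+U" row raises it to 50.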

How can I handle this case in the Table API?

  1. Upsert mode: I tried upsert mode so that I only see +I / +U in streamTable. But the final table needs the -U rows to calculate the value correctly. In upsert mode it simply added all three values, so the total is wrong: the final result was 60 instead of 50.

  2. Window on the final table: I tried to take only the latest value of each window. This is a simple example, but I have use cases where one -U in the first table can generate many intermediate values in the final table. The window can end on any of these wrong values, and I don't have any field that tells me whether a value is right or wrong.
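
The arithmetic behind the first attempt can be sketched in plain Java as follows. This is an illustration under assumptions, not Flink code: `UpsertSumDemo`, `naiveSum` and `keyedSums` are hypothetical names, each row is reduced to an `{id, demand}` pair, and all rows are assumed to belong to the same globalId. The second method shows the output I am after: one value per input row, with no dip.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertSumDemo {

    /** Adds every incoming demand, ignoring that a later row may replace an earlier one. */
    static int naiveSum(int[][] upserts) {
        int sum = 0;
        for (int[] row : upserts) {
            sum += row[1];
        }
        return sum;
    }

    /**
     * Remembers the latest demand per id and adjusts the sum by the difference,
     * emitting exactly one sum per input row.
     */
    static List<Integer> keyedSums(int[][] upserts) {
        Map<Integer, Integer> latestById = new HashMap<>(); // id -> latest demand
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int[] row : upserts) {
            Integer old = latestById.put(row[0], row[1]);
            // Subtract the replaced value and add the new one in a single step.
            sum += row[1] - (old == null ? 0 : old);
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 10}, {2, 20}, {1, 30}}; // {id, demand}
        System.out.println(naiveSum(rows));  // 60
        System.out.println(keyedSums(rows)); // [10, 30, 50]
    }
}
```

Here the retraction and the new value are applied together, so the only sums emitted are 10, 30 and 50.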

I have already implemented this use case with Flink's DataStream API. But the Table API is much easier to write and more developer friendly, so I would like to cover these use cases with the Table API as well.

Aishwarya