
I read JSON data from Kafka and tried to process it with the Flink Table API.

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
    "create table inputTable(" +
    "`src_ip` STRING," +
    "`src_port` STRING," +
    "`bytes_from_src` BIGINT," +
    "`pkts_from_src` BIGINT," +
    "`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
    "WATERMARK FOR ts AS ts" +
") WITH (" +
    "'connector' = 'kafka'," +
    "'topic' = 'test'," +
    "'properties.bootstrap.servers' = 'localhost:9092'," +
    "'properties.group.id' = 'testGroup'," +
    "'scan.startup.mode' = 'earliest-offset'," +
    "'format' = 'json'," +
    "'json.fail-on-missing-field' = 'true'," +
    "'json.ignore-parse-errors' = 'false'" +
")");

Table inputTable = tEnv.from("inputTable");
inputTable.printSchema();
inputTable.execute().print();

Table windowedTable = inputTable
    .window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
    .groupBy($("w"), $("src_ip"))
    .select($("w").start().as("window_start"),
            $("src_ip"),
            $("src_ip").count().as("src_ip_count"),
            $("bytes_from_src").avg().as("bytes_from_src_mean"));
windowedTable.execute().print();

There are 4 records in Kafka. The Flink program prints the schema and the contents of inputTable as follows:

Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
  `src_ip` STRING,
  `src_port` STRING,
  `bytes_from_src` BIGINT,
  `pkts_from_src` BIGINT,
  `ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
  WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op |                         src_ip |                       src_port |       bytes_from_src |        pkts_from_src |                      ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I |                     44.38.5.31 |                          53159 |                  120 |                    3 |  2021-08-13 14:59:56.00 |
| +I |                   44.38.132.51 |                          39409 |                  100 |                    2 |  2021-08-13 14:58:11.00 |
| +I |                     44.38.4.44 |                          56758 |                  336 |                    6 |  2021-08-13 14:59:14.00 |
| +I |                     44.38.5.34 |                          40001 |                   80 |                    2 |  2021-08-13 14:57:04.00 |

After that, nothing else is printed, and the program does not exit. I am running Flink within IDEA. At this point it seems like a black box: there is no further output, and I do not know how to trace a Flink program.

If I comment out the line inputTable.execute().print();, the schema is printed, but nothing after that, and the program still does not exit.

The Flink version used is 1.14.2.

Charles Ju

1 Answer


I believe those records are being processed and are being added to the window. But event-time windows are triggered by watermarks, and the watermark isn't becoming large enough to trigger the window. To get this to work, you need to process an event with a timestamp past the end of the window, i.e., 2021-08-13 15:00:00.00 or later.
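(A sketch, not part of the original answer: the follow-up comment below notes that the Kafka timestamps are out of order. With WATERMARK FOR ts AS ts, any event older than the largest timestamp seen so far is late and is dropped from its window. A common adjustment is to subtract a bounded out-of-orderness delay in the DDL; the 10-second bound here is an assumption to be tuned to your data's actual skew.)

// Same DDL as in the question, changing only the watermark clause (untested sketch;
// the 10-second bound is an assumed value, not from the original post):
"`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
"WATERMARK FOR ts AS ts - INTERVAL '10' SECOND" +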

For debugging, the Flink web dashboard is helpful in situations like this. You can see if events are being processed, examine the watermarks, etc. See Flink webui when running from IDE for help in setting it up.
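(Not from the original answer: a minimal sketch of enabling that dashboard when running inside the IDE. It assumes the flink-runtime-web dependency, matching version 1.14.2, is on the classpath; the port choice is arbitrary.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Start a local environment that also serves the Flink web dashboard.
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8081); // dashboard at http://localhost:8081
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);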

David Anderson
  • The timestamps of the data in Kafka are out of order, which caused the problem I was experiencing. I have one follow-up question regarding the Table API. The inputTable is defined by the DDL, but there is no key involved as in a keyed window. As there are several source readers fetching data from Kafka, does each source reader maintain a separate "table instance" for parallel processing? Is there a way to key the data source and then define the table on the keyed source? – Charles Ju Jun 29 '22 at 01:24
  • When you use GROUP BY in Flink SQL, that is implemented as a keyBy, so this is already a keyed window, keyed by src_ip. – David Anderson Jun 29 '22 at 10:38
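(A sketch, not from the thread: the Table API pipeline in the question is equivalent to the following SQL, where the GROUP BY on src_ip is the key that becomes the keyBy described in the comment above. This uses the legacy group-window syntax still available in Flink 1.14.)

// Illustrative SQL equivalent of the Table API window in the question.
Table windowedSql = tEnv.sqlQuery(
    "SELECT TUMBLE_START(ts, INTERVAL '5' SECOND) AS window_start, " +
    "src_ip, " +
    "COUNT(src_ip) AS src_ip_count, " +
    "AVG(bytes_from_src) AS bytes_from_src_mean " +
    "FROM inputTable " +
    "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), src_ip");
windowedSql.execute().print();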