I read json data from Kafka and tried to process the data with flink table API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
"create table inputTable(" +
"`src_ip` STRING," +
"`src_port` STRING," +
"`bytes_from_src` BIGINT," +
"`pkts_from_src` BIGINT," +
"`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
"WATERMARK FOR ts AS ts" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'test'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'testGroup'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'json'," +
"'json.fail-on-missing-field' = 'true'," +
"'json.ignore-parse-errors' = 'false'" +
")");
Table inputTable = tEnv.from("inputTable");
inputTable.printSchema();
inputTable.execute().print();
Table windowedTable = inputTable
.window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
.groupBy($("w"), $("src_ip"))
.select($("w").start().as("window_start"),
$("src_ip"),
$("src_ip").count().as("src_ip_count"),
$("bytes_from_src").avg().as("bytes_from_src_mean")
);
windowedTable.execute().print();
There are 4 records in Kafka. The flink program prints out the schema info and the inputTable as the following:
Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
`src_ip` STRING,
`src_port` STRING,
`bytes_from_src` BIGINT,
`pkts_from_src` BIGINT,
`ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op | src_ip | src_port | bytes_from_src | pkts_from_src | ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I | 44.38.5.31 | 53159 | 120 | 3 | 2021-08-13 14:59:56.00 |
| +I | 44.38.132.51 | 39409 | 100 | 2 | 2021-08-13 14:58:11.00 |
| +I | 44.38.4.44 | 56758 | 336 | 6 | 2021-08-13 14:59:14.00 |
| +I | 44.38.5.34 | 40001 | 80 | 2 | 2021-08-13 14:57:04.00 |
After that, nothing is printed out. The program did not exit. I am running the flink within IDEA. At this point, it seems like a black box. There is no output, and I do not know how to trace a flink program.
If I commented out the line inputTable.execute().print();
, the schema info is printed out, but nothing after that and the program does not exit.
The flink version used is 1.14.2.