flink-streaming, flink-table-api

Flink Table API not processing records


I read JSON data from Kafka and tried to process it with the Flink Table API:

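import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Kafka source table; `ts` is read from the Kafka record timestamp and declared
// as the event-time attribute, with a watermark that allows no out-of-orderness.
tEnv.executeSql(
    "create table inputTable(" +
    "`src_ip` STRING," +
    "`src_port` STRING," +
    "`bytes_from_src` BIGINT," +
    "`pkts_from_src` BIGINT," +
    "`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
    "WATERMARK FOR ts AS ts" +
") WITH (" +
    "'connector' = 'kafka'," +
    "'topic' = 'test'," +
    "'properties.bootstrap.servers' = 'localhost:9092'," +
    "'properties.group.id' = 'testGroup'," +
    "'scan.startup.mode' = 'earliest-offset'," +
    "'format' = 'json'," +
    "'json.fail-on-missing-field' = 'true'," +
    "'json.ignore-parse-errors' = 'false'" +
")");

Table inputTable = tEnv.from("inputTable");
inputTable.printSchema();
// The Kafka source is unbounded, so this call keeps printing rows as they arrive
// and does not return; the code below is not reached while it is running.
inputTable.execute().print();

// Count records and average bytes per src_ip in 5-second event-time tumbling windows.
Table windowedTable = inputTable
    .window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
    .groupBy($("w"), $("src_ip"))
    .select(
        $("w").start().as("window_start"),
        $("src_ip"),
        $("src_ip").count().as("src_ip_count"),
        $("bytes_from_src").avg().as("bytes_from_src_mean"));
windowedTable.execute().print();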

There are 4 records in the Kafka topic. The Flink program prints the schema and the contents of inputTable as follows:

Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
  `src_ip` STRING,
  `src_port` STRING,
  `bytes_from_src` BIGINT,
  `pkts_from_src` BIGINT,
  `ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
  WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op |                         src_ip |                       src_port |       bytes_from_src |        pkts_from_src |                      ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I |                     44.38.5.31 |                          53159 |                  120 |                    3 |  2021-08-13 14:59:56.00 |
| +I |                   44.38.132.51 |                          39409 |                  100 |                    2 |  2021-08-13 14:58:11.00 |
| +I |                     44.38.4.44 |                          56758 |                  336 |                    6 |  2021-08-13 14:59:14.00 |
| +I |                     44.38.5.34 |                          40001 |                   80 |                    2 |  2021-08-13 14:57:04.00 |

After that, nothing more is printed, and the program does not exit. I am running Flink inside IntelliJ IDEA. At this point it feels like a black box: there is no output, and I do not know how to trace what a Flink program is doing.

If I comment out the line inputTable.execute().print();, the schema is printed, but nothing after that, and the program still does not exit.

The Flink version is 1.14.2.


Solution

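  • I believe those records are being processed and added to windows. But event-time windows are triggered by watermarks, and the watermark isn't advancing far enough to trigger the window. With WATERMARK FOR ts AS ts, the watermark cannot get past the largest timestamp seen so far (2021-08-13 14:59:56.00), while the 5-second window containing that record runs from 14:59:55 to 15:00:00. To get this to work, you need to process an event with a timestamp past the end of that window, i.e., 2021-08-13 15:00:00.00 or later.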

    For debugging, the Flink web dashboard is helpful in situations like this: you can see whether events are being processed, inspect the current watermarks, and so on. See the question "Flink webui when running from IDE" for help setting it up.
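To make the timing concrete, here is a small plain-Java model (no Flink dependency) of the trigger condition described in the answer. It is a simplified sketch, not Flink's implementation, and it assumes: the watermark equals the largest timestamp seen (as with WATERMARK FOR ts AS ts), 5-second windows aligned to the epoch with no offset, and that an event-time window fires once the watermark reaches its last millisecond (end minus 1 ms). The class WindowTriggerSketch and its methods are hypothetical names used only for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical, simplified model of when a 5-second event-time tumbling window fires.
// Assumptions: watermark == largest event timestamp seen, epoch-aligned windows,
// no allowed lateness, no periodic or per-partition watermark behavior.
final class WindowTriggerSketch {
    static final long WINDOW_SIZE_MS = 5_000; // mirrors Tumble.over(lit(5).seconds())

    // Convert an ISO timestamp to epoch millis; UTC is used only so the
    // arithmetic does not depend on the local time zone.
    static long toMillis(String isoTimestamp) {
        return LocalDateTime.parse(isoTimestamp).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long tsMs) {
        return tsMs - Math.floorMod(tsMs, WINDOW_SIZE_MS);
    }

    // Exclusive end of the window that contains the given timestamp.
    static long windowEnd(long tsMs) {
        return windowStart(tsMs) + WINDOW_SIZE_MS;
    }

    // A window [start, end) is emitted once the watermark reaches end - 1 ms.
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long latest = toMillis("2021-08-13T14:59:56"); // newest record in the topic
        long end = windowEnd(latest);                   // 2021-08-13T15:00:00
        System.out.println("window end: " + LocalDateTime.ofEpochSecond(end / 1000, 0, ZoneOffset.UTC));
        System.out.println("fires with watermark 14:59:56? " + fires(end, latest));
        System.out.println("fires with watermark 15:00:00? " + fires(end, toMillis("2021-08-13T15:00:00")));
    }
}
```

Running main shows that a watermark of 14:59:56 leaves the window open, while a watermark of 15:00:00 closes it. Under the assumptions above, the real job only reaches such a watermark after it reads a record with a timestamp of 15:00:00.00 or later.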