apache-flink window-functions flink-sql flink-table-api

Is there a Flink Table API equivalent to Window Functions using row_number(), rank(), dense_rank()?


In an attempt to discover the possibilities and limitations of the Flink Table API for use in a current project, I was trying to translate a Flink SQL statement into its equivalent Flink Table API version.

For the most part, I am able to translate the statement using the documentation, except for the window function row_number().

Flink SQL (working)

final Table someTable = tableEnvironment.sqlQuery("SELECT" +
            "     T.COLUMN_A," +
            "     T.COLUMN_B," +
            "     T.COLUMN_C," +
            "     row_number() OVER (" +
            "         PARTITION BY" +
            "             T.COLUMN_A" +
            "         ORDER BY" +
            "             T.EVENT_TIME DESC" +
            "     ) AS ROW_NUM" +
            " FROM SOME_TABLE T"
    )
    .where($("ROW_NUM").isEqual(1))
    .select(
            $("COLUMN_A"),
            $("COLUMN_B"),
            $("COLUMN_C")
    );

The closest I get is the code below, but I can't find what should be placed at the location of the question marks (/* ??? */).

Flink Table API (not working)

final Table someTable = tableEnvironment.from("SOME_TABLE")
    .window(Over.partitionBy($("COLUMN_A"))
            .orderBy($("EVENT_TIME").desc())
            .as($("window"))
    )
    .select(
            $("COLUMN_A"),
            $("COLUMN_B"),
            $("COLUMN_C"),
            /* ??? */.over($("window")).as("ROW_NUM")
    )
    .where($("ROW_NUM").isEqual(1));

The documentation at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#over-window-aggregation shows how this works for other window functions such as avg(), min(), and max(), but the ones I require (row_number(), rank(), dense_rank()) are not (yet) described on that page.

My question is twofold:

Additional information:

Thank you in advance for your help!


Solution

  • The page where you can look this up is at https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/systemfunctions/. You will see that ROW_NUMBER, RANK, and DENSE_RANK have examples for SQL, but not for Table API.

    In the end, it shouldn't matter though. As you've done, you can just use SQL directly in your Table API program.
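If you stay with SQL, one option is to move the ROW_NUM = 1 filter into the query itself instead of chaining .where() on the result. Flink's SQL documentation describes Top-N and deduplication queries in this shape: ROW_NUMBER() in a subquery, with an outer filter on the row number. Below is a minimal sketch that only builds the query text in plain Java. The class LatestRowQuery and the method latestPerKey are hypothetical helpers, not part of Flink, and the sketch assumes EVENT_TIME is a column Flink accepts in the ORDER BY of such a query.

```java
// Hypothetical helper (not part of Flink): it only assembles the SQL text.
final class LatestRowQuery {

    // Builds a query that keeps the newest row per key, using ROW_NUMBER()
    // in a subquery and filtering on ROW_NUM = 1 in the outer query.
    static String latestPerKey(String table, String keyColumn,
                               String timeColumn, String... columns) {
        String columnList = String.join(", ", columns);
        return "SELECT " + columnList
                + " FROM ("
                + " SELECT " + columnList + ","
                + " ROW_NUMBER() OVER ("
                + " PARTITION BY " + keyColumn
                + " ORDER BY " + timeColumn + " DESC"
                + " ) AS ROW_NUM"
                + " FROM " + table
                + " )"
                + " WHERE ROW_NUM = 1";
    }

    public static void main(String[] args) {
        // Table and column names are taken from the question.
        String sql = latestPerKey("SOME_TABLE", "COLUMN_A", "EVENT_TIME",
                "COLUMN_A", "COLUMN_B", "COLUMN_C");
        System.out.println(sql);
        // In a Flink job, assuming a TableEnvironment named tableEnvironment:
        // Table someTable = tableEnvironment.sqlQuery(sql);
    }
}
```

The resulting Table can still be combined with further Table API calls, so only the ranking step has to live in SQL.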