
Nested MATCH_RECOGNIZE query not supported in Flink SQL?


I am using Flink 1.11 and trying a nested query where MATCH_RECOGNIZE is inside the subquery, as shown below:

    select * from events where id = (
      SELECT * FROM events
      MATCH_RECOGNIZE (
        PARTITION BY org_id
        ORDER BY proctime
        MEASURES A.id AS startId
        ONE ROW PER MATCH
        PATTERN (A C* B)
        DEFINE
          A AS A.tag = 'tag1',
          C AS C.tag <> 'tag2',
          B AS B.tag = 'tag2'
      )
    );

I get the following error:

    org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not found

Is this not supported? If not, what's the alternative?


Solution

  • I was able to get something working by doing this:

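    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    import static org.apache.flink.table.api.Expressions.$;

    // ... inside a method where `env` (StreamExecutionEnvironment), `tableEnv`
    // (StreamTableEnvironment) and `input` (a DataStream with timestamps and
    // watermarks already assigned, so that ts can be a rowtime attribute) are in scope

    Table events = tableEnv.fromDataStream(input,
        $("sensorId"),
        $("ts").rowtime(),
        $("kwh"));
    
    tableEnv.createTemporaryView("events", events);
    
    // One output row per match; only the matched row's sensorId is kept, as `id`
    Table matches = tableEnv.sqlQuery(
                    "SELECT id " +
                        "FROM events " +
                        "MATCH_RECOGNIZE ( " +
                            "PARTITION BY sensorId " +
                            "ORDER BY ts " +
                            "MEASURES " +
                                "this_step.sensorId AS id " +
                            "AFTER MATCH SKIP TO NEXT ROW " +
                            "PATTERN (this_step next_step) " +
                            "DEFINE " +
                                "this_step AS TRUE, " +
                                "next_step AS TRUE " +
                        ")"
            );
    
    // Register the MATCH_RECOGNIZE result as its own view so that the
    // outer query can refer to it by name
    tableEnv.createTemporaryView("mmm", matches);
    
    // All events whose sensorId appears in at least one match
    Table results = tableEnv.sqlQuery(
        "SELECT * FROM events WHERE events.sensorId IN (SELECT id FROM mmm)");
    
    tableEnv
        .toAppendStream(results, Row.class)
        .print();

    // The DataStream job does not run until it is executed
    env.execute();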

    For some reason, I couldn't get it to work without first registering the MATCH_RECOGNIZE result as a view; writing it inline as a subquery kept producing Calcite errors.

    I assume you are doing this to avoid enumerating all of the columns of A in the MEASURES clause of the MATCH_RECOGNIZE. You may want to compare the execution plans of the two approaches (joining back to `events` versus listing every column in MEASURES) to see whether there's any significant difference.
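For comparison, here is a minimal sketch of the other approach: copying A's columns into MEASURES so that no join back to `events` is needed. It assumes the question's schema (`id`, `tag`, `org_id`, `proctime`) and that `id` uniquely identifies an event; the class and method names are hypothetical. The helper only builds the SQL string, which you would then pass to `tableEnv.sqlQuery(...)`, or to `tableEnv.explainSql(...)` (available since Flink 1.11) to print its plan next to the plan of the view-based query.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds the variant of the question's query that copies
// A's columns into MEASURES instead of joining back to `events` on the id.
public class EnumeratedMeasures {

    // Turns ["id", "tag"] into "A.id AS id, A.tag AS tag".
    public static String measures(String patternVar, List<String> columns) {
        return columns.stream()
                .map(c -> patternVar + "." + c + " AS " + c)
                .collect(Collectors.joining(", "));
    }

    // Same PARTITION BY / ORDER BY / PATTERN / DEFINE as the question's query.
    // `columns` should leave out the partition key (org_id), which
    // ONE ROW PER MATCH already emits, and time attributes such as proctime.
    public static String query(List<String> columns) {
        return "SELECT * FROM events MATCH_RECOGNIZE ("
                + " PARTITION BY org_id ORDER BY proctime"
                + " MEASURES " + measures("A", columns)
                + " ONE ROW PER MATCH"
                + " PATTERN (A C* B)"
                + " DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B AS B.tag = 'tag2'"
                + ")";
    }

    public static void main(String[] args) {
        // "id" and "tag" come from the question; any other (non-time)
        // columns of `events` would be appended to this list.
        String sql = query(List.of("id", "tag"));
        System.out.println(sql);

        // With Flink 1.11 on the classpath and `tableEnv` in scope, print this
        // plan and the plan of the view-based IN query, then compare them:
        //   System.out.println(tableEnv.explainSql(sql));
    }
}
```

The trade-off is that this variant has to list the columns explicitly, but it avoids the extra join (and the state it keeps) that the view-based query needs.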