apache-flink, postgis, flink-streaming, flink-sql, apache-sedona

Sedona Flink SQL Lookup on external database failing when using 'FOR SYSTEM_TIME AS OF' statement


I'm writing a Flink application whose goal is to enrich data obtained from a Kinesis source with data stored in a database (Postgis).

According to the docs, the Flink feature available for this is the SQL Lookup Join.

However, when I try to use it, the application raises the error "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field".

This is the code snippet of what I'm trying to do:

// Using Sedona Flink SQL

...

DataStream<Row> inputStream = geometryStream
                .map(i -> Row.of(i.f0.customerId(), i.f1))
                .returns(
                        Types.ROW_NAMED(
                                new String[]{"customer_id", "geometry"},
                                Types.STRING, Types.GENERIC(Geometry.class)
                        )
                );

Table inputStreamTbl = sedona.fromDataStream(
                inputStream,
                Schema.newBuilder()
                        // "proc_time" would be needed to make the lookup using 'FOR SYSTEM_TIME AS OF'; see: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
                        .columnByExpression("proc_time", "PROCTIME()")
                        // .columnByExpression("proc_time", "CURRENT_TIMESTAMP") -> doesn't work either
                        // .columnByExpression("proc_time", "CURRENT_TIME") -> doesn't work either
                        .build()
        );

inputStreamTbl.printSchema();
DataStream<Row> enrichedInputStream = sedona.toDataStream(inputStreamTbl);
sedona.createTemporaryView("Input", enrichedInputStream);

Table postgisTbl = sedona.from(
        TableDescriptor
                .forConnector("jdbc")
                .option("url", "jdbc:postgresql://localhost:5432/sedona-flink-test")
                .option("table-name", "public.vw_customer_area")
                ...
                .schema(
                        Schema.newBuilder()
                                .column("area_id", DataTypes.STRING().notNull())
                                .column("customer_id", DataTypes.STRING().notNull())
                                .column("area_geometry", DataTypes.BYTES().notNull())
                                .column("created_at", DataTypes.TIMESTAMP())
                                .column("updated_at", DataTypes.TIMESTAMP())
                                .columnByExpression("rowtime", "CAST(updated_at AS TIMESTAMP_LTZ(3))")
                                .build()
                )
                .build()
);
postgisTbl.printSchema();
sedona.createTemporaryView("CustomerArea", postgisTbl);

Table joinedTable = sedona.sqlQuery(
"SELECT i.proc_time, i.customer_id, ca.area_id " +
        "FROM Input AS i " +
        // FIXME the lookup should be made on Postgres but it's not working using the 'FOR SYSTEM_TIME AS OF' statement
        "INNER JOIN CustomerArea FOR SYSTEM_TIME AS OF i.proc_time AS ca ON ca.customer_id = i.customer_id " +
        "WHERE ST_Intersects(i.geometry, ST_GeomFromWKB(ca.area_geometry))"
);
joinedTable.printSchema();
DataStream<Row> joinedStream = sedona.toDataStream(joinedTable);
joinedStream.print("sedona-sql-join-stream");

And this is the output:

(
  `customer_id` STRING NOT NULL,
  `geometry` RAW('org.locationtech.jts.geom.Geometry', '...'),
  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
)
(
  `area_id` STRING NOT NULL,
  `customer_id` STRING NOT NULL,
  `area_geometry` BYTES NOT NULL,
  `created_at` TIMESTAMP(6),
  `updated_at` TIMESTAMP(6),
  `rowtime` TIMESTAMP_LTZ(3) AS CAST(updated_at AS TIMESTAMP_LTZ(3))
)
(
  `proc_time` TIMESTAMP_LTZ(3) NOT NULL,
  `customer_id` STRING NOT NULL,
  `area_id` STRING NOT NULL
)
Exception in thread "main" org.apache.flink.table.api.ValidationException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field
    at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableRule.validateSnapshotInCorrelate(LogicalCorrelateToJoinFromTemporalTableRule.scala:74)
    at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:259)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
    at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
    at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
    at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
    at org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.immutable.Range.foreach(Range.scala:155)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:305)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:259)
    ...

Process finished with exit code 1

Is the 'FOR SYSTEM_TIME AS OF' clause not being evaluated correctly, or did I miss something? The column I reference is the one from the "left table", as documented.

I'd also like to understand whether this is the best approach for enriching incoming stream data with data from an external database. I think this is a common scenario for people starting to develop streaming applications.


Solution

  • I'm not sure whether this is a bug in Flink's query validator, whether the documentation is unclear, or whether I missed something.

    Anyway, I suspected this issue was happening because the tables referenced in the query were not registered in Flink's catalog. After rewriting my application to make sure both tables were in the catalog, I got the expected behavior (Postgis is looked up whenever needed).

    Before getting to the solution: I think the issue is related to the conversion from a DataStream to a Table. When this happens (e.g. via tblEnv.fromDataStream(kinesisDataStream, Schema.newBuilder()...)), the resulting Table is not registered in the catalog. The same applies when you create a temporary table/view from the stream (e.g. tblEnv.createTemporaryView("Input", kinesisDataStream)).

    I used the following code to inspect the catalog and check what was (or wasn't) registered:

    tblEnv.getCatalog(tblEnv.getCurrentCatalog()).ifPresent(catalog -> {
        log.info("Databases: {}", catalog.listDatabases());
        catalog.listDatabases().forEach(db -> {
            try {
                log.info("Tables: {}", catalog.listTables(db));
                log.info("Views: {}", catalog.listViews(db));
                log.info("Functions: {}", catalog.listFunctions(db));
            } catch (DatabaseNotExistException e) {
                throw new RuntimeException(e);
            }
        });
    });
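
    Note that temporary views created from a DataStream don't show up in the catalog databases above; they can be listed directly on the TableEnvironment instead (a small sketch using the same tblEnv; Arrays is java.util.Arrays):

    // Temporary tables/views live in the session state of the
    // TableEnvironment, not in a catalog database
    log.info("Temporary tables: {}", Arrays.toString(tblEnv.listTemporaryTables()));
    log.info("Temporary views: {}", Arrays.toString(tblEnv.listTemporaryViews()));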
    

    That said, this is the solution I used:

    // Make sure the tables (especially the one with PROCTIME()) are registered in the internal catalog
    tblEnv.createTable("Input", TableDescriptor.forConnector("kinesis")...);
    tblEnv.createTable("CustomerArea", TableDescriptor.forConnector("jdbc")...);
    
    Table joinedTable = sedona.sqlQuery(
    "SELECT i.proc_time, i.customer_id, ca.area_id " +
            "FROM Input AS i " +
            "INNER JOIN CustomerArea FOR SYSTEM_TIME AS OF i.proc_time AS ca ON ca.customer_id = i.customer_id " +
            "WHERE ST_Intersects(i.geometry, ST_GeomFromWKB(ca.area_geometry))"
    );
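
    For reference, here is a fuller sketch of the registration step. It assumes tblEnv is the same Sedona-registered StreamTableEnvironment referred to as sedona above; the Kinesis connector options, the stream name, and the WKB-encoded geometry column are illustrative assumptions (the exact option names depend on your connector versions):

    // Illustrative sketch: connector options and the geometry encoding are
    // assumptions, not taken from the original application
    tblEnv.createTable("Input", TableDescriptor.forConnector("kinesis")
            .option("stream", "input-stream")   // hypothetical stream name
            .option("aws.region", "us-east-1")  // hypothetical region
            .format("json")
            .schema(Schema.newBuilder()
                    .column("customer_id", DataTypes.STRING().notNull())
                    // e.g. geometry delivered as WKB; the query would then use
                    // ST_GeomFromWKB(i.geometry) instead of i.geometry
                    .column("geometry", DataTypes.BYTES())
                    // declaring PROCTIME() on a catalog table is what lets
                    // 'FOR SYSTEM_TIME AS OF i.proc_time' pass validation
                    .columnByExpression("proc_time", "PROCTIME()")
                    .build())
            .build());

    tblEnv.createTable("CustomerArea", TableDescriptor.forConnector("jdbc")
            .option("url", "jdbc:postgresql://localhost:5432/sedona-flink-test")
            .option("table-name", "public.vw_customer_area")
            .schema(Schema.newBuilder()
                    .column("area_id", DataTypes.STRING().notNull())
                    .column("customer_id", DataTypes.STRING().notNull())
                    .column("area_geometry", DataTypes.BYTES().notNull())
                    .column("created_at", DataTypes.TIMESTAMP())
                    .column("updated_at", DataTypes.TIMESTAMP())
                    .build())
            .build());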
    
    

    That was a simple solution, but it has its drawbacks: I already had a working FlinkKinesisConsumer<> with some ProcessFunction/KeyedProcessFunction/ReduceFunction logic applied, and I was forced to re-implement that processing flow.