
Over Windows in Batch Mode


In the minimal working example below (run with Flink 1.20), using an over window in batch mode always fails with the following error, even though the time column is a TIMESTAMP_LTZ(3) with a watermark declared on it:

org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.

When I execute the same script in streaming mode, it works as expected. Is this expected behaviour or am I missing something?

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    EnvironmentSettings,
    StreamTableEnvironment,
)
from pyflink.table.expressions import col, lit
from pyflink.table.window import Over


if __name__ == "__main__":
    source = """
        CREATE TABLE src_trade (
            `id` INT,
            `unix_time` INT,
            `price` INT,
            `time` AS TO_TIMESTAMP_LTZ(`unix_time` * 1000, 3),
            WATERMARK FOR `time` AS `time`
        ) WITH (
            'connector' = 'datagen',
            'fields.unix_time.start' = '0',
            'fields.unix_time.end' = '1000',
            'fields.unix_time.kind' = 'sequence',
            'fields.price.min' = '0',
            'fields.price.max' = '10',
            'fields.price.kind' = 'random',
            'fields.id.min' = '0',
            'fields.id.max' = '1',
            'rows-per-second' = '1'
        )
    """

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    env_settings = (
        EnvironmentSettings.new_instance()
        .in_batch_mode()  # .in_streaming_mode()  # <- WORKS!
        .build()
    )
    t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
    t_env.execute_sql(source)

    (
        t_env.from_path('src_trade')
        .over_window(
            Over
            .partition_by(col('id'))
            .order_by(col('time'))
            .preceding(lit(24).hours)
            .alias('over_window')
        )
        .select(
            col('*'),
            col('price').max.over(col('over_window')).alias('max_price'),
        )
        .execute().print()
    )


Solution

  • Over windows are apparently not supported in batch mode. Switching the environment settings to .in_streaming_mode() makes the same script run as expected.
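To cross-check what the over window should return once the job runs (for example in streaming mode), here is a plain-Python sketch of the same semantics. It is not Flink code. For each row it takes the maximum price over all rows with the same id whose time falls within the 24 hours up to and including that row's own time. The helper over_max and the sample rows are hypothetical. The frame end assumes the Table API default of CURRENT_RANGE for a time-interval preceding bound, so rows with an identical timestamp (peers) are included.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def over_max(rows, window=timedelta(hours=24)):
    """Hypothetical helper (not a Flink API): emulate
    MAX(price) OVER (PARTITION BY id ORDER BY time
                     RANGE BETWEEN <window> PRECEDING AND CURRENT ROW).
    Peers with the same timestamp are included, matching a RANGE frame."""
    # Group rows by partition key (id), like partition_by(col('id')).
    by_id = defaultdict(list)
    for r in rows:
        by_id[r["id"]].append(r)

    result = []
    for r in rows:
        lower = r["time"] - window
        # All prices of the same id whose time lies in [time - window, time].
        in_frame = [p["price"] for p in by_id[r["id"]] if lower <= p["time"] <= r["time"]]
        result.append({**r, "max_price": max(in_frame)})
    return result


def ts(unix_seconds):
    # Same conversion idea as TO_TIMESTAMP_LTZ(unix_time * 1000, 3): epoch -> instant.
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc)


# Hypothetical sample data shaped like the datagen table (id, time, price).
rows = [
    {"id": 0, "time": ts(0), "price": 3},
    {"id": 1, "time": ts(1), "price": 7},
    {"id": 0, "time": ts(2), "price": 5},
    {"id": 0, "time": ts(100_000), "price": 1},  # more than 24h after the earlier id=0 rows
    {"id": 1, "time": ts(3), "price": 2},
]

if __name__ == "__main__":
    for row in over_max(rows):
        print(row["id"], row["time"].isoformat(), row["price"], row["max_price"])
```

With the datagen source from the question, unix_time only spans 0 to 1000 seconds, so every row of an id stays inside the 24-hour frame and max_price reduces to a running maximum per id.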