In the minimal working example below (executed with Flink v1.20), using an over_window
in batch mode always yields the following error, even though the ordering
column `time` is of type TIMESTAMP(3):
org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
When I execute the same script in streaming mode, it works as expected. Is this expected behaviour or am I missing something?
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    EnvironmentSettings,
    StreamTableEnvironment,
)
from pyflink.table.expressions import col, lit
from pyflink.table.window import Over

if __name__ == "__main__":
    # Datagen source: `unix_time` is a 0..1000 sequence; the computed column
    # `time` derives an event-time TIMESTAMP_LTZ(3) from it, with a watermark
    # declared on that column so it becomes a time attribute (streaming only —
    # in batch mode watermarks are ignored, so `time` degrades to a plain
    # timestamp, which is why the over_window below fails there).
    source = """
CREATE TABLE src_trade (
`id` INT,
`unix_time` INT,
`price` INT,
`time` AS TO_TIMESTAMP_LTZ(`unix_time` * 1000, 3),
WATERMARK FOR `time` AS `time`
) WITH (
'connector' = 'datagen',
'fields.unix_time.start' = '0',
'fields.unix_time.end' = '1000',
'fields.unix_time.kind' = 'sequence',
'fields.price.min' = '0',
'fields.price.max' = '10',
'fields.price.kind' = 'random',
'fields.id.min' = '0',
'fields.id.max' = '1',
'rows-per-second' = '1'
)
"""

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Toggle between batch and streaming execution; the over_window below
    # validates only in streaming mode, where `time` is a real time attribute.
    env_settings = (
        EnvironmentSettings.new_instance()
        .in_batch_mode()  # .in_streaming_mode() # <- WORKS!
        .build()
    )
    t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
    t_env.execute_sql(source)

    # Per-id running max of `price` over the preceding 24 hours of event time.
    # In batch mode this raises:
    #   ValidationException: Ordering must be defined on a time attribute.
    (
        t_env.from_path('src_trade')
        .over_window(
            Over
            .partition_by(col('id'))
            .order_by(col('time'))
            .preceding(lit(24).hours)
            .alias('over_window')
        )
        .select(
            col('*'),
            col('price').max.over(col('over_window')).alias('max_price'),
        )
        .execute().print()
    )
Time-range over windows are apparently not supported in batch mode: watermarks are ignored there, so the `time` column loses its time-attribute property and the `order_by` validation fails.