I'm working on a simple data aggregation example. It just groups records by ID and calculates the average value over a tumbling window of 1 second.
It works up to step 3, but the records are not sent to the Kafka topic. Could it be related to watermark assignment?
Could you please tell me how to fix it? The job status screenshot and the source scripts are attached.
Pyflink app
import os
import time
from typing import Iterable, Tuple
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import DataStream
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.window import TumblingEventTimeWindows, Time
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSink,
KafkaRecordSerializationSchema,
DeliveryGuarantee,
)
from pyflink.datastream.formats.json import JsonRowSerializationSchema
from models import SensorReading
# Runtime settings are read from the process environment, with local defaults.
# RUNTIME_ENV: "local" makes the script register the bundled connector jars itself.
RUNTIME_ENV = os.environ.get("RUNTIME_ENV", "local")
# BOOTSTRAP_SERVERS: Kafka bootstrap address handed to the Kafka sink builder.
BOOTSTRAP_SERVERS = os.environ.get("BOOTSTRAP_SERVERS", "localhost:29092")
class AggreteProcessWindowFunction(ProcessWindowFunction):
    """Window function that emits one averaged row per key and window."""

    def process(
        self, key: str, context: ProcessWindowFunction.Context, elements: Iterable[Tuple[int, int]]
    ) -> Iterable[Row]:
        """Yield a single Row summarising the elements of the current window.

        The elements are reduced with ``SensorReading.process_elements`` into
        an id, an element count and a summed temperature. The emitted row
        carries that id, the window end as ``timestamp`` and the sum divided
        by the count, rounded to two decimals, as ``temperature``.
        """
        sensor_id, n_readings, total = SensorReading.process_elements(elements)
        window_end = int(context.window().end)
        average = round(total / n_readings, 2)
        yield Row(id=sensor_id, timestamp=window_end, temperature=average)
def define_workflow(source_stream: DataStream):
    """Build the aggregation pipeline on top of ``source_stream``.

    The stream is keyed by the first item of each element, grouped into
    1-second tumbling event-time windows and reduced per window by
    ``AggreteProcessWindowFunction``. The resulting stream is returned; its
    declared output type is ``SensorReading.get_value_type()``.
    """
    keyed = source_stream.key_by(lambda record: record[0])
    windowed = keyed.window(TumblingEventTimeWindows.of(Time.seconds(1)))
    return windowed.process(
        AggreteProcessWindowFunction(),
        output_type=SensorReading.get_value_type(),
    )
if __name__ == "__main__":
    """
    ## local execution
    python src/c01_sensor_reading.py
    ## cluster execution
    docker exec jobmanager /opt/flink/bin/flink run \
        --python /tmp/src/c01_sensor_reading.py \
        --pyFiles file:///tmp/src/models.py \
        -d
    """
    # Script entry point: generate fake sensor rows, average them per id in
    # 1-second event-time windows and write the result to a Kafka topic.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    if RUNTIME_ENV == "local":
        # Local runs register the connector jars from the "jars" directory
        # that sits next to this script.
        CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
        jar_files = ["flink-faker-0.5.3.jar", "flink-sql-connector-kafka-1.17.1.jar"]
        jar_paths = tuple(
            f"file://{os.path.join(CURRENT_DIR, 'jars', name)}" for name in jar_files
        )
        print(jar_paths)
        env.add_jars(*jar_paths)

    # The source is declared through the Table API using the faker connector:
    # 10 rows per second with `id` and `rand` columns.
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql(
        """
        CREATE TABLE sensor_source (
            `id` INT,
            `rand` INT
        )
        WITH (
            'connector' = 'faker',
            'rows-per-second' = '10',
            'fields.id.expression' = '#{number.numberBetween ''0'',''20''}',
            'fields.rand.expression' = '#{number.numberBetween ''0'',''100''}'
        );
        """
    )

    class DefaultTimestampAssigner(TimestampAssigner):
        """Stamp every record with the current wall-clock time in epoch milliseconds."""

        def extract_timestamp(self, value, record_timestamp):
            # Integer floor division keeps the arithmetic exact; going through
            # float division first would round the nanosecond count, which is
            # larger than what a float can represent precisely.
            return time.time_ns() // 1_000_000

    # Convert the table into a DataStream of (id, rand) tuples and attach
    # timestamps plus monotonous watermarks; the event-time windows in
    # define_workflow need watermarks in order to fire.
    source_stream = t_env.to_append_stream(
        t_env.from_path("sensor_source"), Types.TUPLE([Types.INT(), Types.INT()])
    ).assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
            DefaultTimestampAssigner()
        )
    )

    # Kafka sink for the "sensor-reading" topic. Key and value are both
    # serialized as JSON rows using the row types declared on SensorReading.
    sensor_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("sensor-reading")
            .set_key_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(SensorReading.get_key_type())
                .build()
            )
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(SensorReading.get_value_type())
                .build()
            )
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    # define_workflow(source_stream).print() # it works!
    define_workflow(source_stream).sink_to(sensor_sink).name("sensor_sink").uid("sensor_sink")
    env.execute()
Data Model
import dataclasses
from typing import Iterable, Tuple
from pyflink.common.typeinfo import Types
@dataclasses.dataclass
class SensorReading:
    """Sensor record model plus the helpers used to aggregate and serialize it."""

    # Field names and order match the row type returned by get_value_type().
    id: str
    timestamp: int
    temperature: float

    @staticmethod
    def process_elements(elements: Iterable[Tuple[int, int]]):
        """Fold ``elements`` into ``(id, count, temperature_sum)``.

        The first item of each element is turned into a ``sensor_<n>`` label
        and the second item is scaled by ``/ 100 * 20`` before being summed.
        An ``AssertionError`` is raised when the elements do not all share the
        same label. For empty input the result is ``(None, 0, 0)``.
        """
        sensor_id = None
        n_readings = 0
        total = 0
        for reading in elements:
            label = f"sensor_{reading[0]}"
            # All elements passed in one call must belong to the same sensor.
            assert sensor_id is None or sensor_id == label
            sensor_id = label
            n_readings += 1
            total += reading[1] / 100 * 20
        return sensor_id, n_readings, total

    @staticmethod
    def get_key_type():
        """Return the named row type of the record key: a single string ``id``."""
        names = ["id"]
        types = [Types.STRING()]
        return Types.ROW_NAMED(field_names=names, field_types=types)

    @staticmethod
    def get_value_type():
        """Return the named row type of the record value.

        Fields are ``id`` (string), ``timestamp`` (long) and ``temperature``
        (double), in that order.
        """
        names = ["id", "timestamp", "temperature"]
        types = [Types.STRING(), Types.LONG(), Types.DOUBLE()]
        return Types.ROW_NAMED(field_names=names, field_types=types)
You are using event-time windows, which require watermarks. You'll have to specify a real WatermarkStrategy, not WatermarkStrategy.no_watermarks().
It looks like the events should be in order (by time), so you can use
WatermarkStrategy.for_monotonous_timestamps()
instead.