I have a Kafka topic into which I produce an entry every 2-3 seconds. Then I have a PyFlink job that formats the entries and sends them to a database.
Here's my Flink env setup:
from pyflink.common import RestartStrategies
from pyflink.datastream import (StreamExecutionEnvironment, RuntimeExecutionMode,
                                TimeCharacteristic)
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))
env.set_parallelism(4)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

kafka_consumer = FlinkKafkaConsumer(
    topics=SOURCE_TOPIC,
    deserialization_schema=deserialization_schema,
    properties=KAFKA_PROPERTIES
)
kafka_consumer.set_start_from_group_offsets()

ds = env.add_source(kafka_consumer, "DataFlowSource")
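(For reference, deserialization_schema and KAFKA_PROPERTIES are the usual boilerplate, roughly like this; the broker address and group id here are placeholders:)

from pyflink.common.serialization import SimpleStringSchema

deserialization_schema = SimpleStringSchema()
KAFKA_PROPERTIES = {
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'dataflow-consumer'         # placeholder
}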
Then here's the part where the problem happens:
from pyflink.datastream.functions import BroadcastProcessFunction

class FormatData(BroadcastProcessFunction):
    def process_element(self, value, ctx):
        # something happens on value here
        time = ctx.current_processing_time() / 1000  # Flink gives ms, convert to seconds
        yield metrics_stream_tag, (time, value)  # side output for the metrics table
        yield "some other information for another table"  # main output
Now, I'm sure there are at least 2 seconds between each value, yet when I look at the db or print the stream, I see something like this:
2024-04-30 11:48:16+00 5
2024-04-30 11:48:16+00 7
2024-04-30 11:48:16+00 12
and it keeps going like that for 10-25 rows, then the time jumps to the current time, e.g.
2024-04-30 11:50:22+00 5
and then the pattern repeats.
Some more context:
metrics_stream = ds.get_side_output(metrics_stream_tag)
# metrics_stream.add_sink(psql_metrics_sink)
metrics_stream.print()
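Here psql_metrics_sink is a JDBC sink into Postgres. A minimal sketch of how such a sink can be built with PyFlink's JdbcSink (recent PyFlink versions; the table name, columns, and connection details are placeholders, and JdbcSink wants Row elements rather than tuples):

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.jdbc import (JdbcSink, JdbcConnectionOptions,
                                                JdbcExecutionOptions)

row_type = Types.ROW([Types.DOUBLE(), Types.INT()])

# JdbcSink expects Row elements, so convert the (time, value) tuples first.
row_stream = metrics_stream.map(lambda t: Row(t[0], t[1]), output_type=row_type)

psql_metrics_sink = JdbcSink.sink(
    "INSERT INTO metrics (ts, value) VALUES (?, ?)",  # placeholder table/columns
    row_type,
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url("jdbc:postgresql://localhost:5432/mydb")  # placeholder
        .with_driver_name("org.postgresql.Driver")
        .with_user_name("user")        # placeholder
        .with_password("password")     # placeholder
        .build(),
    JdbcExecutionOptions.builder()
        .with_batch_interval_ms(1000)
        .with_max_retries(3)
        .build()
)
row_stream.add_sink(psql_metrics_sink)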
As for the timestamp itself, I tried using datetime.now().timestamp() instead, but that didn't change anything.
What I expect is timestamps 2-3 seconds apart; the time shouldn't stay stuck on one value for ~2 minutes before changing.
So I still don't know why it behaves this way, but I found a solution. It all comes down to the output types: I had the time field declared as Types.FLOAT(), and that's what causes these weird conversions. When I use Types.DOUBLE() or Types.STRING() instead, it works like a charm. So if someone here knows why, please let me know.
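My best guess at the why (reasoning from IEEE-754, not from anything Flink-specific): Types.FLOAT() is a 32-bit float with a 24-bit significand. A Unix timestamp in seconds is currently around 1.7e9, which lies between 2^30 and 2^31, so the gap between adjacent representable float32 values in that range is 2^7 = 128 seconds. Every timestamp gets rounded to the nearest multiple of 128 seconds, so entries collapse onto one value until the clock crosses into the next 128-second bucket, which lines up with the roughly-two-minute jumps above (and 11:48:16 UTC is 1714477696, exactly a multiple of 128). A quick check in plain Python:

import struct

def as_float32(x: float) -> float:
    # Round-trip through a 4-byte IEEE-754 float, i.e. what Types.FLOAT() stores.
    return struct.unpack('f', struct.pack('f', x))[0]

t0 = 1714477696.0  # 2024-04-30 11:48:16 UTC as a Unix timestamp in seconds

print(as_float32(t0))        # 1714477696.0
print(as_float32(t0 + 2))    # 1714477696.0 -- 2 seconds later, identical float32
print(as_float32(t0 + 60))   # 1714477696.0 -- a minute later, still identical
print(as_float32(t0 + 128))  # 1714477824.0 -- the next representable value

If this is right, it would also explain the other observations: Types.DOUBLE() has a 53-bit significand, so it represents these timestamps to sub-microsecond precision; Types.STRING() avoids the numeric narrowing entirely; and switching to datetime.now().timestamp() didn't help because the value was still being squeezed through the declared FLOAT type at serialization time.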