I am trying to set up a Dataflow Beam streaming pipeline in Python where the source is Kafka and the sink is a Postgres table (see my pipeline code below). The Kafka topic has multiple partitions spread across multiple brokers. When I run this pipeline, I observe a continuous increase in the workers' memory usage.
I am running two workers, each on an n2d-standard-64 machine. The message inflow from Kafka is quite high. My assumption is that the pipeline cannot process messages at that pace, so unprocessed messages stay in memory longer, the workers' memory utilization keeps increasing, and the job finally fails with an OOM error.
I tried a few different things to control memory usage:
None of the above solutions helped control the growth of the workers' memory. Can someone suggest a way to control the Kafka inflow in the Dataflow Beam pipeline below?
Also, is it possible to commit offsets manually after writing a message to Postgres in a Beam pipeline?
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# options, KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, apply_filter and
# PostgresWriter are defined elsewhere in my code.
with beam.Pipeline(options=options) as pipeline:
    kafka_config = {
        'bootstrap.servers': ','.join(KAFKA_BOOTSTRAP_SERVERS),
        'group.id': 'my_group_id',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'true',
    }
    _ = (
        pipeline
        | 'Kafka Read'
        >> ReadFromKafka(
            consumer_config=kafka_config,
            topics=[KAFKA_TOPIC],
        )
        | 'filter' >> beam.Map(apply_filter)
        | 'Write'
        >> beam.ParDo(
            PostgresWriter())
    )
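On the offset question, one option that may be worth trying is the `commit_offset_in_finalize` argument of `ReadFromKafka`, combined with turning off the consumer's own auto-commit. The sketch below only builds the arguments; the helper name `build_kafka_read_kwargs` and the sample broker addresses are hypothetical. As far as I understand it, this commits offsets when Beam finalizes the read's checkpoint, which is not necessarily the same moment as a successful Postgres write, so treat it as an approximation of "manual" commits rather than an exact one.

```python
def build_kafka_read_kwargs(bootstrap_servers, topic, group_id):
    """Build keyword arguments for ReadFromKafka (hypothetical helper).

    Offsets are left to Beam: the consumer's periodic auto-commit is
    disabled and commit_offset_in_finalize asks Beam to commit them
    when it finalizes the read's checkpoint.
    """
    consumer_config = {
        'bootstrap.servers': ','.join(bootstrap_servers),
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
        # Assumption: auto-commit is switched off so that only Beam
        # commits offsets back to Kafka.
        'enable.auto.commit': 'false',
    }
    return {
        'consumer_config': consumer_config,
        'topics': [topic],
        'commit_offset_in_finalize': True,
    }


if __name__ == '__main__':
    # Sample values for illustration only.
    kwargs = build_kafka_read_kwargs(
        ['broker-1:9092', 'broker-2:9092'], 'my_topic', 'my_group_id')
    print(kwargs)
```

In the pipeline above this would be used as `ReadFromKafka(**build_kafka_read_kwargs(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, 'my_group_id'))`.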
My initial assumption was wrong. The OOM error was not caused by high inflow from Kafka; it appears to be tied to recent Beam versions (2.55.0 and above). After I switched to version 2.53.0, the OOM error no longer occurs.
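For anyone who wants to guard against accidentally deploying with one of those versions, here is a minimal sketch of a startup check. The threshold `FIRST_AFFECTED` comes only from the observation above (2.55.0 and above), not from a confirmed bug report, and the helper names are hypothetical.

```python
from importlib import metadata

# Assumption: first version where the OOM was observed in this post.
FIRST_AFFECTED = (2, 55, 0)


def parse_version(text):
    """Turn '2.53.0' into (2, 53, 0); suffixes such as 'rc1' are ignored."""
    parts = []
    for piece in text.split('.')[:3]:
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def is_affected(version_text):
    """True if the version is at or above the first version seen to OOM."""
    return parse_version(version_text) >= FIRST_AFFECTED


def check_installed_beam():
    """Return True/False for the installed apache-beam, or None if absent."""
    try:
        installed = metadata.version('apache-beam')
    except metadata.PackageNotFoundError:
        return None
    return is_affected(installed)


if __name__ == '__main__':
    print(check_installed_beam())
```

Pinning the dependency (for example `apache-beam[gcp]==2.53.0` in the requirements file) achieves the same thing at install time; the check above just makes the choice visible when the job starts.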