Quoting Effective Kafka by Emil Koutanov:
The delete policy operates at the granularity of log segments. A background process, operating within a replica, looks at each inactive log segment to determine whether a segment is eligible for deletion.
Recall, only producer-initiated writes are allowed on the active segment; all other manipulations — deletion and compaction — may only occur on the inactive segments.
But if I set retention.ms to 30 seconds and log.retention.check.interval.ms to 1 second on the broker, then produce 2-3 messages and stop, the segment file is deleted after roughly 30 seconds and a new one is created.
segment.ms and segment.bytes are left at their defaults, so this is not ordinary rolling behavior.
Is the author wrong, and can the delete policy actually act on active segments, or am I misunderstanding what "active" means?
In Kafka, each partition has exactly one active segment that accepts incoming writes. Once this segment reaches a configured limit (e.g., segment.ms or segment.bytes), Kafka rolls the segment, creating a new active segment and marking the previous one as inactive.
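The roll decision described above can be sketched roughly as follows. This is a simplified Python model, not Kafka's code: the Segment class and should_roll function are hypothetical, the defaults mirror the documented segment.bytes (1 GiB) and segment.ms (7 days) values, and it ignores segment.jitter.ms as well as the index-full and offset-overflow conditions Kafka also checks. As I understand it, Kafka runs this check when a new batch is appended, so an idle partition does not roll on segment.ms alone.

```python
from dataclasses import dataclass
from typing import Optional

# Documented defaults for the topic configs segment.bytes and segment.ms.
DEFAULT_SEGMENT_BYTES = 1024 * 1024 * 1024      # 1 GiB
DEFAULT_SEGMENT_MS = 7 * 24 * 60 * 60 * 1000    # 7 days


@dataclass
class Segment:
    """Hypothetical, minimal view of a log segment."""
    base_offset: int
    size_bytes: int = 0
    first_timestamp_ms: Optional[int] = None  # None while the segment is empty


def should_roll(active: Segment, batch_bytes: int, now_ms: int,
                segment_bytes: int = DEFAULT_SEGMENT_BYTES,
                segment_ms: int = DEFAULT_SEGMENT_MS) -> bool:
    """Simplified check run before appending a batch to the active segment."""
    # Size limit: the incoming batch would push the segment past segment.bytes.
    if active.size_bytes > segment_bytes - batch_bytes:
        return True
    # Time limit: a non-empty segment has been open longer than segment.ms.
    if active.first_timestamp_ms is not None and now_ms - active.first_timestamp_ms > segment_ms:
        return True
    return False
```

For example, with segment_bytes=1_000, a 900-byte active segment rolls before a 200-byte batch is appended, while an empty segment never rolls on time because it has no first timestamp yet.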
Retention policies (retention.ms and retention.bytes) normally apply only to inactive segments, because these no longer receive writes. Active segments are therefore typically exempt from deletion based on retention settings alone.
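Retention never trims part of a segment; it removes whole segments, oldest first. A rough Python model of the retention.bytes check is below. The function name is hypothetical and it only counts how many of the oldest segments would go, assuming sizes are listed oldest first. As far as I can tell from UnifiedLog.scala, Kafka stops at the first segment whose removal would take the partition below retention.bytes.

```python
from typing import List


def count_retention_bytes_deletions(segment_sizes: List[int], retention_bytes: int) -> int:
    """Number of oldest segments a retention.bytes pass would remove (simplified)."""
    total = sum(segment_sizes)
    # retention.bytes = -1 disables size-based retention; nothing to do if under the limit.
    if retention_bytes < 0 or total < retention_bytes:
        return 0
    excess = total - retention_bytes
    count = 0
    for size in segment_sizes:  # oldest segment first
        if excess - size < 0:
            break  # removing this segment would drop the partition below retention.bytes
        excess -= size
        count += 1
    return count
```

With three 100-byte segments and retention.bytes = 150, only the oldest segment is removed: dropping a second one would leave 100 bytes, below the limit.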
However, Kafka also handles the case where the active segment exceeds the retention.ms limit even though it has not yet reached the segment.ms or segment.bytes threshold. When the retention check finds that even the active segment's newest record is older than retention.ms (so every segment in the partition is deletable), Kafka force-rolls the log: it first creates a new, empty active segment, which makes the old one inactive and eligible for deletion under the retention policy. This ensures that idle segments do not remain open indefinitely after they exceed retention.ms, particularly in low-volume or idle topics.
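The force-roll behaviour can be modelled like this. It is a simplified Python sketch loosely following the retention.ms path in UnifiedLog.scala, not Kafka's actual code: the class and function names are hypothetical, a segment's age is taken from its newest record timestamp, an empty segment uses its creation time, and "deleting" just drops the segment from a list. The key step is that when every segment, including the active one, is deletable, a new empty active segment is rolled first so the partition always keeps at least one segment.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    base_offset: int
    size_bytes: int
    largest_timestamp_ms: int  # newest record timestamp (creation time if empty)


@dataclass
class PartitionLog:
    segments: List[Segment]  # oldest first; the last one is the active segment
    next_offset: int         # offset the next appended record would receive

    def roll(self, now_ms: int) -> None:
        # Start a new, empty active segment at the next offset.
        self.segments.append(Segment(self.next_offset, 0, now_ms))


def delete_retention_ms_breached(log: PartitionLog, now_ms: int, retention_ms: int) -> List[int]:
    """Return base offsets of deleted segments (simplified retention.ms pass)."""
    if retention_ms < 0:
        return []  # retention.ms = -1 disables time-based retention
    deletable: List[Segment] = []
    for i, seg in enumerate(log.segments):
        if i == len(log.segments) - 1 and seg.size_bytes == 0:
            break  # an empty active segment is never deleted
        if now_ms - seg.largest_timestamp_ms > retention_ms:
            deletable.append(seg)
        else:
            break  # stop at the first segment still within retention
    if not deletable:
        return []
    if len(deletable) == len(log.segments):
        log.roll(now_ms)  # keep at least one segment: force-roll before deleting
    log.segments = log.segments[len(deletable):]
    return [s.base_offset for s in deletable]


# Replay the question's scenario: three ~100-byte messages at t = 0, 1 s and 2 s,
# retention.ms = 30 s, retention check every 1 s.
log = PartitionLog([Segment(0, 300, 2_000)], next_offset=3)
for now in range(1_000, 40_001, 1_000):
    removed = delete_retention_ms_breached(log, now, retention_ms=30_000)
    if removed:
        print(now, removed, [s.base_offset for s in log.segments])  # 33000 [0] [3]
        break
```

In this model the segment disappears a little over 30 seconds after its newest record and is replaced by an empty segment starting at offset 3, which is consistent with the behaviour described in the question.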
When Kafka force-rolls an active segment that has breached the retention.ms threshold after staying idle, you should see a log line like this:
[Log partition=<topic>-<partition>, dir=<log_dir>] Found deletable segments with base offsets [0] due to Retention time 604800ms breach
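To confirm this in the broker logs, something like the following can pull the partition, base offsets and retention value out of such a line. The regular expression is written against the example line above and is an assumption: the exact prefix and wording may differ between Kafka versions, so it matches case-insensitively and only from "partition=" onward. parse_deletable_segments is a hypothetical helper name.

```python
import re
from typing import List, Optional, Tuple

# Written against the example log line; exact wording may vary by Kafka version.
_DELETABLE_RE = re.compile(
    r"partition=(?P<partition>[^,\s]+), dir=[^\]]+\]"
    r".*?Found deletable segments with base offsets \[(?P<offsets>[^\]]*)\]"
    r" due to retention time (?P<retention_ms>\d+)ms breach",
    re.IGNORECASE,
)


def parse_deletable_segments(line: str) -> Optional[Tuple[str, List[int], int]]:
    """Return (partition, base offsets, retention ms), or None if the line doesn't match."""
    m = _DELETABLE_RE.search(line)
    if m is None:
        return None
    offsets = [int(x) for x in m.group("offsets").split(",") if x.strip()]
    return m.group("partition"), offsets, int(m.group("retention_ms"))
```

Feeding each line of the broker log through this and keeping the non-None results gives a quick list of which partitions had segments removed by time-based retention.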
See the UnifiedLog.scala source for the implementation of this force-roll logic.
Here are some reference links that can help you understand more: