I wish to implement the transactional outbox pattern in a system that uses a database table as a transactional outbox in concert with Kafka, in order to guarantee exactly-once delivery of my messages to Kafka. Afterwards, I use Kafka Streams, which is set up to enable exactly-once processing. Thus, this system should guarantee that each client message is delivered and processed exactly once.
The system in this case looks as follows: Client -> API -> DB -> Kafka.
As I understand it, the basic transactional outbox pattern with a polling publisher goes like this:
1. Write the incoming message to the message table and the message_outbox table in the same database transaction.
   - Messages are identified by a messageId, so duplicates from Client -> API -> DB are not reprocessed: the API, as a consumer of the Client message, can check the message table and ignore the duplicate.
2. Using the polling publisher pattern, either as a separate, dedicated CDC service or as a "publisher" thread in the API service, poll for new messages.
   - Acquire a distributed lock (for example, using ShedLock).
   - Update a batch of message_outbox events to set a "locked" status:
     UPDATE message_outbox SET locked = true, locked_by = $self_instance WHERE message_id IN (SELECT message_id FROM message_outbox WHERE locked = false OR locked_by = $self_instance ORDER BY created_at)
   - Fetch the acquired batch:
     SELECT * FROM message_outbox WHERE locked = true AND locked_by = $self_instance ORDER BY created_at
3. After acquiring a lock on a batch of message events, produce the records to Kafka using idempotent producers.
4. Done.
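A minimal sketch of the first step (the transactional write plus the duplicate check), using Python's built-in sqlite3 as a stand-in for the real database. The table layout, the payload column, and the store_message helper are assumptions made for illustration; only the table names and messageId come from the description above.

```python
import sqlite3


def store_message(conn, message_id, payload):
    """Insert into message and message_outbox atomically; return False for a duplicate."""
    try:
        with conn:  # one DB transaction: commit on success, roll back on error
            conn.execute(
                "INSERT INTO message (message_id, payload) VALUES (?, ?)",
                (message_id, payload),
            )
            conn.execute(
                "INSERT INTO message_outbox (message_id, payload) VALUES (?, ?)",
                (message_id, payload),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key violation: this messageId was already stored, so ignore it.
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE message (message_id TEXT PRIMARY KEY, payload TEXT)")
conn.execute("CREATE TABLE message_outbox (message_id TEXT PRIMARY KEY, payload TEXT)")

first = store_message(conn, "m-1", "hello")      # stored in both tables
duplicate = store_message(conn, "m-1", "hello")  # rejected, nothing written
outbox_rows = conn.execute("SELECT COUNT(*) FROM message_outbox").fetchone()[0]
```

Because both inserts share one transaction, a row can never exist in message without its outbox counterpart, which is the property the rest of the flow relies on.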
I have read this approximate implementation sketch in many blogs, articles, and other SO answers. However, I fail to understand how this supports exactly-once delivery in all cases. I am likely missing some key technical point somewhere in the process.
For example, let's say in step 3, the producer produces the Kafka record and fails to receive an ACK from Kafka. Thus the message event will not be deleted from the outbox table (i.e. not sent) and it will be retried. This is no problem with an idempotent producer in the same session: on the next scheduled run it will be retried and the fencing token will make sure that no duplicates are written.
But then there is the case where the producer fails completely, without possibility of recovery (e.g. the application receives an OOM), before an ACK is received. In this case, the message events are not deleted.
Now, if/when another instance starts to take over, it will still see the same message event in the message_outbox table and it will still retry sending it. As far as I can see, this guarantees only at-least-once delivery, even with idempotent and transactional producers.
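The failure case can be reproduced with a toy model. This is only a sketch: the Topic class below is a hypothetical in-memory stand-in for a single partition, and it assumes that idempotent-producer deduplication is keyed on the producer session and a sequence number, so a fresh session after a crash is not deduplicated against the old one.

```python
class Topic:
    """Toy stand-in for one partition with idempotent-producer dedup (not real Kafka)."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> highest sequence number accepted

    def append(self, producer_id, seq, value):
        if seq <= self.last_seq.get(producer_id, -1):
            return  # retry within the same producer session: dropped as a duplicate
        self.last_seq[producer_id] = seq
        self.log.append(value)


topic = Topic()
outbox = ["m-1"]  # the row stays in the outbox because no ACK was ever seen

# Instance A writes the record, retries after the lost ACK, then crashes
# before it can delete the outbox row.
topic.append(producer_id="A", seq=0, value="m-1")
topic.append(producer_id="A", seq=0, value="m-1")  # same session: deduplicated

# Instance B takes over with a fresh producer session and resends the row.
for seq, message_id in enumerate(outbox):
    topic.append(producer_id="B", seq=seq, value=message_id)

duplicates = len(topic.log) - len(set(topic.log))
```

In this model the retry inside session A is harmless, but the resend from session B lands in the log a second time, which is the at-least-once behaviour described above.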
As far as I can see, even with an external CDC Service or something like Debezium, this issue still persists - the CDC service might crash before acknowledging that an event was processed. So even in those cases, exactly-once delivery would not be guaranteed.
What am I missing?
Addendum due to the answers below:
This pattern basically makes the recovery process idempotent, and thus you get the guarantees you need.
I don't know how Debezium implements this, but the only way to get EOS is to read back from Kafka.
I see. That makes a lot of sense and is, I think, the key point I was missing.
Am I correct in thinking that the reworked flow would then look something like the following:
1. The API receives a message and stores it in message_outbox transactionally.
2. A publisher polls message_outbox entries, sets the messages in the current batch to PENDING, and sends these to the target topic in Kafka using a transactional, idempotent producer. Setting PENDING is important so that another instance of this process doesn't try to send these again.
3. A consumer with the read_committed isolation level reads the target topic and deletes the message_outbox entry with the same key/id.
E.g., after a crash, you need to use a consumer in "read_committed" mode to read the tail of the topic to see which messages were successfully written by the last producer TX, and do the cleanup inside the DB based on this information.
This does raise the question, though: how does one determine the last known good offset to start consuming from after a crash to facilitate the recovery process? (I assume this is what is meant by the tail of the topic, as opposed to the latest offset. I thought normally the head is the latest.) I can think of two ways:
1. Run this consumer continuously in its own consumer group so that offsets are automatically tracked. On (an idempotent) deletion of the associated outbox message, offsets are committed.
2. Process the __transaction_state topic itself. Since all associated producer transactional state is stored there, one could ostensibly find the last known committed offset.
Would there be an issue going with route 1, i.e. having the same service subscribe to its own topic with a consumer group and then delete the corresponding messages as it reads them being successfully produced to Kafka?
Is there perhaps a third option I am missing?
You are right. To make this work, you need to use the transactional producer to also record the progress you made writing into the Kafka topic.
E.g., after a crash, you need to use a consumer in "read_committed" mode to read the tail of the topic, see which messages were successfully written by the last producer TX, and do the cleanup inside the DB based on this information.
Also, the producer should use the same transactional.id across restarts, so that you get proper fencing. Thus, after a crash, before you use the consumer to read back from the topic, you would need to call producer.initTransactions() to ensure all pending transactions are completed (either aborted or fully committed), and that potential zombie producers are fenced off. Otherwise, what you read is not reliable information.
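To illustrate why the restart has to go through initTransactions() first, here is a toy model of epoch-based fencing. It is a sketch only: ToyCoordinator, ProducerFenced, and the "outbox-publisher" id are hypothetical names, and the model deliberately ignores most of the real protocol. It assumes that re-initializing with the same transactional.id bumps an epoch, aborts the unfinished transaction, and rejects the older instance.

```python
class ProducerFenced(Exception):
    """Raised when a producer with a stale epoch talks to the coordinator."""


class ToyCoordinator:
    """Toy model of epoch-based fencing; not the real Kafka protocol."""

    def __init__(self):
        self.epochs = {}     # transactional_id -> current epoch
        self.open_tx = {}    # transactional_id -> records of the unfinished TX
        self.committed = []  # records visible to read_committed consumers

    def init_transactions(self, transactional_id):
        # Bump the epoch (fences older instances) and abort any unfinished TX.
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        self.open_tx.pop(transactional_id, None)
        return epoch

    def _check(self, transactional_id, epoch):
        if epoch != self.epochs.get(transactional_id):
            raise ProducerFenced(transactional_id)

    def send(self, transactional_id, epoch, value):
        self._check(transactional_id, epoch)
        self.open_tx.setdefault(transactional_id, []).append(value)

    def commit(self, transactional_id, epoch):
        self._check(transactional_id, epoch)
        self.committed.extend(self.open_tx.pop(transactional_id, []))


coordinator = ToyCoordinator()
old_epoch = coordinator.init_transactions("outbox-publisher")
coordinator.send("outbox-publisher", old_epoch, "m-1")  # old instance stalls mid-TX

new_epoch = coordinator.init_transactions("outbox-publisher")  # restart, same id
try:
    coordinator.commit("outbox-publisher", old_epoch)  # zombie wakes up too late
    zombie_fenced = False
except ProducerFenced:
    zombie_fenced = True
```

After the restart, the committed view is stable: the half-written record is gone and the zombie can no longer change it, so reading back from the topic gives a reliable answer.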
This pattern basically makes the recovery process idempotent, and thus you get the guarantees you need.
I don't know how Debezium implements this, but the only way to get EOS is to read back from Kafka. If the actual data topic does not contain enough information for proper recovery, you could also create an additional "tracking topic": you would write all necessary metadata into this topic, using the same producer that writes into the actual data topic, within the same TX.
Update (for question addendum):
The new workflow does make sense; however, for the "happy path" you don't need to read back from Kafka. As long as nothing crashes, you just write to Kafka, commit the TX, and delete the pending messages right afterwards.
Only after a crash would your process first need to check whether there are any pending messages, and how to do the recovery. It needs to figure out whether these messages made it into Kafka or not. If not, it repeats the write into Kafka; if yes, it skips writing and only deletes the pending messages from the database.
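The recovery decision can be sketched as follows. This is a simulation, not Kafka client code: sqlite3 stands in for the database, a plain list stands in for what a read_committed consumer read back from the topic, and the recover function, the status column, and the resend callback are assumptions made for illustration.

```python
import sqlite3


def recover(conn, committed_ids, resend):
    """Reconcile PENDING outbox rows with the ids read back from the topic."""
    pending = [row[0] for row in conn.execute(
        "SELECT message_id FROM message_outbox "
        "WHERE status = 'PENDING' ORDER BY message_id"
    )]
    resent = []
    for message_id in pending:
        if message_id not in committed_ids:
            resend(message_id)  # assumed to return only after the Kafka TX commits
            resent.append(message_id)
    with conn:  # delete in one DB transaction, after all writes are confirmed
        conn.executemany(
            "DELETE FROM message_outbox WHERE message_id = ?",
            [(message_id,) for message_id in pending],
        )
    return resent


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE message_outbox (message_id TEXT PRIMARY KEY, status TEXT)")
with conn:
    conn.executemany(
        "INSERT INTO message_outbox VALUES (?, 'PENDING')", [("m-1",), ("m-2",)]
    )

topic = ["m-1"]  # read back after the crash: m-1 made it, m-2 did not
resent = recover(conn, set(topic), topic.append)
remaining = conn.execute("SELECT COUNT(*) FROM message_outbox").fetchone()[0]
```

Running the same recovery again would find no pending rows and resend nothing, which is what makes the procedure safe to repeat after another crash.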
This does beg the question though: how does one determine the last known good offset to start consuming from after a crash to facilitate the recovery process (I assume this is what is meant by the tail of the topic, as opposed to the latest offset? I thought normally the head is the latest)? I can think of two ways:
Haha. Head and tail are often confused. Head is the oldest message, and tail is the end of the topic. Producers append new messages to the tail, and thus consumers also read from the tail; you only read from the head when re-consuming messages from the beginning. (Think of the Unix commands head and tail, which work exactly like this, too.)
So yes, "latest" is not sufficient; that's why I used the term "tail". I did not think about it in detail, but I think you could add this information to a helper table in the DB. Each time you mark output rows as "pending", you get the topic end offset and store it in this helper table (using the same DB TX; and maybe also delete it when deleting "pending" records after they got written?). You can also use the producer Callback to get the offset of the written messages and use this (to avoid an explicit endOffsets() call to Kafka).
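A rough sketch of the helper-table idea, again with sqlite3 standing in for the real database. The outbox_recovery table, its single-row layout, and the function names are all hypothetical; the end offset is passed in as a plain integer, since how it is obtained (an end-offset lookup or the producer callback) is outside this sketch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE message_outbox (message_id TEXT PRIMARY KEY, status TEXT)")
conn.execute(
    "CREATE TABLE outbox_recovery ("
    "id INTEGER PRIMARY KEY CHECK (id = 1), start_offset INTEGER)"
)


def mark_pending(conn, message_ids, end_offset):
    """Mark rows PENDING and record the read-back start offset in the same DB TX."""
    with conn:
        conn.executemany(
            "UPDATE message_outbox SET status = 'PENDING' WHERE message_id = ?",
            [(message_id,) for message_id in message_ids],
        )
        conn.execute(
            "INSERT OR REPLACE INTO outbox_recovery (id, start_offset) VALUES (1, ?)",
            (end_offset,),
        )


def finish_batch(conn):
    """Happy path: the Kafka TX committed, so drop the rows and the stored offset."""
    with conn:
        conn.execute("DELETE FROM message_outbox WHERE status = 'PENDING'")
        conn.execute("DELETE FROM outbox_recovery")


def recovery_start_offset(conn):
    """Return the offset to read back from after a crash, or None if nothing is pending."""
    row = conn.execute("SELECT start_offset FROM outbox_recovery").fetchone()
    return row[0] if row else None


with conn:
    conn.execute("INSERT INTO message_outbox VALUES ('m-1', 'NEW')")

mark_pending(conn, ["m-1"], end_offset=42)
offset_during = recovery_start_offset(conn)  # a crash here would read back from 42
finish_batch(conn)
offset_after = recovery_start_offset(conn)   # nothing pending, nothing to read back
```

Storing the offset in the same DB transaction as the status change means a restarted process either sees both the pending rows and the start offset, or neither.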
However, I would not run a consumer continuously, as it sounds very expensive and should be unnecessary. As pointed out above, as long as nothing crashes, you don't need to do anything special, like reading back from Kafka: if the Kafka TX commits, you get the signal and can delete. You just need to worry about the case after a crash, i.e., the recovery phase during restart. Only for this case do you need to know the "start offset" and read back from Kafka, so the start offset must be the offset of the last known successful write to Kafka (i.e., an offset for which we know that the TX did commit).