I have a PostgreSQL events table partitioned by event_timestamp:
CREATE TABLE events
(
id SERIAL,
event_timestamp TIMESTAMP NOT NULL,
processed BOOLEAN DEFAULT FALSE,
payload JSONB,
PRIMARY KEY (id, event_timestamp) -- a PK on a partitioned table must include the partition key
) PARTITION BY RANGE (event_timestamp);
Currently, a single worker polls and processes events, marking them as processed to avoid reprocessing. The query used is:
SELECT *
FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 10000;
To increase throughput, I need multiple workers. However, this risks duplicate processing as workers may select the same events simultaneously.
I'm seeking an efficient strategy to allow multiple workers to process events concurrently without duplicates. The solution should ensure each event is processed exactly once. How can I achieve this in PostgreSQL? Any guidance or examples would be greatly appreciated.
You can use explicit row locks. Add FOR UPDATE SKIP LOCKED at the end of this select, and that's it:
SELECT *
FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 10000
FOR UPDATE SKIP LOCKED; -- here
Once worker A reads its 10k rows, they stay locked FOR UPDATE until worker A COMMITs or ROLLBACKs its transaction. If another worker requests its own 10k before then, it sees that the first 10k are locked and skips them thanks to SKIP LOCKED.
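As a minimal sketch, one worker's batch could look like the following, assuming the batch is claimed and marked in a single transaction (the CTE name batch is just illustrative):

```sql
BEGIN;

-- Claim up to 10k unprocessed events; rows already locked by
-- other workers are skipped rather than waited on
WITH batch AS (
    SELECT id, event_timestamp
    FROM events
    WHERE processed = false
    ORDER BY event_timestamp
    LIMIT 10000
    FOR UPDATE SKIP LOCKED
)
UPDATE events e
SET processed = true
FROM batch b
WHERE e.id = b.id
  AND e.event_timestamp = b.event_timestamp  -- joining on the partition key helps pruning
RETURNING e.*;  -- the worker processes these rows

COMMIT;  -- releases the row locks and makes processed = true visible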
Here's a thread with demo1 showing how workers collide and all grab the same row, and demo2 showing how, after adding the locks, they skip the locked rows and each ends up going for a different one.
Make sure your workers use separate sessions/transactions - some connection pools can be configured to re-use the same session and transaction for different queries, which won't work with this type of locking.
You might also want to take a look at NOTIFY/pg_notify() and LISTEN. You can CREATE TRIGGER t1 AFTER INSERT ON events, and whenever something comes in, the trigger can immediately pg_notify() on a channel that a sleeping client is LISTENing on, waking it up to deploy a worker to process the newly added events.
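A sketch of that setup, assuming a channel name of new_events and a plpgsql helper function (both names are illustrative):

```sql
-- Fires once per inserted row and pushes a notification with the new id
CREATE FUNCTION notify_new_event() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('new_events', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER t1
AFTER INSERT ON events
FOR EACH ROW
EXECUTE FUNCTION notify_new_event();

-- The sleeping client subscribes with:
LISTEN new_events;
```

For bulk inserts you may prefer FOR EACH STATEMENT with a single notification per batch, since one pg_notify() per row can flood the channel; note also that row-level triggers on a partitioned table require PostgreSQL 11 or later.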