
Postgres LISTEN/NOTIFY in Quarkus - transaction for each notification


I'm trying to learn Quarkus by building a simple message queue on top of Postgres. I'm sticking to JDBC to keep things as performant as possible, and I've set up a trivial messages table along with triggers that NOTIFY when a new record is added.

I want to use PgSubscriber to LISTEN to these notifications, and provide an API that would allow you to react to each such notification - either by passing a handler, or by exposing a Multi.

Here's the crucial thing: I want the semantics of the API to be that each notification is wrapped in its own transaction, including the client handler/downstream operations. I've tried @Transactional, QuarkusTransaction.<whatever>, client.withTransaction and who knows what else, and I just can't get it to work.

The worst part is, I'm not even sure which parts I might be doing wrong:

I don't particularly need an answer to all/any of the above, I think I can figure it out once I see some working code and iterate from there, but I just can't seem to get there.

Would anybody be willing to provide a minimal example of how to get from a val channel: PgChannel = pgSubscriber.channel(topic) to an API that has the transactional semantics described above, and also provide a simple demonstration of how I would do "Insert a record in the handler/downstream, then throw an exception"?

Thanks!


Solution

  • Looks like the night is darkest before dawn - I finally managed to crack a working version, which then allowed me to finally make sense of all the stuff that was confusing me.

    The heart of my confusion was this: transactions in JDBC (and anything that builds on it) and transactions in reactive clients are completely incompatible and cannot be interchanged. This is because, fundamentally, they go through entirely different database connections, managed by entirely different pools and clients:

    As a consequence:

    A big reason for this confusion was the docs section on transactions and reactive extensions, which made it seem like these two worlds are interoperable. However, that section only applies to reactive pipelines using JDBC connections, and NOT to reactive pipelines using reactive clients. For pipelines using JDBC connections, and only those, the JDBC transaction is propagated via context propagation, so its lifecycle matches the lifecycle of the reactive pipeline rather than that of the function that returns it.

    Another source of confusion: for the reactive client specifically, if you want to perform multiple operations within the reactive transaction, you need to manually pass around the connection - unlike with JDBC (and everything that builds on it, such as JPA, Hibernate, etc.) there's no behind-the-scenes magic that extracts the connection from some place. I think this could be done in theory, but it's not done in practice, and this key difference is not really emphasized in the docs.
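
To make the point about passing the connection around concrete, here is a minimal sketch. It assumes the Mutiny flavour of the Vert.x SQL client (`io.vertx.mutiny.sqlclient.Pool`) and a `messages` table with `topic` and `payload` columns; `insert` and `insertBoth` are hypothetical names used only for illustration.

```kotlin
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.sqlclient.Pool
import io.vertx.mutiny.sqlclient.SqlConnection
import io.vertx.mutiny.sqlclient.Tuple

// Hypothetical helper: runs on whatever connection it is handed,
// so the caller decides which transaction (if any) it belongs to.
fun insert(connection: SqlConnection, topic: String, json: String): Uni<Int> =
    connection
        .preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
        .execute(Tuple.of(topic, json))
        .map { rowSet -> rowSet.rowCount() }

// Both inserts share one transaction because both receive the same connection.
// If either Uni fails, withTransaction rolls back; otherwise it commits.
fun insertBoth(client: Pool, topic: String): Uni<Int> =
    client.withTransaction { connection ->
        insert(connection, topic, """{"n": 1}""")
            .flatMap { first ->
                insert(connection, topic, """{"n": 2}""").map { second -> first + second }
            }
    }
```

Calling `client.preparedQuery(...)` inside the lambda instead of `connection.preparedQuery(...)` would borrow a different pooled connection, so that statement would run outside the transaction.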

    Given that, the answers to my questions are:

    A quick, dirty, begging-to-be-cleaned-up version that works with the reactive approach is:

    // Implementation
    
        override fun subscribe(topic: String, handler: SqlConnection.(Message) -> Uni<Unit>, termination: () -> Unit) {
            val channel: PgChannel = pgSubscriber.channel(topic)
    
            channel.handler { id ->
                client.withTransaction { connection ->
                    connection.preparedQuery("SELECT id, topic, payload, created_at FROM $TABLE_NAME WHERE id = $1")
                        .execute(Tuple.of(UUID.fromString(id)))
                        .map { rowSet ->
                            val row = rowSet.first()
                            Message(
                                id = row.getUUID("id"),
                                topic = row.getString("topic"),
                                payload = objectMapper.readTree(row.getString("payload")),
                                createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
                            ).also { logger.info("Fetched NOTIFY message: id=${it.id}, topic=${it.topic}") }
                        }
                        .emitOn(Infrastructure.getDefaultExecutor())
                        .flatMap { connection.handler(it) }
                }
                    .onTermination().invoke(termination) // fires after commit or rollback, not before
                    .subscribe().with(
                    {},
                    { e -> logger.error("Failed processing message $id", e) }
                )
            }
        }
    
        override fun publish(topic: String, payload: JsonNode): Uni<Message> {
            val jsonPayload = payload.toString()
            return client
                .preparedQuery("INSERT INTO $TABLE_NAME (topic, payload) VALUES ($1, $2::jsonb) RETURNING id, created_at")
                .execute(Tuple.of(topic, jsonPayload))
                .onItem().transform { rowSet ->
                    val row = checkNotNull(rowSet.firstOrNull()) { "Unable to publish message for topic $topic" }
                    Message(
                        id = row.getUUID("id"),
                        topic = topic,
                        payload = payload,
                        createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
                    ).also {
                        logger.info("Published message: id='${it.id}', topic='${it.topic}'")
                    }
                }
        }
    
    // Test
        @Test
        fun `subscribe should isolate transactions between messages`() {
            val latch = CountDownLatch(2)
            val failedMessageIndex = AtomicInteger(-1)
            val successMessageIndex = AtomicInteger(-1)
    
            val otherTopic = "otherTopic"
            val otherPayload = objectMapper.createObjectNode().put("otherIndex", 1)
    
            // Subscribe with a handler that will fail for one message but succeed for another
            messageQueue.subscribe(testTopic, { message ->
                val jsonPayload = otherPayload.toString()
                preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
                    .execute(Tuple.of(otherTopic, jsonPayload))
                    .map {
                        val index = message.payload.get("index").asInt()
    
                        if (index == 1) {
                            successMessageIndex.set(index)
                        } else if (index == 2) {
                            failedMessageIndex.set(index)
                            // Throwing an exception to simulate a failure
                            throw IllegalStateException("Simulated failure for message 2")
                        }
                    }
            }, { latch.countDown() })
    
            // Publish two messages
            val payload1 = objectMapper.createObjectNode().put("index", 1)
            messageQueue.publish(testTopic, payload1).await().indefinitely()
    
            val payload2 = objectMapper.createObjectNode().put("index", 2)
            messageQueue.publish(testTopic, payload2).await().indefinitely()
    
            // Wait for both messages to be processed
            assertTrue(latch.await(1, TimeUnit.SECONDS))
            
            // Verify the first message was processed successfully
            assertEquals(1, successMessageIndex.get())
            // Verify the second message attempted processing but failed
            assertEquals(2, failedMessageIndex.get())
    
            // Verify only one message was published to otherTopic
            val otherTopicMessageCount = dataSource.connection.use { connection ->
                val statement = connection.prepareStatement(
                    "SELECT count(*) FROM messages WHERE topic = ?"
                )
                statement.setString(1, otherTopic)
                val resultSet = statement.executeQuery()
                resultSet.next()
                resultSet.getInt(1)
            }
            
            assertEquals(1, otherTopicMessageCount, "Only one message should have been published to otherTopic")
        }
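
The question also asked about exposing a Multi. One possible shape, offered as an untested sketch rather than a worked-out design, is to keep the handler as a parameter and emit one result per notification. It assumes the Mutiny wrappers for the Vert.x PG client (`PgChannel`, `Pool`); `transactionalNotifications` is a hypothetical name, and clearing the channel handler on cancellation is left out.

```kotlin
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.pgclient.pubsub.PgChannel
import io.vertx.mutiny.sqlclient.Pool
import io.vertx.mutiny.sqlclient.SqlConnection

// Hypothetical helper: one transaction per notification, exposed as a Multi.
// Emits true when that notification's transaction committed, false when it rolled back.
fun <T> transactionalNotifications(
    client: Pool,
    channel: PgChannel,
    handler: (SqlConnection, String) -> Uni<T>
): Multi<Boolean> =
    Multi.createFrom()
        // Bridge the callback-style channel into a stream of notification payloads.
        .emitter<String> { emitter -> channel.handler { id -> emitter.emit(id) } }
        // Process notifications one at a time, each in its own transaction.
        .onItem().transformToUniAndConcatenate { id ->
            client.withTransaction { connection -> handler(connection, id) }
                .map { true }
                // A failed handler rolls back its own transaction without ending the stream.
                .onFailure().recoverWithItem(false)
        }
```

Only the work done inside `handler` is part of the transaction. Operators applied downstream of the returned Multi run after the commit or rollback, so anything that must be atomic with the notification has to go through the connection passed to the handler.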
    

    Hope this helps any wanderers that stumble upon this.