javareactive-programmingquarkusvert.xmutiny

Reactive endpoints in Quarkus returning swapped results


I'm pretty new to the whole reactive/mutiny/vert.x world and I have the following issue I can't quite figure out:

I've got two REST endpoints, they both retrieve some data (a count) for a certain device (or list of devices) the only difference between the two endpoints is the postgres (timescaleDB) table from which the count is fetched. Now, when I call the two endpoints concurrently, sometimes I get swapped results.

I've tried a few tweaks like using different types in the Uni<List<T>> returns or using blocking calls but nothing seems to work. I am out of ideas and most of all I do not understand how this can happen.

Here's the code from the REST to the Repository layer:

@Path("/api/prefix/foo")
public class DeviceCountResource {

    private static final Logger logger = Logger.getLogger(DeviceCountResource.class);

    @Inject
    DeviceEventService deviceEventService;

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("{agent_id}/device/status-change-count")
    public Uni<List<DeviceCountOutDto>> getDeviceStatusChangeCount(
            @PathParam("agent_id") Long agentId,
            @QueryParam("device_ids") @Separator(",") List<Long> deviceIds
    ) {
        return deviceEventService.getStatusChangeCount(agentId, deviceIds)
                .invoke(counters ->
                logger.debugf(
                        "Retrieved counters status change: %s",
                        counters
                ));
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("{agent_id}/device/heartbeat-count")
    public Uni<List<DeviceCountOutDto>> getHeartbeatCount(
            @PathParam("agent_id") Long agentId,
            @QueryParam("device_ids") @Separator(",") List<Long> deviceIds
    ) {
        return deviceEventService.getHeartbeatCount(agentId, deviceIds)
                .invoke(counters ->
                logger.debugf(
                        "Retrieved counters heartbeat: %s",
                        counters
                )
        );
    }
}
@ApplicationScoped
public class DeviceEventService {

    private static final Logger logger = Logger.getLogger(
        DeviceEventService.class
    );

    @Inject
    DeviceEventRepository deviceEventRepository;

    @Inject
    DeviceStatusCountRepository deviceStatusCountRepository;



    @WithSpan
    public Uni<List<DeviceCountOutDto>> getStatusChangeCount(
        Long agentId, List<Long> deviceIds
    ) {
        return deviceStatusCountRepository.getStatusChangeCount(agentId, deviceIds);

    }

    @WithSpan
    public Uni<List<DeviceCountOutDto>> getHeartbeatCount(
        Long agentId, List<Long> deviceIds
    ) {
        return deviceStatusCountRepository.getHeartbeatCount(agentId,
                deviceIds);

    }
}
@ApplicationScoped
public class DeviceEventService {

    private static final Logger logger = Logger.getLogger(
        DeviceEventService.class
    );

    @Inject
    DeviceEventRepository deviceEventRepository;

    @Inject
    DeviceStatusCountRepository deviceStatusCountRepository;

    

    @WithSpan
    public Uni<List<DeviceCountOutDto>> getStatusChangeCount(
        Long agentId, List<Long> deviceIds
    ) {
        return deviceStatusCountRepository.getStatusChangeCount(agentId, deviceIds);

    }

    @WithSpan
    public Uni<List<DeviceCountOutDto>> getHeartbeatCount(
        Long agentId, List<Long> deviceIds
    ) {
        return deviceStatusCountRepository.getHeartbeatCount(agentId,
                deviceIds);

    }
}
@ApplicationScoped
public class DeviceStatusCountRepository {

    private static final Logger logger = Logger.getLogger(
            DeviceStatusCountRepository.class
    );

    @Inject
    @ReactiveDataSource("timescale")
    private Pool client;

    @WithSpan
    public Uni<List<DeviceCountOutDto>> getStatusChangeCount(long agentId, List<Long> deviceIds) {
        StringBuilder queryBuilder = new StringBuilder("SELECT device_id, up_down_count FROM table_1_with_count WHERE agent_id = $1");
        List<Object> params = new ArrayList<>();
        params.add(agentId);

        if (deviceIds != null && !deviceIds.isEmpty()) {
            queryBuilder.append(" AND device_id IN (");
            for (int i = 0, j = 2; i < deviceIds.size(); i++, j++) {
                if (i > 0) queryBuilder.append(",");
                queryBuilder.append("$");
                queryBuilder.append(j);
            }
            queryBuilder.append(")");
            params.addAll(deviceIds);
        }

        String query = queryBuilder.toString();
        logger.debugf("Query: %s", query);


        return client.withConnection( connection -> connection
        .preparedQuery(query)
        .execute(Tuple.from(params))
        .map(rows -> {
            logger.debugf("Query field up_down_count");
            List<DeviceCountOutDto> events = new ArrayList<>();
            for (Row row : rows) {
                DeviceCountOutDto event = new DeviceCountOutDto(
                        row.getLong("device_id"),
                        row.getInteger("up_down_count")
                );
                events.add(event);
            }
            return events;
        })
        );
    }

    @WithSpan
    public Uni<List<DeviceCountOutDto>> getHeartbeatCount(long agentId,
                                                                   List<Long> deviceIds) {
        StringBuilder queryBuilder = new StringBuilder("SELECT device_id, heartbeat_count FROM table_2_with_count WHERE agent_id = $1");
        List<Object> params = new ArrayList<>();
        params.add(agentId);

        if (deviceIds != null && !deviceIds.isEmpty()) {
            queryBuilder.append(" AND device_id IN (");
            for (int i = 0, j = 2; i < deviceIds.size(); i++, j++) {
                if (i > 0) queryBuilder.append(",");
                queryBuilder.append("$");
                queryBuilder.append(j);
            }
            queryBuilder.append(")");
            params.addAll(deviceIds);
        }

        String query = queryBuilder.toString();
        logger.debugf("Query: %s", query);

        return client.withConnection( connection -> connection
            .preparedQuery(query)
            .execute(Tuple.from(params))
            .map(rows -> {
                logger.debugf("Query field heartbeat_count");
                List<DeviceCountOutDto> events = new ArrayList<>();
                for (Row row : rows) {
                    DeviceCountOutDto event = new DeviceCountOutDto(
                            row.getLong("device_id"),
                            row.getInteger("heartbeat_count")
                    );
                    events.add(event);
                }
                return events;
            })
        );
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.company.name</groupId>
    <artifactId>name</artifactId>
    <version>1.1.0-SNAPSHOT</version>

    <properties>
        <compiler-plugin.version>3.13.0</compiler-plugin.version>
        <maven.compiler.release>21</maven.compiler.release>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
        <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
        <quarkus.platform.version>3.21.3</quarkus.platform.version>
        <skipITs>true</skipITs>
        <surefire-plugin.version>3.5.2</surefire-plugin.version>
        <org.mapstruct.version>1.6.3</org.mapstruct.version>
    </properties>

    <scm>
        <developerConnection>scm:git:https://bitbucket.org/company/name.git</developerConnection>
      <tag>HEAD</tag>
  </scm>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>${quarkus.platform.group-id}</groupId>
                <artifactId>${quarkus.platform.artifact-id}</artifactId>
                <version>${quarkus.platform.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>${quarkus.platform.group-id}</groupId>
                <artifactId>quarkus-cassandra-bom</artifactId>
                <version>${quarkus.platform.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-rest</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-arc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-hibernate-validator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-rest-jackson</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-health</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-openapi</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-jacoco</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-messaging-rabbitmq</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-jdbc-postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-container-image-docker</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-opentelemetry</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.datastax.oss.quarkus</groupId>
            <artifactId>cassandra-quarkus-client</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkiverse.loggingsentry</groupId>
            <artifactId>quarkus-logging-sentry</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>io.rest-assured</groupId>
            <artifactId>rest-assured</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mapstruct</groupId>
            <artifactId>mapstruct</artifactId>
            <version>${org.mapstruct.version}</version>
        </dependency>
        <dependency>
            <groupId>de.siegmar</groupId>
            <artifactId>fastcsv</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5-mockito</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>io.quarkus.platform</groupId>
                <artifactId>quarkus-maven-plugin</artifactId>
                <version>${quarkus.platform.version}</version>
                <extensions>true</extensions>
                <executions>
                    <execution>
                        <goals>
                            <goal>build</goal>
                            <goal>generate-code</goal>
                            <goal>generate-code-tests</goal>
                            <goal>native-image-agent</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${compiler-plugin.version}</version>
                <configuration>
                    <parameters>true</parameters>
                    <release>${maven.compiler.release}</release>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.mapstruct</groupId>
                            <artifactId>mapstruct-processor</artifactId>
                            <version>${org.mapstruct.version}</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${surefire-plugin.version}</version>
                <configuration>
                    <configuration>
                        <excludeTags>containers</excludeTags>
                    </configuration>
                    <systemPropertyVariables>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                        <maven.home>${maven.home}</maven.home>
                    </systemPropertyVariables>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>${surefire-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>integration-test</goal>
                            <goal>verify</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <systemPropertyVariables>
                        <native.image.path>
                            ${project.build.directory}/${project.build.finalName}-runner</native.image.path>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                        <maven.home>${maven.home}</maven.home>
                    </systemPropertyVariables>
                    <configuration>
                        <excludeTags>containers</excludeTags>
                    </configuration>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-pmd-plugin</artifactId>
                <version>3.22.0</version>
                <configuration>
                    <maxAllowedViolations>150</maxAllowedViolations>
                    <rulesets>
                        <ruleset>/rulesets/java/quickstart.xml</ruleset>
                    </rulesets>
                </configuration>
                <executions>
                    <execution>
                        <phase>verify</phase>
                        <goals>
                            <goal>check</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <version>3.1.1</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>versions-maven-plugin</artifactId>
                <version>2.18.0</version>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.6.0</version>
                <executions>
                    <execution>
                        <id>parse-version</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>parse-version</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.cyclonedx</groupId>
                <artifactId>cyclonedx-maven-plugin</artifactId>
                <version>2.9.1</version>  <!-- Use the latest version -->
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>makeAggregateBom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>native</id>
            <activation>
                <property>
                    <name>native</name>
                </property>
            </activation>
            <properties>
                <skipITs>false</skipITs>
                <quarkus.native.enabled>true</quarkus.native.enabled>
            </properties>
        </profile>
    </profiles>
</project>

Here's the dependency list:

dependencies


Solution

  • We have finally understood the issue! A piece of the puzzle was missing, as in our testing environment there is a connection pooler (PgBouncer) in front of the database. Since PgBouncer was configured with “Transaction Pooling” some PostgreSQL features (like PREPARE) were broken (see https://www.pgbouncer.org/features.html).

    My understanding is that the connection pooler was in some cases swapping the connections on which the queries (SELECT) were being performed after the queries were prepared.

    I can see two working solutions:

    Solution 1

    Just swap PgBouncer to "Session Pooling". In this way the above code just works, as the connection pool is handled by the library in the same way as it would be if there was no PgBouncer in between and it was handling a pool of connections to postrgres

    Solution 2

    Do everything in a single transaction, that is use:

    return client.withTransaction(trans -> trans.preparedQuery(query)
                .execute(Tuple.from(params))
                .map(rows -> {
                    logger.debugf("Query field heartbeat_count");
                    List<DeviceCountOutDto> events = new ArrayList<>();
                    for (Row row : rows) {
                        DeviceCountOutDto event = new DeviceCountOutDto(
                                row.getLong("device_id"),
                                row.getInteger("heartbeat_count")
                        );
                        events.add(event);
                    }
                    return events;
                })
            );