Can you help me understand? There is a Spring Boot (3.0.2) service with Spring Data R2DBC repository and r2dbc-postgresql driver. By default, a pool of 10 connections and 10 reactor-tcp-nio threads is created. I have enabled logging of R2DBC queries in the database and logging in the operators after the reactive repository. 1000 HTTP requests per second have been generated using JMeter. The result is that I see that all queries in the database and all operators after the reactive repository are executed in the reactor-tcp-nio-1 thread. If a breakpoint is set in the doOnNext operator (not on the line, but on the lambda) in the IDE, the reactor-tcp-nio-1 thread will suspend, and all other threads will still not be utilized.
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class R2DBCApplication {
private static final Logger log = LoggerFactory.getLogger(R2DBCApplication.class);
public static void main(final String[] args) {
SpringApplication.run(R2DBCApplication.class, args);
}
}
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.UUID;
@RestController
public class ValueController {
private static final Logger log = LoggerFactory.getLogger(ValueController.class);
@Autowired
private ValueRepository repository;
@GetMapping("/value")
public Mono<String> get() {
final var uuid = UUID.randomUUID().toString();
log.info("start for uuid = {}", uuid);
return repository
.findById(1L)
.map(ValueEntity::getId)
.map(String::valueOf)
.switchIfEmpty(Mono.fromSupplier(() -> uuid))
.doOnNext(v -> log.info("after switchIfEmpty uuid = {}", v));
}
}
package com.example;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
@Table("value")
public class ValueEntity {
@Id
@Column("id")
private Long id;
@Column("value")
private String value;
public Long getId() {
return id;
}
public void setId(final Long id) {
this.id = id;
}
public String getValue() {
return value;
}
public void setValue(final String value) {
this.value = value;
}
}
package com.example;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface ValueRepository extends R2dbcRepository<ValueEntity, Long> {
}
server.port: 9080
spring:
application.name: r2dbc-service
r2dbc:
url: r2dbc:postgresql://localhost:5433/postgres
username: postgres
password: 12345
properties:
schema: public
logging.level.root: DEBUG
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Is this the expected behavior? I assumed that all 10 reactor-tcp-nio threads would be utilized. What can be done to make queries and operator execution happen in different threads? (reactor-tcp-nio-1, ..., reactor-tcp-nio-10).
Do you really need to add publishOn or subscribeOn after every call to a reactive repository?
This behavior is likely already fixed. See https://github.com/r2dbc/r2dbc-pool/issues/190