
How exactly does R2DBC Statement.fetchSize work?


I am using the r2dbc-postgresql driver. Let's say we have a table with 1000 records and the fetchSize is 100:

connectionMono.flatMapMany(
                connection -> connection
                        .createStatement("select age from users")
                        .fetchSize(100)
                        .execute())

How many network calls will be executed? I know that with JDBC's Statement.setFetchSize, the driver fetches all rows in 10 batches of 100 rows each.


Solution

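  • Looking at the code of the r2dbc-postgresql driver, the behaviour is the same: it fetches rows in chunks of the specified size, so 100 rows per round trip in your case. Each chunk is requested with an Execute(portal, fetchSize) message. When the server answers with PortalSuspended, the driver sends the next Execute; when it answers with CommandComplete, the driver closes the portal.

    Here is the method that handles this in ExtendedQueryMessageFlow: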

        /**
         * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
         *
         * @param bindFlow  the initial bind flow
         * @param client    client to use
         * @param portal    the portal
         * @param fetchSize fetch size per roundtrip
         * @return the resulting message stream
         */
        private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
    
            DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
            FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
            AtomicBoolean isCanceled = new AtomicBoolean(false);
    
            return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
                .handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
                    if (message instanceof CommandComplete) {
                        requestsSink.next(new Close(portal, PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                        sink.next(message);
                    } else if (message instanceof ErrorResponse) {
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                        sink.next(message);
                    } else if (message instanceof PortalSuspended) {
                        if (isCanceled.get()) {
                            requestsSink.next(new Close(portal, PORTAL));
                            requestsSink.next(Sync.INSTANCE);
                            requestsSink.complete();
                        } else {
                            requestsSink.next(new Execute(portal, fetchSize));
                            requestsSink.next(Flush.INSTANCE);
                        }
                    } else {
                        sink.next(message);
                    }
                })
                .as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
        }
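
To make the round-trip count concrete, here is a small stand-alone sketch that models the Execute / PortalSuspended loop from the method above. It is not driver code: FetchSizeModel, Portal, and chunks are hypothetical names made up for this illustration. It also assumes that the server replies PortalSuspended whenever an Execute returns exactly fetchSize rows, and CommandComplete otherwise. Under that assumption, a result of exactly 1000 rows with a fetch size of 100 takes ten Execute messages that return 100 rows each, plus a final Execute that returns no rows and completes the command.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the chunked fetch loop. Illustration only, not r2dbc-postgresql code.
class FetchSizeModel {

    /** The two backend replies that drive the loop in fetchCursored. */
    enum Reply { PORTAL_SUSPENDED, COMMAND_COMPLETE }

    /** Hypothetical stand-in for a server-side portal that still has rows to send. */
    static final class Portal {
        private int remainingRows;

        Portal(int totalRows) {
            this.remainingRows = totalRows;
        }

        // Assumed server behaviour: send up to fetchSize rows (0 means no limit),
        // suspend if the limit was reached, otherwise complete the command.
        Reply execute(int fetchSize, List<Integer> chunkSizes) {
            int sent = (fetchSize == 0) ? remainingRows : Math.min(fetchSize, remainingRows);
            remainingRows -= sent;
            chunkSizes.add(sent);
            return (fetchSize > 0 && sent == fetchSize)
                    ? Reply.PORTAL_SUSPENDED
                    : Reply.COMMAND_COMPLETE;
        }
    }

    /** Returns the number of rows delivered by each Execute message, in order. */
    static List<Integer> chunks(int totalRows, int fetchSize) {
        Portal portal = new Portal(totalRows);
        List<Integer> chunkSizes = new ArrayList<>();
        Reply reply;
        do {
            // Mirrors the driver: send another Execute after every PortalSuspended.
            reply = portal.execute(fetchSize, chunkSizes);
        } while (reply == Reply.PORTAL_SUSPENDED);
        return chunkSizes;
    }

    public static void main(String[] args) {
        List<Integer> result = chunks(1000, 100);
        System.out.println("Execute messages: " + result.size());
        System.out.println("Rows per Execute: " + result);
    }
}
```

In this model the number of Execute messages is what varies with fetchSize. The initial Parse/Bind messages and the final Close/Sync are sent once per query and are left out of the count.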