spring-bootkotlin-coroutinesspring-data-r2dbcr2dbc

How to perform simple streaming queries using Spring R2DBC?


I am looking for the equivalent of jdbcTemplate.queryForStream using R2DBC. I'm new to R2DBC, but this sounds like an ideal use case for it.

So far all I can find are examples of how to use spring-data-r2dbc for a JPA-like experience. This is really cool, but in my case this is overly complicated. I don't need any entity-mapping features or a persistence framework. I only need to perform simple, non-blocking queries. However, I can't find a single example of that in their docs or anything that even looks like it could be used for that purpose.

So, how can I perform basic queries with Spring + R2DBC? Bonus points if it integrates well with Kotlin coroutines like the CoroutineCrudRepository does, though this isn't necessary.


Solution

  • I think I found the way to do it, but I have not tested this code extensively yet. Implemented as an extension function on r2dbc.core.DatabaseClient

    import io.r2dbc.spi.Row
    import io.r2dbc.spi.RowMetadata
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.reactive.asFlow
    import org.springframework.r2dbc.core.DatabaseClient
    
    /**
     * Trying to mimic the behavior of NamedParameterJdbcTemplate.queryForStream
     */
    fun <T : Any> DatabaseClient.queryForFlow(
        sqlTemplate: String,
        params: Map<String, Any>,
        rowMapper: (row: Row, metadata: RowMetadata) -> T,
    ): Flow<T> {
    
        val executionSpec = params.entries.fold(sql(sqlTemplate)) { executeSpec, (name, value) ->
            executeSpec.bind(name, value)
        }
    
        return executionSpec.map(rowMapper).all().asFlow()
    }
    

    Note the type variable is T : Any not just T, otherwise you will not be able to use asFlow(). You can omit this if you want to stay in the reactor framework and not use coroutines.

    Trivial usage example:

    class Example(private val dbClient: DatabaseClient) {
        
        fun getAllNames(id: Long): Flow<String> {
            
            val sqlTemplate = "SELECT name FROM foobar WHERE id=:id"
            val params = mapOf("id" to id)
            
            return dbClient.queryForFlow(sqlTemplate, params) {row, metadata -> 
                row.get(0, String::class.java)
            }
        }
    }