I am looking for the equivalent of jdbcTemplate.queryForStream
using R2DBC. I'm new to R2DBC, but this sounds like an ideal use case for it.
So far all I can find are examples of how to use spring-data-r2dbc for a JPA-like experience. This is really cool, but in my case this is overly complicated. I don't need any entity-mapping features or a persistence framework. I only need to perform simple, non-blocking queries. However, I can't find a single example of that in their docs or anything that even looks like it could be used for that purpose.
So, how can I perform basic queries with Spring + R2DBC? Bonus points if it integrates well with Kotlin coroutines like the CoroutineCrudRepository
does, though this isn't necessary.
I think I found the way to do it, but I have not tested this code extensively yet. Implemented as an extension function on r2dbc.core.DatabaseClient
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.springframework.r2dbc.core.DatabaseClient
/**
* Trying to mimic the behavior of NamedParameterJdbcTemplate.queryForStream
*/
fun <T : Any> DatabaseClient.queryForFlow(
sqlTemplate: String,
params: Map<String, Any>,
rowMapper: (row: Row, metadata: RowMetadata) -> T,
): Flow<T> {
val executionSpec = params.entries.fold(sql(sqlTemplate)) { executeSpec, (name, value) ->
executeSpec.bind(name, value)
}
return executionSpec.map(rowMapper).all().asFlow()
}
Note the type variable is T : Any
not just T
, otherwise you will not be able to use asFlow()
. You can omit this if you want to stay in the reactor framework and not use coroutines.
Trivial usage example:
class Example(private val dbClient: DatabaseClient) {
fun getAllNames(id: Long): Flow<String> {
val sqlTemplate = "SELECT name FROM foobar WHERE id=:id"
val params = mapOf("id" to id)
return dbClient.queryForFlow(sqlTemplate, params) {row, metadata ->
row.get(0, String::class.java)
}
}
}