I'm writing an app that works with Redis Streams using Spring Data Redis (spring-data-redis) with Lettuce. Writing to the stream works: I can verify with redis-cli that the messages are in Redis. Reading from the stream with StreamReceiver mostly works, but my tests fail for the coroutines version.
I've implemented two versions of the read, with different return types:
Flux<TestData>. I test it using reactor-test classes, similar to what the Spring Data Redis team does. It works fine: the received items are printed out and the test passes.
Flow<TestData>. I test it using FlowTurbine. This test fails, even though the received items are printed out; FlowTurbine just times out. I also tried calling Flow.toList() directly instead of Turbine's test, but in that case the call just blocks forever.
I'm probably doing something wrong when I deal with the Flow. What am I doing wrong, and how do I fix it?
TestDataRedisRepository.kt content:
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.addAndAwait
import org.springframework.data.redis.core.trimAndAwait
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux

const val STREAM_KEY = "test-data"

@Repository
class TestDataRedisRepository(
    val reactiveRedisTemplate: ReactiveRedisTemplate<String, TestData>,
    val streamReceiver: StreamReceiver<String, MapRecord<String, String, String>>
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    @FlowPreview
    fun saveAll(entityStream: Flow<TestData>): Flow<RecordId> {
        return entityStream
            .map { toMapRecord(it) }
            .flatMapConcat {
                log.info("Saving record: $it")
                reactiveRedisTemplate
                    .opsForStream<String, TestData>()
                    .add(it)
                    .asFlow()
            }
    }

    suspend fun save(testData: TestData): RecordId {
        val record = toMapRecord(testData)
        log.info("Saving record: $record")
        return reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .addAndAwait(record)
    }

    private fun toMapRecord(testData: TestData): MapRecord<String, String, String> =
        StreamRecords.newRecord()
            .`in`(STREAM_KEY)
            .ofMap(testData.toMap())

    fun readAllAsFlux(): Flux<TestData> {
        return streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            .doOnEach { log.info("Received stream record: $it") }
            .map { it.value.fromMap() }
    }

    fun readAllAsFlow(): Flow<TestData> {
        return streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            .doOnEach { log.info("Received stream record: $it") }
            .map { it.value.fromMap() }
            .asFlow()
    }

    suspend fun deleteAll() {
        reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .trimAndAwait(STREAM_KEY, 0)
    }
}

Test class TestDataRedisRepositoryTest.kt content:
import app.cash.turbine.test
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.test.StepVerifier
import java.time.Duration
import java.time.Instant
import kotlin.time.ExperimentalTime

@FlowPreview
@ExperimentalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
internal class TestDataRedisRepositoryTest @Autowired constructor(
    private val testDataRedisRepository: TestDataRedisRepository
) {
    private val now: Instant = Instant.now()

    @BeforeAll
    fun setUp() {
        runBlocking { testDataRedisRepository.deleteAll() }
    }

    @AfterEach
    fun afterEach() {
        runBlocking {
            testDataRedisRepository.deleteAll()
        }
    }

    @Test //passes
    fun `test Flux`() {
        runBlocking {
            testDataRedisRepository.saveAll(
                flowOf(
                    TestData(now.minusSeconds(1), "test2"),
                    TestData(now, "test3")
                )
            ).toList()
        }
        testDataRedisRepository.readAllAsFlux().`as`(StepVerifier::create) //
            .consumeNextWith {
                assertThat(it).isEqualTo(TestData(now.minusSeconds(1), "test2"))
            }
            .consumeNextWith {
                assertThat(it).isEqualTo(TestData(now, "test3"))
            }
            .thenCancel()
            .verify(Duration.ofSeconds(1))
    }

    @Test //fails
    fun `test Flow`() {
        runBlocking {
            testDataRedisRepository.saveAll(
                flowOf(
                    TestData(now.minusSeconds(1), "test2"),
                    TestData(now, "test3")
                )
            ).toList()
            // val list = testDataRedisRepository.readAllAsFlow().toList() // this call blocks forever
            // FlowTurbine just times out
            testDataRedisRepository.readAllAsFlow()
                .test {
                    assertThat(expectItem())
                        .isEqualTo(TestData(now.minusSeconds(1), "test2"))
                    assertThat(expectItem())
                        .isEqualTo(TestData(now, "test3"))
                    expectComplete()
                }
        }
    }
}

My RedisConfig.kt content:
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.RedisSerializer
import org.springframework.data.redis.stream.StreamReceiver

@Configuration
class RedisConfig {
    /**
     * For writing to Redis Stream
     */
    @Bean
    fun reactiveRedisTemplate(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): ReactiveRedisTemplate<String, TestData> = ReactiveRedisTemplate(
        factory,
        serializationContext
    )

    /**
     * For reading from Redis Stream
     */
    @Bean
    fun streamReceiver(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): StreamReceiver<String, MapRecord<String, String, String>> {
        return StreamReceiver.create(
            factory,
            StreamReceiver.StreamReceiverOptions.builder()
                .serializer(serializationContext)
                .build()
        )
    }

    @Bean
    fun serializationContext(): RedisSerializationContext<String, TestData> =
        RedisSerializationContext.newSerializationContext<String, TestData>(
            RedisSerializer.string()
        ).build()
}

TestData.kt
import java.time.Instant

data class TestData(
    val instant: Instant,
    val content: String
)

const val INSTANT = "instant"
const val CONTENT = "content"

fun TestData.toMap(): Map<String, String> {
    return mapOf(
        INSTANT to instant.toString(),
        CONTENT to content
    )
}

fun Map<String, String>.fromMap(): TestData {
    return TestData(
        Instant.parse(this[INSTANT]),
        this[CONTENT]!!
    )
}

Redis is running in a Docker container on the default port.
For completeness, here are application.yaml and build.gradle.kts:

spring:
  redis:
    host: localhost
    port: 6379

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.5.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.5.20"
    kotlin("plugin.spring") version "1.5.20"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("app.cash.turbine:turbine:0.5.2")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

If the test needs to check multiple values, calling toList() makes the flow wait until the source has emitted all of its values and then returns them as a list. Note that this works only for finite data streams.
In your case the stream never completes, because StreamReceiver keeps listening for new records. toList() therefore waits forever to collect the values, which is why your test blocks. Turbine's expectComplete() times out for the same reason: the completion event it waits for never arrives.
A solution is to take a finite number of items from the flow and then do the assertions. For example:
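import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// These are suspending calls, so run them inside runBlocking or another coroutine.
// Take the first item and cancel the flow
val firstItem = testDataRedisRepository.readAllAsFlow().first()
// Skip the first item, take the second, and cancel the flow
val secondItem = testDataRedisRepository.readAllAsFlow().drop(1).first()
// Take the first five items (this suspends until five items have arrived)
val firstFiveItems = testDataRedisRepository.readAllAsFlow().take(5).toList()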
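The difference between a bounded and an unbounded collection can be sketched in isolation, without Redis. This is only an illustration, assuming kotlinx-coroutines-core is on the classpath; neverCompletingFlow is a hypothetical stand-in for readAllAsFlow() (it is not part of the question's code) that emits two items and then stays open, the way a stream subscription does.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for readAllAsFlow(): emits two items and never completes.
fun neverCompletingFlow(): Flow<String> = flow {
    emit("test2")
    emit("test3")
    awaitCancellation() // suspend until the collector cancels
}

fun main() = runBlocking {
    // take(2) cancels the upstream after the second item, so toList() returns.
    val bounded = neverCompletingFlow().take(2).toList()
    println(bounded) // prints [test2, test3]

    // Without take, toList() would never return; the timeout makes that visible.
    val unbounded = withTimeoutOrNull(500L) { neverCompletingFlow().toList() }
    println(unbounded) // prints null
}
```

In the test from the question, the same idea would mean collecting with take(2).toList() and asserting on the resulting list, instead of waiting for a completion that never comes.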
For more scenarios, you can refer to the Android Developers guide on testing Kotlin flows.