spring-data-redis-reactive

Spring Data Redis Reactive: StreamReceiver to coroutines Flow


I'm writing an app that works with Redis Stream using Spring Data Redis. I'm using spring-data-redis with Lettuce. I can successfully write to the stream as I can validate it directly in Redis via redis-cli, and I see the messages are in Redis. When it comes to reading from the stream using StreamReceiver, it kind of works but my tests fail for the coroutines version.

So, I've implemented two versions for reading with different return types:

TestDataRedisRepository.kt content:

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.addAndAwait
import org.springframework.data.redis.core.trimAndAwait
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux

/** Key of the Redis Stream that [TestDataRedisRepository] writes to, reads from and trims. */
const val STREAM_KEY = "test-data"

/**
 * Repository that appends [TestData] entries to the Redis Stream [STREAM_KEY] and reads them back.
 *
 * Writes go through [reactiveRedisTemplate]; reads go through [streamReceiver].
 * Every entry is stored as a string-to-string map record built with [TestData.toMap].
 */
@Repository
class TestDataRedisRepository(
    val reactiveRedisTemplate: ReactiveRedisTemplate<String, TestData>,
    val streamReceiver: StreamReceiver<String, MapRecord<String, String, String>>
) {
    private val log = LoggerFactory.getLogger(this::class.java)

    /**
     * Appends every element of [entityStream] to the stream, one after another.
     *
     * @return a cold flow of the ids Redis assigned to the added records; nothing is
     * written until the returned flow is collected.
     */
    @FlowPreview
    fun saveAll(entityStream: Flow<TestData>): Flow<RecordId> {
        return entityStream
            .map { toMapRecord(it) }
            .flatMapConcat { record ->
                log.info("Saving record: $record")
                reactiveRedisTemplate
                    .opsForStream<String, TestData>()
                    .add(record)
                    .asFlow()
            }
    }

    /**
     * Appends a single entry to the stream and suspends until Redis has answered.
     *
     * The parameter keeps its original (type-shadowing) name so that callers using
     * named arguments continue to compile.
     *
     * @return the id Redis assigned to the new record.
     */
    suspend fun save(TestData: TestData): RecordId {
        val record = toMapRecord(TestData)
        log.info("Saving record: $record")
        return reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .addAndAwait(record)
    }

    /** Wraps [entity] in a map record addressed to [STREAM_KEY]. */
    private fun toMapRecord(entity: TestData): MapRecord<String, String, String> =
        StreamRecords.newRecord()
            .`in`(STREAM_KEY)
            .ofMap(entity.toMap())

    /**
     * Reads the stream from its first record onwards and maps each record to [TestData].
     *
     * NOTE(review): the receiver keeps listening for new records, so the returned
     * [Flux] is not expected to complete on its own; subscribers have to cancel.
     */
    fun readAllAsFlux(): Flux<TestData> {
        return streamReceiver
            .receive(StreamOffset.fromStart(STREAM_KEY))
            // doOnNext hands the lambda the record itself; doOnEach would hand it a Signal
            // wrapper and would also fire for terminal signals, which are not records.
            .doOnNext { log.info("Received stream record: $it") }
            .map { it.value.fromMap() }
    }

    /**
     * Coroutine view of [readAllAsFlux].
     *
     * The same caveat applies: the flow is not expected to complete, so terminal
     * operators that wait for completion (for example `toList()`) will suspend
     * indefinitely. Bound the collection with `take(n)`, `first()` or cancellation.
     */
    fun readAllAsFlow(): Flow<TestData> = readAllAsFlux().asFlow()

    /** Trims the stream at [STREAM_KEY] down to a length of zero. */
    suspend fun deleteAll() {
        reactiveRedisTemplate
            .opsForStream<String, TestData>()
            .trimAndAwait(STREAM_KEY, 0)
    }
}

Test class TestDataRedisRepositoryTest.kt content:

import app.cash.turbine.test
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.test.StepVerifier
import java.time.Duration
import java.time.Instant
import kotlin.time.ExperimentalTime

/**
 * Integration test for [TestDataRedisRepository].
 *
 * Each test saves two records, reads them back through one of the two read APIs and
 * verifies content and order. The stream is emptied before the first test and after
 * every test.
 */
@FlowPreview
@ExperimentalTime
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
internal class TestDataRedisRepositoryTest @Autowired constructor(
    private val testDataRedisRepository: TestDataRedisRepository
) {
    private val now: Instant = Instant.now()

    // The two records every test writes, in the order they are expected to be read back.
    private val olderEntry = TestData(now.minusSeconds(1), "test2")
    private val newerEntry = TestData(now, "test3")

    @BeforeAll
    fun setUp() {
        runBlocking { testDataRedisRepository.deleteAll() }
    }

    @AfterEach
    fun afterEach() {
        runBlocking {
            testDataRedisRepository.deleteAll()
        }
    }

    /** Writes both fixture records; collecting with toList() is what triggers the writes. */
    private suspend fun saveFixtures() {
        testDataRedisRepository.saveAll(flowOf(olderEntry, newerEntry)).toList()
    }

    @Test
    fun `test Flux`() {
        runBlocking { saveFixtures() }

        testDataRedisRepository.readAllAsFlux().`as`(StepVerifier::create) //
            .consumeNextWith {
                assertThat(it).isEqualTo(olderEntry)
            }
            .consumeNextWith {
                assertThat(it).isEqualTo(newerEntry)
            }
            // The receiver never completes, so the subscription is cancelled explicitly.
            .thenCancel()
            .verify(Duration.ofSeconds(1))
    }

    @Test
    fun `test Flow`() {
        runBlocking {
            saveFixtures()

            // Do not collect with toList() here: the receiver flow never completes, so
            // that call would suspend forever. Bound it first, e.g. take(2).toList().
            testDataRedisRepository.readAllAsFlow()
                .test {
                    assertThat(expectItem())
                        .isEqualTo(olderEntry)
                    assertThat(expectItem())
                        .isEqualTo(newerEntry)

                    // expectComplete() would time out because no completion signal ever
                    // arrives; cancel instead, mirroring thenCancel() in the Flux test.
                    cancelAndIgnoreRemainingEvents()
                }
        }
    }
}

My RedisConfig.kt content:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.MapRecord
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.RedisSerializer
import org.springframework.data.redis.stream.StreamReceiver

/**
 * Redis wiring for the stream example: one template for the write side, one
 * [StreamReceiver] for the read side, both built on the same serialization context.
 */
@Configuration
class RedisConfig {

    /**
     * Template used to write to the Redis Stream.
     */
    @Bean
    fun reactiveRedisTemplate(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): ReactiveRedisTemplate<String, TestData> {
        return ReactiveRedisTemplate(factory, serializationContext)
    }

    /**
     * Receiver used to read from the Redis Stream.
     */
    @Bean
    fun streamReceiver(
        factory: ReactiveRedisConnectionFactory,
        serializationContext: RedisSerializationContext<String, TestData>
    ): StreamReceiver<String, MapRecord<String, String, String>> {
        val receiverOptions = StreamReceiver.StreamReceiverOptions.builder()
            .serializer(serializationContext)
            .build()
        return StreamReceiver.create(factory, receiverOptions)
    }

    /**
     * Serialization context shared by the template and the receiver; it is created
     * with the string serializer as its default serializer.
     */
    @Bean
    fun serializationContext(): RedisSerializationContext<String, TestData> {
        val contextBuilder = RedisSerializationContext.newSerializationContext<String, TestData>(
            RedisSerializer.string()
        )
        return contextBuilder.build()
    }
}

TestData.kt

import java.time.Instant

/**
 * Payload stored in the Redis Stream.
 *
 * @property instant timestamp carried by the entry
 * @property content free-form text carried by the entry
 */
data class TestData(
    val instant: Instant,
    val content: String
)

/** Map key under which [TestData.instant] is stored. */
const val INSTANT = "instant"

/** Map key under which [TestData.content] is stored. */
const val CONTENT = "content"

/**
 * Converts this entry into the string-to-string map used as the stream record body.
 * The instant is written with [Instant.toString], i.e. in ISO-8601 form.
 */
fun TestData.toMap(): Map<String, String> {
    return mapOf(
        INSTANT to instant.toString(),
        CONTENT to content
    )
}

/**
 * Rebuilds a [TestData] from a map produced by [toMap].
 *
 * @throws IllegalArgumentException if the [INSTANT] or [CONTENT] entry is missing
 * (previously this surfaced as a bare NullPointerException without any hint about
 * which field was absent)
 * @throws java.time.format.DateTimeParseException if the [INSTANT] entry is not a
 * valid ISO-8601 instant
 */
fun Map<String, String>.fromMap(): TestData {
    return TestData(
        Instant.parse(requireNotNull(this[INSTANT]) { "Missing '$INSTANT' entry in $this" }),
        requireNotNull(this[CONTENT]) { "Missing '$CONTENT' entry in $this" }
    )
}

Redis is running in a Docker container on the default port. For completeness, here are application.yaml and build.gradle.kts:

# Connection settings for the Redis instance used by the application and its tests.
spring:
  redis:
    # Redis runs locally (in a Docker container, per the description above).
    host: localhost
    # Default Redis port.
    port: 6379
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

// Build plugins: Spring Boot, Spring dependency management, and Kotlin (JVM + Spring support).
plugins {
    id("org.springframework.boot") version "2.5.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.5.20"
    kotlin("plugin.spring") version "1.5.20"
}

// Project coordinates and Java source level.
group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
}

// Most dependencies below carry no explicit version; presumably the versions are
// supplied by the dependency-management plugin declared above (confirm).
dependencies {
    // Reactive Spring Data Redis starter, used for the template and the StreamReceiver.
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    // Coroutine/Reactor bridge; the repository relies on it for asFlow() and the *AndAwait helpers.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    // Provides StepVerifier, used by the Flux test.
    testImplementation("io.projectreactor:reactor-test")
    // Provides the Flow `test { }` DSL, used by the Flow test.
    testImplementation("app.cash.turbine:turbine:0.5.2")
}

// Kotlin compiler settings: strict handling of JSR-305 nullability annotations, JVM 11 bytecode.
tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

// Run tests on the JUnit Platform (the tests use the org.junit.jupiter API).
tasks.withType<Test> {
    useJUnitPlatform()
}

Solution

  • If the test needs to check multiple values then calling toList() causes the flow to wait for the source to emit all its values and then returns those values as a list. Note that this works only for finite data streams.

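    So, in your case, if it's a stream that emits infinite values, the collector will wait forever for the remaining values, which is why your test blocks. The flow returned by StreamReceiver.receive keeps listening for new records and never completes, so toList() never returns and Turbine's expectComplete() times out.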

    A solution can be to take a finite number of items from flow and then do the assertion. For example, you can do something like the code below:

    // Each readAllAsFlow() call below starts a new subscription that reads the stream
    // from its beginning, so the three examples are independent of each other.

    // Take the first item; first() stops collecting as soon as one item has arrived.
    val firstItem = testDataRedisRepository.readAllAsFlow().first()
    
    // Take the second item: skip one item, then stop at the next one.
    val secondItem = testDataRedisRepository.readAllAsFlow().drop(1).first()
    
    // Take the first five items. This suspends until five records have arrived, so for
    // the test above, which saves only two records, take(2) would be the matching bound.
    val firstFiveItems = testDataRedisRepository.readAllAsFlow().take(5).toList()
    

    For more scenarios, you can refer to this Android Developers link for testing Kotlin flow.