kotlin, kotlin-coroutines, kotlin-multiplatform, kotlin-js

Transforming a Flow<Byte> to a Flow<String> in Kotlin


Suppose I have a cold source of UTF-8 bytes (e.g. a file read from disk, or the body of an HTTP response) in the form of a Flow<Byte>. How do I convert this source into a flow of strings, one per line?

In other words, I want the following behaviour:

        /*
         * A multi-line string, not terminated with a newline character.
         */
        val string = """
            first line
            第二行
            третья строка
        """.trimIndent()

        assertNotEquals('\n', string.last())
        assertEquals(2, string.asSequence().count { it == '\n' })

        // encodeToByteArray() (UTF-8) is available on every platform, including Kotlin/JS.
        val source: Flow<Byte> = string.encodeToByteArray().asSequence().asFlow()

        val transformed: Flow<String> = TODO()

        val target: List<String> = runBlocking {
            transformed.toList()
        }

        assertEquals(
            listOf("first line", "第二行", "третья строка"),
            target
        )

As an extra restriction, this is a Kotlin/JS project, so java.io APIs can't be used.


Solution

  • Eventually, I came up with the following solution:

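    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.flow.map

    fun Flow<Byte>.decodeToString(): Flow<String> =
        flow<ByteArray> {
            val buffer: MutableList<Byte> = arrayListOf()

            collect { value ->
                when (value) {
                    /*
                     * Ignore carriage returns, so CRLF line endings work too.
                     */
                    '\r'.code.toByte() -> Unit

                    /*
                     * End of line: emit a copy of the accumulated bytes, so
                     * that clearing the buffer can't affect downstream operators.
                     */
                    '\n'.code.toByte() -> {
                        emit(buffer.toByteArray())
                        buffer.clear()
                    }

                    else -> buffer.add(value)
                }
            }

            /*
             * Flush the last line if the input isn't newline-terminated.
             */
            if (buffer.isNotEmpty()) {
                emit(buffer.toByteArray())
            }
        }
            .map(ByteArray::decodeToString)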
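
To try the same splitting logic without kotlinx.coroutines (e.g. in a scratch file), it can be mirrored on a synchronous Sequence<Byte> using the stdlib sequence {} builder. This is only a minimal sketch under that assumption: decodeLines is a hypothetical name, not part of any library, and it deliberately keeps the behaviour of ignoring every '\r':

```kotlin
/*
 * Hypothetical helper: the same line-splitting logic as the Flow version,
 * applied to a synchronous Sequence<Byte> with only the Kotlin stdlib.
 */
fun Sequence<Byte>.decodeLines(): Sequence<String> = sequence {
    val buffer = ArrayList<Byte>()

    for (value in this@decodeLines) {
        when (value) {
            // Ignore carriage returns, as the Flow version does.
            '\r'.code.toByte() -> Unit

            // End of line: decode the accumulated bytes and start a new line.
            '\n'.code.toByte() -> {
                yield(buffer.toByteArray().decodeToString())
                buffer.clear()
            }

            else -> buffer.add(value)
        }
    }

    // Flush the last line if the input isn't newline-terminated.
    if (buffer.isNotEmpty()) {
        yield(buffer.toByteArray().decodeToString())
    }
}

fun main() {
    val text = "first line\n第二行\r\nтретья строка"
    val lines = text.encodeToByteArray().asSequence().decodeLines().toList()
    println(lines) // [first line, 第二行, третья строка]
}
```

Ignoring every '\r' (not only one directly before '\n') matches the Flow version; a stricter variant would strip a single trailing '\r' from each line instead.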
    

    The ArrayList<Byte> above can be replaced with either okio.Buffer from Okio or kotlinx.io.core.BytePacketBuilder from kotlinx-io. For example, with Okio:

    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import okio.Buffer

    fun Flow<Byte>.decodeToString(): Flow<String> =
        flow {
            val buffer = Buffer()

            collect { value ->
                when (value) {
                    /*
                     * Ignore carriage returns, so CRLF line endings work too.
                     */
                    '\r'.code.toByte() -> Unit

                    /*
                     * readUtf8() decodes and consumes the whole buffer,
                     * leaving it empty for the next line.
                     */
                    '\n'.code.toByte() -> emit(buffer.readUtf8())

                    else -> buffer.writeByte(value.toInt())
                }
            }

            /*
             * Flush the last line if the input isn't newline-terminated.
             */
            if (buffer.size > 0L) {
                emit(buffer.readUtf8())
            }
        }
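
Both variants split on the raw byte 0x0A before decoding each line. That is safe for UTF-8 because every byte of a multi-byte sequence has its high bit set (0x80 to 0xFF), so a 0x0A byte can only be an actual line feed, and a line boundary never cuts a character in half. A quick stdlib-only check of this property on the question's sample text (a sketch, not a proof):

```kotlin
fun main() {
    val sample = "first line\n第二行\nтретья строка"
    val bytes = sample.encodeToByteArray()

    // The only 0x0A bytes in the encoding are the two real newlines.
    val lineFeeds = bytes.count { it == '\n'.code.toByte() }
    println(lineFeeds) // 2

    // Every byte of a non-ASCII character has its high bit set,
    // i.e. it is negative when viewed as a signed Kotlin Byte.
    val nonAsciiBytes = "第二行третья строка"
        .filter { it.code >= 0x80 }
        .encodeToByteArray()
    println(nonAsciiBytes.all { it < 0 }) // true
}
```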