Suppose I have a cold source of UTF-8 bytes (e.g. the contents of a file on disk, or the body of an HTTP response) in the form of a Flow<Byte>. How do I convert this source to a flow of strings, one per line?
In other words, I want the following behaviour:
/*
 * A multi-line string, not terminated with a newline character.
 */
val string = """
    first line
    第二行
    третья строка
""".trimIndent()
assertNotEquals('\n', string.last())
assertEquals(2, string.asSequence().count { it == '\n' })
// encodeToByteArray() is available on every Kotlin target; toByteArray() is JVM-only.
val source: Flow<Byte> = string.encodeToByteArray().asSequence().asFlow()
val transformed: Flow<String> = TODO()
val target = runBlocking {
    transformed.toList(mutableListOf()).toTypedArray()
}
assertArrayEquals(
    arrayOf("first line", "第二行", "третья строка"),
    target
)
As an extra restriction, this is a Kotlin/JS project, so java.io APIs can't be used.
Eventually, I came up with the following solution:
fun Flow<Byte>.decodeToString(): Flow<String> =
    flow<ByteArray> {
        val buffer: MutableList<Byte> = arrayListOf()
        collect { value ->
            when (value) {
                /*
                 * Ignore.
                 */
                '\r'.code.toByte() -> Unit
                '\n'.code.toByte() -> {
                    // Emit a copy: the buffer is cleared and reused right after.
                    emit(buffer.toByteArray())
                    buffer.clear()
                }
                else -> buffer.add(value)
            }
        }
        // Flush the last line if the input does not end with a newline.
        if (buffer.isNotEmpty()) {
            emit(buffer.toByteArray())
        }
    }
        .map(ByteArray::decodeToString)
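Splitting on the raw byte 0x0A before decoding is safe for UTF-8: every byte of a multi-byte sequence has its high bit set, so it can never be mistaken for an ASCII newline. Below is a minimal, dependency-free sketch of the same splitting rules applied to a plain ByteArray. The names splitUtf8Lines and hasOnlyHighBitBytes are hypothetical helpers introduced for illustration only; they are not part of the solution above.

```kotlin
const val LF: Byte = 0x0A
const val CR: Byte = 0x0D

// Hypothetical helper: true if every UTF-8 byte of the given text has the
// high bit set (it shows up as a negative value in Kotlin's signed Byte).
// Intended to be called with text that contains only non-ASCII characters.
fun hasOnlyHighBitBytes(nonAsciiText: String): Boolean =
    nonAsciiText.encodeToByteArray().all { it < 0 }

// Hypothetical helper: same rules as the flow-based version
// (drop every CR, split on LF, flush the unterminated last line).
fun splitUtf8Lines(bytes: ByteArray): List<String> {
    val lines = mutableListOf<String>()
    val current = mutableListOf<Byte>()
    for (b in bytes) {
        when (b) {
            CR -> Unit
            LF -> {
                lines += current.toByteArray().decodeToString()
                current.clear()
            }
            else -> current += b
        }
    }
    if (current.isNotEmpty()) {
        lines += current.toByteArray().decodeToString()
    }
    return lines
}
```

With the sample string from the question, splitUtf8Lines returns the three expected lines, and hasOnlyHighBitBytes("第二行") is true. Note that these rules drop every carriage return, including a lone one in the middle of a line, so the bytes of "a\rb" decode to "ab".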
The ArrayList<Byte> above can be replaced with either okio.Buffer from okio or kotlinx.io.core.BytePacketBuilder from kotlinx-io, e.g.:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import okio.Buffer

fun Flow<Byte>.decodeToString(): Flow<String> =
    flow {
        val buffer = Buffer()
        collect { value ->
            when (value) {
                /*
                 * Ignore.
                 */
                '\r'.code.toByte() -> Unit
                // readUtf8() consumes the whole buffer, so no explicit clear() is needed.
                '\n'.code.toByte() -> emit(buffer.readUtf8())
                else -> buffer.writeByte(value.toInt())
            }
        }
        // Flush the last line if the input does not end with a newline.
        if (buffer.size > 0) {
            emit(buffer.readUtf8())
        }
    }
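If neither library is an option, a plain ByteArray that grows on demand can play the same role using only the common standard library. The sketch below is a minimal illustration under that assumption; ByteAccumulator, append and drainToString are hypothetical names, not an API of okio or kotlinx-io.

```kotlin
// Hypothetical helper: a minimal growable byte buffer backed by a ByteArray.
class ByteAccumulator(initialCapacity: Int = 64) {
    init {
        require(initialCapacity >= 0) { "initialCapacity must not be negative" }
    }

    private var bytes = ByteArray(initialCapacity)

    /** Number of bytes currently held. */
    var size = 0
        private set

    /** Appends one byte, doubling the backing array when it is full. */
    fun append(value: Byte) {
        if (size == bytes.size) {
            bytes = bytes.copyOf(maxOf(bytes.size * 2, 1))
        }
        bytes[size++] = value
    }

    /** Decodes the accumulated bytes as UTF-8 and resets the buffer. */
    fun drainToString(): String {
        val result = bytes.decodeToString(startIndex = 0, endIndex = size)
        size = 0
        return result
    }
}
```

In the flow above, buffer.append(value) would take the place of buffer.writeByte(value.toInt()), emit(buffer.drainToString()) the place of emit(buffer.readUtf8()), and the final check would become buffer.size > 0.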