I have a stream of ResponseMessage
which can be of different subtypes. I would like to split the stream into streams where I can handle each type in its own stream.
My first try resulted in this which I can not see working out.
file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.groupBy { when(it) {
is MarketChangeMessage -> it::class
else -> it::class
}}
.map { it.????? } //How can possible this work?
My question is now: What is the idiomatic way to divide a stream into streams on one specific sub type?
You could use the ofType
operator:
ofType( ) — emit only those items from the source Observable that are of a particular class.
Example:
val messages = file.readLines()
.toObservable()
.map { mapper.readValue(it, ResponseMessage::class.java) }
.share() // <-- or other multicasting operator
messages
.ofType(MarketChangeMessage::class)
.subscribe()
messages
.ofType(Other::class)
.subscribe()