rx-java reactive-programming rx-java2 purely-functional

Functional-reactive operator in RxJava2 that produces new state from previous state and additional inputs


I am working on a little showcase that is supposed to demonstrate how you can write interactive programs in a purely functional manner using functional-reactive programming (specifically RxJava2).

My goal is to create a simple interactive game. The main function of the game (let's call it Next Game State, or NGS for short) takes the current state of the game, a user input, as well as a random number and computes the next state of the game from those three inputs. Fairly straightforward so far. The user inputs and the random numbers are Flowables that were created from Iterables or via generators. I envisioned that the game state itself would be a Flowable as well (but I might be wrong about that).

I am struggling to find the right functional-reactive operator that applies the function to the three inputs and produces the next state of the game. Initially, I thought that Flowable.zip(source1, source2, source3, zipper) would be the right operation: it could take the three flows and combine them via the NGS function into a new Flowable. Unfortunately, that does not account for the fact that the resulting Flowable itself needs to be one of the inputs of the zip operation, which seems an impossible setup. My next idea was to use Flowable.generate, but I need the two additional inputs from other Flowables to calculate the next state, and there is no way to feed those into the generate operator.

In a nutshell, I'm trying to realize something similar to this (pseudo-)marble diagram:

  Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
  State                \   _______   / \       _______   / \       _______   /
                        \_|       |_/   \_____|       |_/   \_____|       |_/
                          | Next  |           | Next  |           | Next  |
                     _____| Game  |      _____| Game  |      _____| Game  |
                    /   __| State |     /   __| State |     /   __| State |
                   /   /  '_______'    /   /  '_______'    /   /  '_______'
                  /   /               /   /               /   /
  User           /   /               /   /               /   /
  Input --------o---/---------------o---/---------------o---/--------------------
                   /                   /                   /
  Random          /                   /                   /
  Number --------o-------------------o-------------------o---------------------------

The top line, I admit, is somewhat non-standard, but it's my best attempt to visualize that, beginning with an initial game state (0), each successive game state (1), (2), (3), etc., is created from the previous game state plus the additional inputs.

I realize that there is probably a relatively straightforward way to do this with an Emitter or maybe inside a downstream subscriber, but one of the goals of this exercise is to show that this can be solved entirely without side effects or void methods.

So, long story short, is there an existing operator that supports this behavior? If not, what would be involved in creating one? Or is there maybe some alternative solution that uses a slightly different overall setup?


Solution

  • Assumption

    The user input and the random number are two separate streams, each of which can emit at any time on its own.

    Solution

    long story short, is there an existing operator that supports this behavior?

    Yes, there is. You could use #scan(seed<T>, { current<T>, upstream<Q> -> newValue<T> }).

    import io.reactivex.rxjava3.core.Flowable
    import io.reactivex.rxjava3.functions.BiFunction
    import io.reactivex.rxjava3.processors.PublishProcessor
    import org.junit.jupiter.api.Test
    
    class So65589721 {
        @Test
        fun `65589721`() {
            val userInput = PublishProcessor.create<String>()
            val randomNumber = PublishProcessor.create<Int>()
    
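            // combineLatest starts emitting once both sources have produced a value,
            // then emits a new pair whenever either source emits.
            val scan = Flowable.combineLatest(userInput, randomNumber, BiFunction<String, Int, Pair<String, Int>> { u, r ->
                u to r
            }).scan<GameState>(GameState.State1, { currentState, currentInputTuple ->
                // scan emits the seed (State1) first, then one new state per incoming pair.
                // Calculate the new state from `currentState` and the combined pair.
                GameState.State3(currentInputTuple.first, currentInputTuple.second)
            })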
    
            val test = scan.test()
    
            randomNumber.offer(42)
    
            userInput.offer("input1")
            userInput.offer("input2")
    
            test
                .assertValues(GameState.State1, GameState.State3("input1", 42), GameState.State3("input2", 42))
        }
    
        interface GameState {
            object State1 : GameState
            data class State3(val value: String, val random: Int) : GameState
        }
    }
    

    This example shows how to use scan to calculate a new state from the current state and a given input pair. With scan you can "hold" state safely, without exposing it to side effects.
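
To make the semantics concrete, here is a minimal plain-Java sketch of what scan does with a seed: it emits the seed first, then applies the accumulator to the previous state and each incoming input. It uses no RxJava at all; lists stand in for the streams. The names ScanSketch, zip, scan and nextGameState are hypothetical helpers invented for this illustration, and the "hit"/"miss" scoring rule is only an assumed stand-in for the real NGS function. The zip helper pairs inputs by position, which is how the marble diagram in the question reads (one user input and one random number per step), whereas the test above uses combineLatest.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of "zip, then scan"; lists stand in for the Flowables.
final class ScanSketch {

    // Pairs the i-th element of each list, stopping at the shorter one.
    static <A, B> List<Map.Entry<A, B>> zip(List<A> left, List<B> right) {
        List<Map.Entry<A, B>> pairs = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            pairs.add(new SimpleImmutableEntry<>(left.get(i), right.get(i)));
        }
        return pairs;
    }

    // Returns the seed followed by every intermediate state:
    // seed, next(seed, in0), next(next(seed, in0), in1), ...
    static <S, I> List<S> scan(S seed, List<I> inputs, BiFunction<S, I, S> next) {
        List<S> states = new ArrayList<>();
        S state = seed;
        states.add(state);
        for (I input : inputs) {
            state = next.apply(state, input);
            states.add(state);
        }
        return states;
    }

    // Hypothetical NGS function (assumption): the state is a score, and a "hit"
    // adds the random number to it. It is pure: it returns a new value only.
    static Integer nextGameState(Integer score, Map.Entry<String, Integer> input) {
        return "hit".equals(input.getKey()) ? score + input.getValue() : score;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> inputs =
                zip(List.of("hit", "miss", "hit"), List.of(3, 5, 4));
        List<Integer> states = scan(0, inputs, ScanSketch::nextGameState);
        System.out.println(states); // prints [0, 3, 3, 7]
    }
}
```

The loop inside the hypothetical scan helper is the only place where a variable is reassigned; the NGS function itself never sees or mutates shared state, which is the property the operator provides in the reactive version.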

    Note