kotlin haskell rx-java functor applicative

How can I generalize the arity of the RxJava 2 zip function (from Single/Observable) to n nullable arguments without losing their types?


Four main problems to solve:

1) Type checking is lost

Using the array-argument version of Single.zip(), I lose the strongly typed arguments.

2) Source arguments cannot be nullable

I cannot pass nullable source values as arguments to the Single.zip() function.

3) I want an alternative to the method that takes an untyped Object[]:

public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) ...

4) I don't want mutable objects: I don't want to use var in my class, I want to use val.

In Haskell, there is a related question: How can I implement generalized "zipn" and "unzipn" in Haskell?

And in Haskell I can achieve this with applicative functors:

f <$> a1 <*> a2 <*> a3 <*> a4 <*> a5 <*> a6 <*> a7 <*> a8 <*> a9 <*> a10 <*> a11

where f :: Int -> Int -> Int -> Int -> Int -> Int -> Int -> String -> String -> String -> Int

and a1 .. a11 are values of the corresponding types.

The library has a list of similar typed overloads of Single.zip(): one for 2 sources, one for 3 sources, and so on.

In all those cases this is just fine, because each argument is typed. But the overloads stop at 9 Single sources.

In our project we needed more sources, because we have many services that we want to call asynchronously (in our case, 11 arguments).

But the issue is that the arguments lose their strong types and, worse, some of them can be nullable.

For example, we wanted to solve this use case:

//Given
val bothSubscribed = CountDownLatch(2) // Change this value to 0 to run the test faster
val subscribeThreadsStillRunning = CountDownLatch(1) // Change this value to 0 to run the test faster

val service = { s1: String,
                s2: Int,
                s3: String?,
                s4: Int,
                s5: String,
                s6: String,
                s7: String,
                s8: String,
                s9: String,
                s10: String?,
                s11: String ->
    val result =
        listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")
    Single.just("Values:$result")
}

val createSingle = { value: String ->
    Observable
        .create<String> { emitter ->
            println("Parallel subscribe $value on ${Thread.currentThread().name}")
            bothSubscribed.countDown()
            subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)
            emitter.onNext(value)
            emitter.onComplete()
        }
        .singleOrError()
        .subscribeOn(io())
}

val s1 = createSingle("v1")
val s2 = Single.just(2)
val s3 = null
val s4 = Single.just(4)
val s5 = createSingle("v5")
val s6 = createSingle("v6")
val s7 = createSingle("v7")
val s8 = createSingle("v8")
val s9 = createSingle("v9")
val s10 = null
val s11 = createSingle("v11")

//When

val result = Single.zipArray(
    listOf(
        s1,
        s2,
        s3,
        s4,
        s5,
        s6,
        s7,
        s8,
        s9,
        s10,
        s11
    )
) { arrayResult ->
    service(
        arrayResult[0] as String,
        arrayResult[1] as String,
        arrayResult[2] as String?,
        arrayResult[3] as String,
        arrayResult[4] as String,
        arrayResult[5] as String,
        arrayResult[6] as String,
        arrayResult[7] as String,
        arrayResult[8] as String,
        arrayResult[9] as String?,
        arrayResult[10] as String
    )
}

//Then
result
    .test()
    .awaitDone(50, TimeUnit.SECONDS)
    .assertSubscribed()
    .assertValues("Values:v1;2;none;4;v5;v6;v7;v8;v9;none;v11")

As you can see, problems may occur if I write, for example:

arrayResult[0] as String,
arrayResult[1] as Int,
arrayResult[2] as String?,
arrayResult[3] as Int,
arrayResult[4] as String,
arrayResult[5] as String,
arrayResult[6] as String,
arrayResult[7] as String,
arrayResult[8] as String,
arrayResult[9] as String?,
arrayResult[10] as String

This fails because:

1) None of the Single.zip() functions can take a nullable value as an argument.

2) The order of the values in the array can change, and then the casts fail at runtime instead of being caught by the type checker.
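
The second point can be reproduced without RxJava at all. The following is only a minimal sketch: a plain Array<Any?> stands in for the Object[] that the zipper receives, and secondAsString is a hypothetical helper that is not part of the original code.

```kotlin
// Hypothetical zipper body: it trusts that index 1 holds a String.
fun secondAsString(arrayResult: Array<Any?>): String = arrayResult[1] as String

fun main() {
    // Stand-in for the Object[] handed to the zipper: every element is just Any?.
    val arrayResult: Array<Any?> = arrayOf("v1", 2, null)

    val first = arrayResult[0] as String   // matches the actual element type
    val third = arrayResult[2] as String?  // null is accepted by String?
    println("$first;${third ?: "none"}")   // prints "v1;none"

    // The wrong cast compiles as well and only fails once it runs.
    val failure = runCatching { secondAsString(arrayResult) }.exceptionOrNull()
    println(failure is ClassCastException) // prints "true"
}
```

The compiler accepts both the right and the wrong cast, so a mistake in an index or a type only shows up at runtime.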


Solution

  • I have accomplished that goal using:

    1. Kotlin Extension Functions
    2. Curried functions (Kotlin allows that)
    3. Partial Application (Kotlin allows that too)
    4. Functor and Applicative Functors concepts (Single and Observable classes are Applicative functors)
    5. Mix it all together:

    First, the zipOver function, for non-nullable values:

    /**
     * Returns a Single that is the result of applying the function inside the context (a Single in this case).
     * This function is curried and will be used as an Applicative Functor, so each argument will be given
     * one by one
     * @param <B> the result value type
     * @param applicativeValue
     *            a Single that contains the input value of the function
     * @return the Single returned when the function is applied to the applicative value.
     * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
     */
    infix fun <A, B> Single<(A) -> (B)>.zipOver(applicativeValue: Single<A>): Single<B> =
        Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
    

    Then, zipOverNullable, for nullable values:

    /**
     * Returns a Single that is the result of applying the function inside the context (a Single in this case).
     * This function is curried and will be used as an Applicative Functor, so each argument will be given
     * one by one
     * @param <B> the result value type
     * @param applicativeValue
     *            a Single that contains the input value of the function and it can be null
     * @return the Single returned when the function is applied to the applicative value even when
     * it is null.
     * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
     */
    infix fun <A, B> Single<(A?) -> (B)>.zipOverNullable(applicativeValue: Single<A>?): Single<B> =
        when {
            applicativeValue != null -> Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
            else -> this.map { it(null) }
        }
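
To see the two operators working together on a small scale, here is a sketch with a hand-curried two-argument function. It assumes RxJava 2 on the classpath and repeats both operators so that it compiles on its own; the names label, present and absent are made up for this illustration.

```kotlin
import io.reactivex.Single
import io.reactivex.functions.BiFunction

// Same operators as defined above, repeated so the sketch is self-contained.
infix fun <A, B> Single<(A) -> B>.zipOver(applicativeValue: Single<A>): Single<B> =
    Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })

infix fun <A, B> Single<(A?) -> B>.zipOverNullable(applicativeValue: Single<A>?): Single<B> =
    if (applicativeValue != null) Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
    else this.map { it(null) }

// Hypothetical hand-curried function: first a String, then a nullable Int.
val label: (String) -> (Int?) -> String = { name -> { count -> "$name:${count ?: "none"}" } }

fun main() {
    val present: Single<Int>? = Single.just(3)
    val absent: Single<Int>? = null // the whole source is missing

    val withValue = Single.just(label) zipOver Single.just("a") zipOverNullable present
    val withoutValue = Single.just(label) zipOver Single.just("a") zipOverNullable absent

    println(withValue.blockingGet())    // prints "a:3"
    println(withoutValue.blockingGet()) // prints "a:none"
}
```

Each step consumes exactly one parameter of the curried function, which is why the compiler can check every source against its own parameter type.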
    

    I used org.funktionale.currying for the curried() function
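
For readers who have not used currying before, this is roughly what such a function does for three arguments. It is a sketch in plain Kotlin, not the library's source: curriedByHand and describe are hypothetical names chosen for this example.

```kotlin
// Hand-written illustration of currying a three-argument function.
// The library's curried() covers higher arities; this helper is hypothetical.
fun <A, B, C, R> ((A, B, C) -> R).curriedByHand(): (A) -> (B) -> (C) -> R =
    { a -> { b -> { c -> this(a, b, c) } } }

// A small function with one nullable parameter, similar in spirit to service.
val describe = { name: String, age: Int, nick: String? -> "$name;$age;${nick ?: "none"}" }

fun main() {
    val curried = describe.curriedByHand()

    // Partial application: each call fixes one argument and returns the next function.
    val withName = curried("v1")
    val withAge = withName(2)

    println(withAge(null)) // prints "v1;2;none"
}
```

Because every intermediate value is itself a typed function, passing an argument of the wrong type at any step is rejected by the compiler.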

    By combining those two you could write:

        //Given
        val bothSubscribed = CountDownLatch(0) // Change this value to 2 to run the test slowly
        val subscribeThreadsStillRunning = CountDownLatch(0) // Change this value to 1 to run the test slowly
    
        val service: (String, Int, String?, Int, String, String, String, String, String, String?, String) -> Single<String> = {
                        s1: String,
                        s2: Int,
                        s3: String?,
                        s4: Int,
                        s5: String,
                        s6: String,
                        s7: String,
                        s8: String,
                        s9: String,
                        s10: String?,
                        s11: String ->
            val result =
                listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")
            Single.just("Values:$result")
        }

        val createSingle = { value: String ->
            Observable
                .create<String> { emitter ->
                    println("Parallel subscribe $value on ${Thread.currentThread().name}")
                    bothSubscribed.countDown()
                    subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)
                    emitter.onNext(value)
                    emitter.onComplete()
                }
                .singleOrError()
                .subscribeOn(io())
        }

        val s1: Single<String> = createSingle("v1")
        val s2: Single<Int> = Single.just(2)
        // Here we move the nullability outside: the whole Single<String> is nullable, not the value inside the Single
        val s3: Single<String>? = null
        val s4: Single<Int> = Single.just(4)
        val s5: Single<String> = createSingle("v5")
        val s6: Single<String> = createSingle("v6")
        val s7: Single<String> = createSingle("v7")
        val s8: Single<String> = createSingle("v8")
        val s9: Single<String> = createSingle("v9")
        val s10: Single<String>? = null
        val s11: Single<String> = createSingle("v11")

        //When
        // Here I curry the function, so I can apply the arguments one by one via zipOver() and preserve the types

        val singleFunction: Single<(String) -> (Int) -> (String?) -> (Int) -> (String) -> (String) -> (String) -> (String) -> (String) -> (String?) -> (String) -> Single<String>> =
            Single.just(service.curried()).subscribeOn(io())
    
        val result = singleFunction
            .zipOver(s1)
            .zipOver(s2)
            .zipOverNullable(s3)
            .zipOver(s4)
            .zipOver(s5)
            .zipOver(s6)
            .zipOver(s7)
            .zipOver(s8)
            .zipOver(s9)
            .zipOverNullable(s10)
            .zipOver(s11)
            .flatMap { it }
    
        //Then
        result
            .test()
            .awaitDone(50, TimeUnit.SECONDS)
            .assertSubscribed()
            .assertValues("Values:v1;2;none;4;v5;v6;v7;v8;v9;none;v11")
    

    Then it prints something like:

    Parallel subscribe v11 on RxCachedThreadScheduler-10
    Parallel subscribe v8 on RxCachedThreadScheduler-8
    Parallel subscribe v5 on RxCachedThreadScheduler-5
    Parallel subscribe v9 on RxCachedThreadScheduler-9
    Parallel subscribe v6 on RxCachedThreadScheduler-6
    Parallel subscribe v1 on RxCachedThreadScheduler-2
    Parallel subscribe v7 on RxCachedThreadScheduler-7
    

    Now, if I do:

        val result = singleFunction
            .zipOver(s1)
            .zipOver(s1)
            .zipOverNullable(s3)
            .zipOver(s1)
            .zipOver(s5)
            .zipOver(s6)
            .zipOver(s7)
            .zipOver(s8)
            .zipOver(s9)
            .zipOverNullable(s10)
            .zipOver(s11)
            .flatMap { it }
    

    It fails at compile time, because s1 is a Single<String> while the second and fourth parameters of service expect an Int.