android kotlin rx-java2

How to create a sequential RxJava chain from Completable to Single?


I have a scenario where I need to create a sequential chain using RxJava in Kotlin. Specifically, I want to create a chain that starts with a Completable and then proceeds to a Single operation.

Here is the relevant code snippet:

class TestRepository {
    fun performCompletableTask(): Completable {
        Timber.v("Migration performCompletableTask")
        return Completable.fromAction {
            // Perform some operation here
            for (i in 1..100) {
                // Do something with the number i
                // Example: Print the number i
                Timber.v("$i ")
            }
        }
    }

    fun performSingleStringTask(): Single<String> {
        Timber.v("Migration performSingleStringTask")
        return Single.fromCallable {
            // Perform some operation here
            val stringBuilder = StringBuilder()
            for (i in 1..100) {
                // Append the number i to the string builder
                stringBuilder.append(i).append(" ")
            }
            // Return the final string
            stringBuilder.toString()
        }
    }

    fun performSingleIntTask(): Single<Int> {
        Timber.v("Migration performSingleIntTask")
        return Single.fromCallable {
            // Perform some operation here
            var sum = 0
            for (i in 1..100) {
                // Accumulate the sum of numbers
                sum += i
            }
            // Return the final sum
            sum
        }
    }
}


fun testMigration() {
    Timber.v("Performing migration...")

    // Chain: Completable -> Single<String> -> Single<Int>
    val chain = testRepository.performCompletableTask()
        .andThen(testRepository.performSingleStringTask())
        .flatMap {
            Timber.v("flatMap Single<String> result: $it")
            testRepository.performSingleIntTask()
        }

    val disposable = chain
        .subscribe({
            Timber.v("subscribe Single<Int> result: $it")
            Timber.v("Migration completed.")
        }, { error ->
            Timber.e(error, "Error occurred during migration: ${error.message}")
        })
}

In the given code, both performCompletableTask() and performSingleStringTask() are called simultaneously. However, I want to modify the chain to ensure that performSingleStringTask() is called only after performCompletableTask() has completed execution.

Can you suggest how I can modify the code to achieve sequential execution in the RxJava chain? I appreciate any guidance or example code you can provide. Thank you!


Solution

  • performSingleStringTask() already runs only after performCompletableTask() completes; that is how andThen works. The two only appear to run simultaneously because the logging sits outside the fromAction and fromCallable callbacks. Unlike most other RxJava operators, andThen does not take a callback; it takes an Rx type directly. So the performSingleStringTask() method is called immediately after performCompletableTask() is called, and both are called before subscribe is called. However, the callbacks you passed to fromAction and fromCallable still run sequentially, starting from the point subscribe is called. So the order of operations is:

    1. performCompletableTask() is called, logs a message, and returns a Completable.
    2. performSingleStringTask() is called, logs a message, and returns a Single, which is passed as a parameter to andThen().
    3. flatMap is called and is passed a lambda function, which does not run yet.
    4. subscribe is called, which triggers the Completable.fromAction() callback to run and emit onComplete when it's finished.
    5. Once the Completable has completed, the Single.fromCallable callback runs and emits onSuccess when it's finished.
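
To make that ordering visible, here is a minimal sketch that records events in a list instead of logging with Timber. The two task functions are simplified stand-ins for the repository methods in the question, and the `log` list and the `runEagerChain` / `runDeferredChain` names are made up for illustration. No schedulers are used, so everything runs synchronously on the calling thread. If you also want the method calls themselves (and so the log lines outside the callbacks) to wait until subscription, one option is to wrap them in `Completable.defer` and `Single.defer`, as the second function does:

```kotlin
import io.reactivex.Completable
import io.reactivex.Single

// Hypothetical event recorder standing in for the Timber calls.
val log = mutableListOf<String>()

fun performCompletableTask(): Completable {
    log += "performCompletableTask called"               // runs when the method is called
    return Completable.fromAction { log += "completable work" } // runs on subscribe
}

fun performSingleStringTask(): Single<String> {
    log += "performSingleStringTask called"              // runs when the method is called
    return Single.fromCallable {
        log += "single work"                             // runs after the Completable completes
        "done"
    }
}

// Same shape as the chain in the question: both methods are called
// while the chain is being assembled, before subscribe.
fun runEagerChain(): List<String> {
    log.clear()
    val chain = performCompletableTask().andThen(performSingleStringTask())
    log += "chain assembled"
    chain.subscribe({ log += "result: $it" }, { error -> log += "error: $error" })
    return log.toList()
}

// With defer, each method call is postponed until its source is subscribed to.
fun runDeferredChain(): List<String> {
    log.clear()
    val chain = Completable.defer { performCompletableTask() }
        .andThen(Single.defer { performSingleStringTask() })
    log += "chain assembled"
    chain.subscribe({ log += "result: $it" }, { error -> log += "error: $error" })
    return log.toList()
}

fun main() {
    println(runEagerChain())
    println(runDeferredChain())
}
```

In both versions "completable work" is recorded before "single work"; the only thing defer changes is when the "... called" entries appear relative to "chain assembled".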