androidkotlinrx-javacoroutine

How to call suspend fun in async way from flow scope?


So I have never done something like this, so I have no clue which is the better approach.

Right now, I have a useCase, which returns with a flow. To keep it simple, I don't attach my code, but it is very similar to this.

class UseCase constructor(
    private val userRepository: UserRepository,
    private val carRepository: CarRepository
){
    operator fun invoke(): Flow<Result<UserWithCars>>{
        return flow {
            try {
                //UserRepository under the hood returns with a RxJava Single<List<User>> list
                // But I converted to suspend fun with await() fun
                val users = userRepository.getAllUsers().await()
                fetchUserCars(users)
            }catch (e: Exception){
                emit(Result.Error())
            }
        }
    }

    private suspend fun fetchUserCars(userList: List<User>){
        userList.forEach { user->
            user.cars = try {
                //REpository also returns with a Single<>
                carRepository.getCarsByUserId(user.id).await()
            }catch (e: Exception){
                emptyList<Car>()
            }
        }
    }
}

If I'm not wrong, the foreach will be executed in synchronously. I'm not sure, but I think, the foreach can be executed in async way, but I don't know how.

Back then, I used the ansyc{} fun , but I requires to be inside a Coroutine scope.

Previously, the User Single was flatMap-ed, and the cars was fetched to the user object.

return users.flatMap { user->
                    fetchCars(user)
                }

In the fetchCars() method, I basically mapped every userId to a Single<> which queries the cars, so I get a List<Single> and after that, I used RxJava Zip fun, but in this case, I'm not sure, how the Zip runs, so every Single will be executed each other, or simultaneously

So can I optimize it a little bit more, or is it fine right now?


Solution

  • Broot's answer is good as it allows you to await the Singles asynchronously, but it launches a new coroutine to await each Single.

    I'm not that familiar with RxJava, but I think the following should work without launching any additional coroutines:

    private suspend fun fetchUserCars(userList: List<User>) {
        userList
            .map { it to carRepository.getCarsByUserId(it.id) }
            .map {
                it.first.cars = try {
                    it.second.await()
                } catch (e: Exception) {
                    emptyList()
                }
            }
    }
    

    The list of users is iterated twice: On the first iteration the Singles are retrieved (without awaiting their value, so this is fast and no coroutine is needed), on the second iteration await is called on each Single to wait for the value. This needs a coroutine, but it can be done sequentially in the same coroutine that the suspend function runs in because you want to wait until everything is awaited.

    This is a similar to how the built-in joinAll works on a List<Deferred>.