I'm working on an app where I want to achieve the following using RxSwift and RxCocoa:
... etc
The key here is that the processing of each file has to complete before the next file is downloaded. At the very least, the files must be processed in order. If I can start downloading file 2 while file 1 is processing, that would be awesome, but it is not necessary.
I've tried using SerialDispatchQueueScheduler to make this work, but since the files are of different sizes, each download finishes at a different time, and therefore the processing code fires in a different order than the one in which I started the downloads.
I could easily implement this without Rx by using NSOperations and the like, but I'd like to keep using Rx here, as it's what I use everywhere else in this app.
Below I've included a snippet with some of the code. Comments have been added for the sake of this question.
.flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
    return Observable.from(tasks)
        .observeOn(self.backgroundScheduler) // StackOverflow: backgroundScheduler is a SerialDispatchQueueScheduler
        .flatMapWithIndex({ [unowned self] (task, index) in
            return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
        })
        .catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
            observable.onError(error)
            throw error
        })
        .map({ (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
            // Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
            switch diffTask.progress {
            case .started(currentTask: let currentTask, taskCount: let taskCount):
                observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
            case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
            case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
            }
            return diffTask.progress
        })
        .flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
            switch progress {
            case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // StackOverflow: PROCESSES THE FILE THAT WAS DOWNLOADED
            default:
                return Observable.empty()
            }
        })
}
I managed to solve it by using the concatMap operator. So instead of
.flatMapWithIndex({ [unowned self] (task, index) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
})
I did something like this:
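// tasks is a plain array, so it is wrapped in an observable before enumerated() and concatMap are applied
Observable.from(tasks).enumerated().concatMap { (index, task) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)
}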
The concatMap operator subscribes to the next inner observable only after the previous one has completed, so the downloads run one at a time and their results arrive in the original order. I had to use enumerated() since concatMap does not come with a concatMapWithIndex variant, but it works :)
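To make the difference concrete, here is a minimal, self-contained sketch comparing flatMap and concatMap. It assumes RxSwift 5 or later; fakeDownload and arrivalOrder are hypothetical helpers invented for this illustration (a timer stands in for fetchDiff), and the delays are arbitrary. Note that a SerialDispatchQueueScheduler is used in both cases, yet only concatMap keeps the results in order.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for fetchDiff: emits `index` after `delayMs` milliseconds, then completes.
func fakeDownload(index: Int, delayMs: Int, scheduler: SchedulerType) -> Observable<Int> {
    return Observable<Int>
        .timer(.milliseconds(delayMs), scheduler: scheduler)
        .map { _ in index }
}

// Runs three simulated downloads and blocks until all have finished,
// returning the indices in the order their results arrived.
func arrivalOrder(useConcatMap: Bool) -> [Int] {
    let scheduler = SerialDispatchQueueScheduler(qos: .default)
    let delays = [200, 20, 100] // the first "file" is the slowest
    let indexed = Observable.from(delays).enumerated()

    let downloads: Observable<Int>
    if useConcatMap {
        // Each inner observable is subscribed only after the previous one completes.
        downloads = indexed.concatMap { pair in
            fakeDownload(index: pair.index, delayMs: pair.element, scheduler: scheduler)
        }
    } else {
        // All inner observables are subscribed immediately and their results are merged.
        downloads = indexed.flatMap { pair in
            fakeDownload(index: pair.index, delayMs: pair.element, scheduler: scheduler)
        }
    }

    // Block the calling thread until the collected result has been delivered.
    let done = DispatchSemaphore(value: 0)
    var order: [Int] = []
    let subscription = downloads.toArray().subscribe(onSuccess: { result in
        order = result
        done.signal()
    })
    done.wait()
    subscription.dispose()
    return order
}

print(arrivalOrder(useConcatMap: false)) // timing-dependent; usually the shortest delay arrives first
print(arrivalOrder(useConcatMap: true))  // [0, 1, 2]
```

The trade-off is that concatMap does not start a download until the previous one has completed, so the total time is roughly the sum of the individual download times instead of the longest one.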