ios swift reactive-swift

ReactiveSwift pipeline count failures after all complete


I have a pipeline in ReactiveSwift for uploads. I want to make sure that even if one of the uploads fails, the rest will not be interrupted.

After all of them complete, whether with success or failure, the performUploads method should either return success or, if any upload has failed, return an error so that the next step, the downloads, won't start. Even if there are errors, every upload should get a chance to run; none of them should be stopped.

Is there a way to figure out if there are any errors after the uploads complete? See the methods here:

let pendingUploadItemsArray: [Item] = ...
func performUploads() -> SignalProducer<(), MyError> {
    return upload(pendingUploadItemsArray)
        .then(doAnything())
}

private func upload(_ items: [Item]) -> SignalProducer<Signal<(), MyError>.Event, Never> {
    let producers = items
        .filter { item in
            return item.readyForUpload
        }
        .map { self.upload($0).materialize() }
    
    return SignalProducer.merge(producers)
}

private func upload(_ item: Item) -> SignalProducer<(), MyError> {
    return internalUploader.upload(item)
        .on(failed: failedToUpload(item),
            value: successfullyUploaded(item))
        .ignoreValues()
}

where the internalUploader upload method is:

func upload(_ item: Item) -> SignalProducer<Any, MyError>

And then in another class you would call this uploader:

let sync = self.uploader.performUploads()
        .then(startDownloads())

The startDownloads should only run if all the uploads have completed with success. Thanks for any insight.

This might be something that should be done in a completely different manner.


Solution

  • I don't know exactly what successfullyUploaded and failedToUpload are doing in your code, but presumably you're keeping track of successes and failures to provide some kind of live progress UI. This is how I would structure it:

    struct UploadResult {
        let item: Item
        let error: Error? // nil if the upload succeeded
    
        var succeeded: Bool { error == nil }
        var failed: Bool { !succeeded }
    }
    
    ...
    
    static func upload(_ items: [Item]) -> SignalProducer<[UploadResult], Never> {
        SignalProducer(items)
            .filter(\.readyForUpload)
            .flatMap(.merge) { item in
                Self.internalUploader(item)
                    .map { _ in UploadResult(item: item, error: nil) }
                    .flatMapError { error in
                        SignalProducer(value: UploadResult(item: item, error: error))
                    }
            }
            .scan(into: [UploadResult]()) { ( results: inout [UploadResult], nextResult) in
                results.append(nextResult)
            }
    }
    
    1. I create an UploadResult struct that represents an item that has succeeded or failed to upload.
    2. In the upload function, instead of creating an array of producers and then merging them, I convert the array of items into a signal producer of items with SignalProducer(items) and then use flatMap(.merge) to merge the uploads into a single signal producer.
    3. Instead of using materialize, I use map to convert successful uploads into an UploadResult and I use flatMapError to convert failed uploads into an UploadResult.
    4. I use scan to accumulate the results as each upload completes. Each time an upload finishes (either successfully or with an error), scan will send an updated array of upload results that can be used to update the UI.
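
    To make step 4 concrete, here is a minimal plain-Swift sketch of what the accumulating scan sends, without ReactiveSwift. The runningSnapshots helper and the string values are hypothetical stand-ins for the producer and its UploadResult values; the sketch only models the sequence of emitted arrays.

    ```swift
    // Hypothetical plain-Swift model of scan(into:): after each incoming
    // element, the accumulator is appended to and a snapshot is emitted.
    func runningSnapshots<T>(_ values: [T]) -> [[T]] {
        var accumulated: [T] = []
        return values.map { value in
            accumulated.append(value)
            return accumulated
        }
    }

    // Stand-in values; in the real pipeline these would be UploadResult values.
    let snapshots = runningSnapshots(["a ok", "b failed", "c ok"])

    // One snapshot per finished upload; the last one contains every result.
    for snapshot in snapshots {
        print(snapshot)
    }
    ```

    Because each snapshot contains all of the earlier results, the final one is enough to decide whether the whole batch succeeded.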

    Then you could use it like this:

    Uploader.upload(someItems)
        .on(value: { resultsSoFar in
            // Update UI here
        })
        .take(last: 1)
        .attempt { results in
            if !results.allSatisfy(\.succeeded) {
                // At least one of the uploads failed, so send an error
                throw MyError()
            }
        }
        .then(startDownloads)
    
    1. I use the on(value:) operator to update the UI based on the current results. Each time an upload succeeds or fails, this closure will be called with the updated results.
    2. I use take(last: 1) to filter out all intermediate results; it'll only send along the final results when all uploads have completed.
    3. I use attempt to check if any of the uploads have failed and throw an error if so. This ensures the downloads will only be started if all uploads succeeded.
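
    The check in step 3 can also be tried on its own, outside the pipeline. The following is a minimal sketch that assumes stand-in Item and MyError types so it compiles without ReactiveSwift; overallOutcome is a hypothetical helper, not part of the library.

    ```swift
    // Hypothetical stand-ins so the sketch compiles on its own.
    struct Item { let name: String }
    struct MyError: Error {}

    struct UploadResult {
        let item: Item
        let error: Error? // nil if the upload succeeded

        var succeeded: Bool { error == nil }
    }

    // Hypothetical helper: succeeds only if every upload succeeded.
    func overallOutcome(of results: [UploadResult]) -> Result<Void, MyError> {
        results.allSatisfy(\.succeeded) ? .success(()) : .failure(MyError())
    }

    let mixed = [
        UploadResult(item: Item(name: "a"), error: nil),
        UploadResult(item: Item(name: "b"), error: MyError()),
    ]

    // The failed items are still available, e.g. for a retry list.
    let failedNames = mixed.filter { !$0.succeeded }.map(\.item.name)
    ```

    Note that allSatisfy returns true for an empty array, so with this check a batch containing no items counts as a success.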

    Hopefully this handles your use case, but let me know in a comment if I missed something about the broader context!

    EDIT

    If you only care about dealing with results one at a time rather than as a running array, you can get rid of the scan and then replace take(last: 1) with collect:

    static func upload(_ items: [Item]) -> SignalProducer<UploadResult, Never> {
        SignalProducer(items)
            .filter(\.readyForUpload)
            .flatMap(.merge) { item in
                Self.internalUploader(item)
                    .map { _ in UploadResult(item: item, error: nil) }
                    .flatMapError { error in
                        SignalProducer(value: UploadResult(item: item, error: error))
                    }
            }
    }
    
    ...
    
    Uploader.upload(someItems)
        .on(value: { latestResult in
            // Do something with the latest result
        })
        .collect()
        .attempt { results in
            if !results.allSatisfy(\.succeeded) {
                // At least one of the uploads failed, so send an error
                throw MyError()
            }
        }
        .then(startDownloads)