I have an observable that gets items from a lot of sources:
Source { List<Item> data }
Relationship between sources and items is many-to-many and in different sources items could duplicate themselves. Item is an entity that should be uploaded to server and server does not accept duplicates. To achieve this I merge Sources and distinct their Items by their ids and then upload unique items to server. Like below:
Observable.merge(source1(), source2(), source3())
.flatMapIterable(sources -> sources)
.flatMapIterable(source::getItems)
.distinct(item -> item.getId())
.flatMapCompletabale(item -> uploadItem(item))
Item uploading could emit several errors and on some of them I should retry to upload item once again later and proceed another items while 'failed' one is waiting for its retrying.
How can I postpone retrying uploading 'failed' item and proceed other items while this one is wating for its try?
Thanks in advance!
I put this function into retryWhen method and get it working.
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Observable.timer(retryDelay, timeUnit);
}
return Observable.error(throwable);
});
}
}