The problem I'm trying to solve is expressed in the following code:
/**
 * Zips two buffered observables of different lengths and prints every zipped pair.
 *
 * <p>With a buffer size of 3 the four strings yield two lists while the eleven integers
 * yield four lists. {@code zip} only emits while both sources still have an item to pair,
 * so the trailing integer buffers are never delivered to the subscriber.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the
 *     stream to terminate
 */
@Test
public void buffer_shouldZipAllTheThings() throws InterruptedException {
    // Released when the stream terminates, whether normally or with an error.
    final CountDownLatch latch = new CountDownLatch(1);

    Observable<List<String>> strings =
            Observable.from(Arrays.asList("red", "blue", "yellow", "green")).buffer(3);
    Observable<List<Integer>> integers =
            Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).buffer(3);

    /** Pair of one string buffer and one integer buffer produced by the zip. */
    class Zipped {
        final List<String> strings;
        final List<Integer> integers;

        Zipped(List<String> strings, List<Integer> integers) {
            this.strings = strings;
            this.integers = integers;
        }

        @Override
        public String toString() {
            return "Strings: { " + strings.toString() + " }\n" + "Integers: { " + integers.toString() + " }";
        }
    }

    Observable<Zipped> zipper = Observable.zip(strings, integers, new Func2<List<String>, List<Integer>, Zipped>() {
        @Override
        public Zipped call(List<String> strings, List<Integer> integers) {
            return new Zipped(strings, integers);
        }
    });

    zipper.subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Zipped>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(Throwable throwable) {
            // onError is a terminal event: onCompleted will not follow it, so the latch
            // must be released here or latch.await() below would block forever.
            throwable.printStackTrace();
            latch.countDown();
        }

        @Override
        public void onNext(Zipped zipped) {
            System.out.println("-- New zipped object: ");
            System.out.println(zipped.toString());
        }
    });

    // The subscription runs on another thread; wait for it to terminate before returning.
    latch.await();
}
Here is the output of this test:
-- New zipped object:
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object:
Strings: { [green] }
Integers: { [4, 5, 6] }
Here is my desired output:
-- New zipped object:
Strings: { [red, blue, yellow] }
Integers: { [1, 2, 3] }
-- New zipped object:
Strings: { [green] }
Integers: { [4, 5, 6] }
-- New zipped object:
Strings: { [] }
Integers: { [7, 8, 9] }
-- New zipped object:
Strings: { [] }
Integers: { [10, 11] }
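How about making both sources infinite, by concatenating an endlessly repeating empty list onto each of them,
and using takeWhile
to stop the stream as soon as both zipped lists are empty?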
/**
 * Zips the buffered strings with the buffered integers after padding each source with an
 * endlessly repeated empty list, and stops once a zipped pair holds two empty lists.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting for the
 *     stream to terminate
 */
public static void main(String[] args) throws InterruptedException {
    // Released by the subscriber on either terminal event.
    final CountDownLatch done = new CountDownLatch(1);

    /** Holder for one string buffer and the integer buffer it was zipped with. */
    class Zipped {
        final List<String> strings;
        final List<Integer> integers;

        Zipped(List<String> strings, List<Integer> integers) {
            this.strings = strings;
            this.integers = integers;
        }

        @Override
        public String toString() {
            String stringsPart = "Strings: { " + strings.toString() + " }\n";
            String integersPart = "Integers: { " + integers.toString() + " }";
            return stringsPart + integersPart;
        }
    }

    // The finite, buffered sources.
    Observable<List<String>> bufferedStrings =
            Observable.from(Arrays.asList("red", "blue", "yellow", "green")).buffer(3);
    Observable<List<Integer>> bufferedIntegers =
            Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)).buffer(3);

    // Each source followed by an empty list repeated on a new-thread scheduler.
    Observable<List<String>> emptyStringsForever =
            Observable.<List<String>>from(new ArrayList<String>()).repeat(Schedulers.newThread());
    Observable<List<String>> infiniteStrings = Observable.concat(bufferedStrings, emptyStringsForever);

    Observable<List<Integer>> emptyIntegersForever =
            Observable.<List<Integer>>from(new ArrayList<Integer>()).repeat(Schedulers.newThread());
    Observable<List<Integer>> infiniteIntegers = Observable.concat(bufferedIntegers, emptyIntegersForever);

    Func2<List<String>, List<Integer>, Zipped> pairUp = new Func2<List<String>, List<Integer>, Zipped>() {
        @Override
        public Zipped call(List<String> left, List<Integer> right) {
            System.out.println(left + " " + right);
            return new Zipped(left, right);
        }
    };

    // Keep going while at least one side of the pair still carries data.
    Func1<Zipped, Boolean> hasAnyData = new Func1<Zipped, Boolean>() {
        @Override
        public Boolean call(Zipped pair) {
            boolean keepGoing = !(pair.strings.isEmpty() && pair.integers.isEmpty());
            System.out.println(keepGoing);
            return keepGoing;
        }
    };

    Observable<Zipped> zipper = Observable.zip(infiniteStrings, infiniteIntegers, pairUp).takeWhile(hasAnyData);

    Subscriber<Zipped> printer = new Subscriber<Zipped>() {
        @Override
        public void onNext(Zipped zipped) {
            System.out.println("-- New zipped object: ");
            System.out.println(zipped.toString());
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            done.countDown();
        }

        @Override
        public void onCompleted() {
            System.out.println("Finish");
            done.countDown();
        }
    };

    zipper.subscribeOn(Schedulers.newThread()).subscribe(printer);

    // Block until the subscriber has seen a terminal event.
    done.await();
}