I am trying to zip a list of zipped Observables, but I get the same values from the zipped Observables every time. I am doing this to perform two operations, first reading the index and then reading the data from BLE, a certain number of times (6 in the following example). I am not sure how to handle this with RxJava2. Here is the code snippet:
    private Observable<Pair<byte[], byte[]>> getValueFromIndication(RxBleConnection rxBleConnection) {
        final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();
        return Observable.zip(
                rxBleConnection.setupIndication(Data.INDEX, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it).takeUntil(unsubscribeSubject),
                rxBleConnection.setupIndication(Data.DATA, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it).takeUntil(unsubscribeSubject),
                (bytes, bytes2) -> {
                    unsubscribeSubject.onNext(true);
                    return Pair.create(bytes, bytes2);
                }
        );
    }
From my main stream, I first create the list of Observables, then zip it and pass it on:
    .flatMap(rxBleConnection -> {
        List<Observable<Pair<byte[], byte[]>>> observableList = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            // Create a list of observables so that this function fires 6 times
            observableList.add(getValueFromIndication(rxBleConnection));
        }
        // Zip the list of zipped observables
        return Observable.zip(observableList, Data::OperationReadings);
    }).subscribe(bytes -> {
    })
Here, I always get the same values in Data::OperationReadings. Currently, I am getting the following data, which I don't want: the same index and value each time.
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
The expected data is as follows, with a different index and value each time:
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]
The reason you get the same data repeated 6 times is that you subscribe to every individual getValueFromIndication() Observable at the same time. Effectively, all of the Observables run in parallel, so each of them picks up the same indication values. You want to run each subscription in sequence instead. One solution could be to replace this:
return Observable.zip(observableList,Data::OperationReadings);
with:
    return Observable.concat(observableList) // subscribe to each Observable from the list only after the previous one completes
            .toList() // gather all results from the individual Observables in the list; this returns a Single
            .toObservable() // go back to the Observable class so the types match
            .map(Data::OperationReadings); // map the collected results into the OperationReadings class
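To make the difference between subscribing all at once and subscribing in sequence more concrete, here is a minimal sketch in plain Java, without RxJava or RxAndroidBle. HotSource, takeOne, readAllAtOnce and readOneByOne are hypothetical names made up for this illustration; HotSource only stands in for a BLE indication that pushes values to whoever is currently listening, and the emitted integers stand in for the index/data pairs. It is a model of the behaviour under those assumptions, not how the libraries are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class HotSourceDemo {

    /** Hypothetical stand-in for a BLE indication: pushes each value to the current listeners. */
    static final class HotSource {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        void emit(int value) {
            // Iterate over a copy so listeners may add or remove listeners while being notified.
            for (Consumer<Integer> listener : new ArrayList<>(listeners)) {
                listener.accept(value);
            }
        }

        /** Delivers only the next emitted value to sink, then stops listening. */
        void takeOne(Consumer<Integer> sink) {
            listeners.add(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) {
                    listeners.remove(this);
                    sink.accept(value);
                }
            });
        }
    }

    /** zip-like: all readers are subscribed up front, so they all see the same first value. */
    static List<Integer> readAllAtOnce(HotSource source, int count) {
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            source.takeOne(results::add);
        }
        for (int value = 1; value <= count; value++) {
            source.emit(value);
        }
        return results;
    }

    /** concat-like: the next reader is subscribed only after the previous one has finished. */
    static List<Integer> readOneByOne(HotSource source, int count) {
        List<Integer> results = new ArrayList<>();
        subscribeNext(source, count, results);
        for (int value = 1; value <= count; value++) {
            source.emit(value);
        }
        return results;
    }

    private static void subscribeNext(HotSource source, int remaining, List<Integer> results) {
        if (remaining == 0) {
            return;
        }
        source.takeOne(value -> {
            results.add(value);
            subscribeNext(source, remaining - 1, results);
        });
    }

    public static void main(String[] args) {
        System.out.println(readAllAtOnce(new HotSource(), 6)); // prints [1, 1, 1, 1, 1, 1]
        System.out.println(readOneByOne(new HotSource(), 6));  // prints [1, 2, 3, 4, 5, 6]
    }
}
```

The first method mirrors what happens with Observable.zip over the list: every reader is already listening when the first value arrives, so six readers report the same value. The second mirrors Observable.concat: each reader starts listening only after the previous one is done, so each one gets the next value.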