I am creating an RxJava2 chain in which I want to enable and disable a notification. The flow I am setting up is as follows.
Enable the notification on the READ_STATUS UUID.
If the value is zero, then perform a write of byte 01 to the WRITE_STATUS UUID and, after WRITE_STATUS, enable the notification of the READ_STATUS UUID to verify that it has byte value 1.
If the value is 1, then just enable the other indicators (UUID1, UUID2, UUID3) and read the value.
I have a problem at steps 2 and 3, where I am reading the value of the READ_STATUS UUID by enabling the notification. In order to re-read the value, I probably need to disable the notification and then enable it again. And to disable the notification I have to dispose that particular setupNotification.
Code is as follows
    connectDisposable =
        device.establishConnection(false)
            .flatMap(rxBleConnection -> {
                rxBleConnection.discoverServices();
                mRxBleConnection = rxBleConnection;
                return Observable.just(rxBleConnection);
            })
            .flatMap(rxBleConnection -> mRxBleConnection.setupNotification(READ_STATUS, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it))
            .takeUntil(bytes -> {
                if (getByteValue(bytes) == 0)
                    return false; // dispose above to disable the notification
                else
                    return true; // no need to disable the notification and continue writing
            })
            .flatMap(bytes -> {
                return Observable.zip(
                    mRxBleConnection.writeCharacteristic(WRITE_STATUS, new byte[]{1}).toObservable(),
                    // setupNotification again to check whether read status has 1 or not
                    mRxBleConnection.setupNotification(READ_STATUS, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it),
                    Pair::new
                );
            })
            .flatMap(bytes -> {
                byte[] val = bytes.first;
                if (getByteValue(val) == 1) {
                    return Observable.zip(
                        mRxBleConnection.setupIndication(HISTORY, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG, "Here 1 ")).flatMap(it -> it),
                        mRxBleConnection.setupIndication(PARAMCHECK, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG, "Here 2 ")).flatMap(it -> it),
                        mRxBleConnection.setupIndication(FAULTINFO, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG, "Here 3 ")).flatMap(it -> it),
                        Data::Readings);
                }
                return Observable.empty();
            }).subscribe(data -> {
            });
The problem with this code is that my takeUntil fires last, so it does not dispose the previous setupNotification operation and I cannot re-read the value later.
I tried the solution mentioned in this thread, but unfortunately I am not sharing the RxBleConnection.
"The problem with this code is that my takeUntil fires last, so it does not dispose the previous setupNotification operation and I cannot re-read the value later."
The problem is that your condition is inverted. From the .takeUntil() Javadoc:
@return an Observable that first emits items emitted by the source Observable, checks the specified condition after each item, and then completes when the condition is satisfied.
You have used:
    .takeUntil(bytes -> {
        if (getByteValue(bytes) == 0)
            return false; // dispose above to disable the notification
        else
            return true; // no need to disable the notification and continue writing
    })
where it should be satisfied (return true) when the upstream should get disposed:
    .takeUntil(bytes -> {
        if (getByteValue(bytes) == 0)
            return true; // dispose above to disable the notification
        else
            return false; // no need to disable the notification and continue writing
    })
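
To see why the direction of the condition matters, here is a minimal plain-Java sketch that models the behaviour described in the Javadoc quote: emit each item, check the condition after it, and stop once the condition returns true. It is not RxJava code. The class TakeUntilSketch, its takeUntil helper, and the sample status values 0, 0, 1 are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of takeUntil(Predicate); a hypothetical helper, not the RxJava operator.
class TakeUntilSketch {

    // Emits each item, evaluates the stop condition after emitting it, and stops
    // as soon as the condition returns true (the point at which the real operator
    // would complete and dispose the upstream).
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stopCondition) {
        List<T> emitted = new ArrayList<>();
        for (T item : source) {
            emitted.add(item);
            if (stopCondition.test(item)) {
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed sequence of status values delivered by the notification.
        List<Integer> statuses = Arrays.asList(0, 0, 1);

        // Inverted condition from the question: only stops once a non-zero value arrives.
        System.out.println(takeUntil(statuses, value -> value != 0)); // prints [0, 0, 1]

        // Corrected condition: stops right after the first zero.
        System.out.println(takeUntil(statuses, value -> value == 0)); // prints [0]
    }
}
```

In this model the inverted condition keeps consuming values through every zero, which matches the symptom of the notification not being disposed when the status is 0.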