I am using the RxAndroidBle library to connect to a BLE device and read data from it. I call establishConnection(true), i.e. with auto-connect set to true. When a BleAlreadyConnectedException occurs, I want to catch it and restart the flow of reading data, because disposing and reconnecting to the BLE device every time is causing issues. It seems better to keep the connection alive and re-read the data.
In onErrorResumeNext I want to re-call the functions writeStatus, readModelInfo, getReadings and so on. I am not sure how to achieve this.
device.establishConnection(true)
    .flatMap(rxBleConnection -> {
        rxBleConnection.discoverServices();
        mRxBleConnection = rxBleConnection;
        return Observable.just(rxBleConnection);
    })
    .flatMap(rxBleConnection -> rxBleConnection
        .setupNotification(TSDictionary.BATTERY_LEVEL, NotificationSetupMode.QUICK_SETUP)
        .flatMap(it -> it))
    .flatMap(bytes -> writeStatus())
    .flatMap(bytes -> readModelInfo(bytes))
    .flatMap(bytes -> getReadings(bytes))
    .doOnNext(data -> initializeErrorHistory(data))
    .flatMap(data -> getSequenceSize())
    .flatMap(length -> getOperationInfo(length))
    .doOnNext(data -> initializeOperationInfo(data))
    .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends ArrayList<Map<Integer, TSDictionaryMetaData>>>>() {
        @Override
        public ObservableSource<? extends ArrayList<Map<Integer, TSDictionaryMetaData>>> apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
            if (throwable instanceof BleAlreadyConnectedException) {
                // I want to re-call/restart the function calls:
                // writeStatus, readModelInfo, getReadings, initializeErrorHistory,
                // getSequenceSize, getOperationInfo, initializeOperationInfo
            }
            return null;
        }
    })
    .subscribe(data -> {
    }, e -> {
        e.printStackTrace();
    });
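Put the onErrorResumeNext closer to the connection code. That way it only replaces the failed connection attempt with the connection you already stored, and the steps after it (writeStatus, readModelInfo, getReadings and so on) run as normal: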
device.establishConnection(true)
    .doOnNext(rxBleConnection -> {
        // Note: the result of discoverServices() is not subscribed to here,
        // so this call on its own does not start a discovery.
        rxBleConnection.discoverServices();
        // Store the connection so it can be reused later.
        mRxBleConnection = rxBleConnection;
    })
    .onErrorResumeNext(throwable -> {
        if (throwable instanceof BleAlreadyConnectedException) {
            // Already connected: continue with the stored connection.
            // This assumes mRxBleConnection was set by an earlier successful connection.
            return Observable.just(mRxBleConnection);
        }
        // Any other error is passed on unchanged.
        return Observable.error(throwable);
    })
    .flatMap(rxBleConnection ->
        rxBleConnection.setupNotification(TSDictionary.BATTERY_LEVEL,
                NotificationSetupMode.QUICK_SETUP)
            .flatMap(it -> it)
    )
    .flatMap(bytes -> writeStatus())
    .flatMap(bytes -> readModelInfo(bytes))
    .flatMap(bytes -> getReadings(bytes))
    .doOnNext(data -> initializeErrorHistory(data))
    .flatMap(data -> getSequenceSize())
    .flatMap(length -> getOperationInfo(length))
    .doOnNext(data -> initializeOperationInfo(data))
    .subscribe(data -> {
    }, e -> {
        e.printStackTrace();
    });
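To see why the position of the recovery step matters, here is a rough plain-Java analogy that runs without Android or RxJava. It is only a sketch: it uses CompletableFuture from the JDK instead of an Observable, with exceptionally standing in for onErrorResumeNext. AlreadyConnectedException, RecoveryPlacementDemo, CACHED_CONNECTION and the step names are all made up for the illustration and are not part of RxAndroidBle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for BleAlreadyConnectedException.
class AlreadyConnectedException extends RuntimeException {}

class RecoveryPlacementDemo {
    // Plays the role of the stored mRxBleConnection field.
    static final String CACHED_CONNECTION = "cached-connection";

    // Simulates a connection attempt that fails because a connection already exists.
    static CompletableFuture<String> connect() {
        CompletableFuture<String> attempt = new CompletableFuture<>();
        attempt.completeExceptionally(new AlreadyConnectedException());
        return attempt;
    }

    // Dependent stages wrap the original failure in a CompletionException.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Recovery directly after the connection step: the later steps still run.
    static List<String> recoverEarly() {
        List<String> steps = new ArrayList<>();
        connect()
            .exceptionally(t -> {
                if (unwrap(t) instanceof AlreadyConnectedException) {
                    return CACHED_CONNECTION; // reuse the existing connection
                }
                throw new CompletionException(unwrap(t)); // pass other errors on
            })
            .thenApply(conn -> { steps.add("writeStatus on " + conn); return conn; })
            .thenApply(conn -> { steps.add("readModelInfo on " + conn); return conn; })
            .join();
        return steps;
    }

    // Recovery at the end of the chain: the failure has already skipped every step.
    static List<String> recoverLate() {
        List<String> steps = new ArrayList<>();
        connect()
            .thenApply(conn -> { steps.add("writeStatus on " + conn); return conn; })
            .thenApply(conn -> { steps.add("readModelInfo on " + conn); return conn; })
            .exceptionally(t -> CACHED_CONNECTION)
            .join();
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(recoverEarly()); // both steps ran on the cached connection
        System.out.println(recoverLate());  // no steps ran
    }
}
```

In the late variant the handler can only supply a final value, because the steps were skipped before the handler was reached. That is the same reason the original onErrorResumeNext at the end of the chain had no way to re-run writeStatus and the calls after it.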