I'm new to RxJava and am wondering how to create a configurable Observable. Suppose I want to write a DB-to-DB transfer like this:
srcDb.getObservable(Bean.class)
    .sql(selectSql)
    .params(selectParams)
    .subscribe(
        trgDb.getSubscriber(Bean.class)
            .sql(insertSql)
    );
I can already do that with the Subscriber, but how can I pass a small amount of configuration to the Observable itself in the same fluent fashion?
I may have found an acceptable workaround. It seems that what I need is a double binding set up outside the Observable's constructor: a DbObservable and DbOnSubscribe pair that depend on each other, something like this:
DbObservable class:
import rx.Observable;

public class DbObservable<T> extends Observable<T> {
    // Some parameter
    private String sql;

    protected DbObservable(DbOnSubscribe<T> onSub) {
        super(onSub);
    }

    // Getter used by DbOnSubscribe
    public String getSql() {
        return sql;
    }

    // Chainable parameter modifier
    public DbObservable<T> sql(String sql) {
        this.sql = sql;
        return this;
    }
}
DbOnSubscribe class:
import rx.Observable;
import rx.Subscriber;

public class DbOnSubscribe<T> implements Observable.OnSubscribe<T> {
    private DbObservable<T> dbObservable;

    @Override
    @SuppressWarnings("unchecked") // demo only: the cast below is safe only when T is String
    public void call(Subscriber<? super T> subscriber) {
        String sql = dbObservable.getSql(); // Access the SQL param at subscribe time
        subscriber.onNext((T) sql);         // Use the subscriber
        subscriber.onCompleted();
    }

    // Set the back-reference to the owning Observable
    public void setDbObservable(DbObservable<T> dbObservable) {
        this.dbObservable = dbObservable;
    }
}
And finally our assumed DbConnector class:
public class DbConnector {
    // Must live in the same package as DbObservable, whose constructor is protected
    public DbObservable<String> getObservable() {
        DbOnSubscribe<String> onSub = new DbOnSubscribe<>();
        DbObservable<String> obs = new DbObservable<>(onSub);
        onSub.setDbObservable(obs); // Complete the double binding
        return obs;
    }
}
So when I try it out ...
public class DbObservableTest {
    public static void main(String[] args) {
        DbConnector srcDb = new DbConnector();
        srcDb.getObservable()
            .sql("some SQL")
            .subscribe(System.out::println);
    }
}
... it really works! It prints "some SQL".
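The reason this works is that the parameter is read lazily: OnSubscribe.call runs only when something subscribes, so it sees whatever the chained setters stored before that point. Below is a minimal sketch of just that idea in plain Java, without RxJava. ConfigurableSource and LazyConfigDemo are hypothetical names invented for illustration, and a java.util.function.Consumer stands in for the Subscriber; this is not how RxJava itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for DbObservable; not an RxJava type.
class ConfigurableSource {
    private String sql; // set through the fluent chain

    // Chainable parameter modifier, same shape as DbObservable.sql(...)
    ConfigurableSource sql(String sql) {
        this.sql = sql;
        return this;
    }

    // Stand-in for subscribe(): the field is read here, not at construction time.
    void subscribe(Consumer<String> onNext) {
        onNext.accept(sql);
    }
}

class LazyConfigDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        ConfigurableSource source = new ConfigurableSource();
        source.sql("first SQL");                         // overwritten below
        source.sql("some SQL").subscribe(received::add); // value is read only now
        System.out.println(received);                    // prints [some SQL]
    }
}
```

One consequence of this design is that the configuration is mutable shared state: calling sql(...) again between two subscriptions changes what the second subscriber receives.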
Conclusion