I'm working on application with different resources which I need to close, while working with reactive streams.
I've got factory based on flyweight pattern which keeps references to objects, and they implements AutoCloseable interface. Problem is i'm using close() inside Autocloseable class, and here is my question: what is best solution to remove reference to closed resource inside factory? Can I throw some kind of event and catch it in factory, or after every action that can close resource should I iterate through references map and remove closed resources?
For better context: I'm using reactivex Observable which emits directory event (create, remove file/directory), and after every subscriber unsubscribed to it i'm closing WatchService which i'm using.
edit #1
Here how my factory class looks like:
public final class Factory {
private final ConcurrentHashMap<String, ReactiveStream> reactiveStreams = new ConcurrentHashMap<>();
public ReactiveStream getReactiveStream(Path path) throws IOException {
ReactiveStream stream = reactiveStreams.get(path.toString());
if (stream != null) return stream;
stream = new ReactiveStream(path);
reactiveStreams.put(path.toString(), stream);
return stream;
}
}
And here is how my ReactiveStream class look like:
public class ReactiveStream implements AutoCloseable {
(...)
private WatchService service;
private Observable<Event> observable;
public Observable<Event> getObservable() throws IOException {
(...) // where i create observable
return observable;
}
(...)
@Override
public void close() throws IOException {
service.close();
}
}
As you can see i've got factory which keeps references to ReactiveStream class, which close itself after it's observable will not be subscribed anymore (i did it in a way that i use doOnUnsubscribe(() -> close()) before using share() on observable, so when there will be no subscribers, doOnUnsubscribe will be invoked).
My question is, how can i remove reference from Factory to closed ReactiveStream after it will be closed?
edit #2
observable = Observable.fromCallable(new EventObtainer()).flatMap(Observable::from).subscribeOn(Schedulers.io()).repeat().doOnUnsubscribe(() -> {
try {
close();
} catch (IOException e) {
e.printStackTrace();
}
}).share();
Here is how i create my observable. EventObtainer is nested class in ReactiveStream which uses WatchService which need to be closed after every subscriber stops subscribing.
Today my colleague told me best solution to resolve this problem. So i've created interface:
@FunctionalInterface
public interface CustomClosable {
void onClosing();
}
And add reference of this interface to ReactiveStream in Constructor.
Also now i'm invoking onClosing.onClosing() where i need to close resources.
Thanks to that the factory class is responsible for declaring actions what should be done after its resources will be closed, and also i don't have circular dependency, so my ReactiveStream class can be reused many times.