java, rx-java, flowable, disposable

How to send a cancel signal to a ConnectableFlowable?


I use a disposable Flowable to emit and subscribe to items. But when I use a ConnectableFlowable, I cannot send the cancel signal to the emitter. How can I detect, inside the Flowable.create method, that the Flowable has been disposed?

You can see both scenarios by commenting out and uncommenting the 'publish().autoConnect()' lines in the code snippet.

Disposable disposable = Flowable.create(emitter -> {
    AtomicBoolean isRunning = new AtomicBoolean(true);
    AtomicInteger i = new AtomicInteger();
    new Thread(() -> {
        while (isRunning.get()) {
            i.getAndIncrement();
            System.out.println("Emitting:" + i.get());
            emitter.onNext(i.get());
            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();

    emitter.setCancellable(() -> {
        System.out.println("Cancelled");
        isRunning.set(false);
    });
}, BackpressureStrategy.BUFFER)
        .publish() // comment out this line
        .autoConnect() // and this line
        .subscribe(s -> {
            System.out.println("Subscribed:" + s);
        });

Thread.sleep(10_000);
disposable.dispose();
Thread.sleep(100_000);

Solution

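  • autoConnect() connects at most once and never disconnects on its own, so disposing the subscriber's Disposable only detaches that subscriber and the cancel signal never reaches Flowable.create. There is an overload, autoConnect(int, Consumer<? super Disposable>), that gives you access to the Disposable of the connection so you can cancel the connection yourself (source below is the Flowable returned by Flowable.create):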

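    // Holds the connection's Disposable once autoConnect establishes it
    SerialDisposable sd = new SerialDisposable();
    
    // Connect on the first subscriber and pass the connection to sd
    source.publish().autoConnect(1, sd::set);
    
    // ...
    
    // Disposes the connection, which cancels the upstream emitter
    sd.dispose();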
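
    To illustrate why the subscriber's Disposable is not enough, here is a toy model in plain Java with no RxJava dependency. Handle, Source, and Hub are hypothetical stand-ins invented for this sketch, not RxJava classes, and the model ignores threading and backpressure. It only shows the shape of the problem: the hub hands out one handle per subscriber and a separate handle for the upstream connection, and only the latter reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConnectionModel {
    /** Hypothetical cancellable handle (stand-in for a Disposable). */
    interface Handle { void dispose(); }

    /** Toy source that only records whether it has been cancelled. */
    static class Source {
        boolean cancelled = false;
    }

    /** Toy hub that mimics the shape of publish().autoConnect(1, callback). */
    static class Hub {
        private final Source source;
        private final Consumer<Handle> onConnect;
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();
        private boolean connected = false;

        Hub(Source source, Consumer<Handle> onConnect) {
            this.source = source;
            this.onConnect = onConnect;
        }

        Handle subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
            if (!connected) {
                connected = true;
                // Connection handle: the only handle that cancels the source.
                onConnect.accept(() -> source.cancelled = true);
            }
            // Subscriber handle: only detaches this subscriber from the hub.
            return () -> subscribers.remove(subscriber);
        }
    }

    /** Returns the source's cancelled flag after each kind of dispose. */
    static String run() {
        Source source = new Source();
        Handle[] connection = new Handle[1]; // plays the role of SerialDisposable
        Hub hub = new Hub(source, h -> connection[0] = h);

        Handle subscriber = hub.subscribe(v -> { });
        subscriber.dispose();
        boolean afterSubscriberDispose = source.cancelled;

        connection[0].dispose();
        boolean afterConnectionDispose = source.cancelled;

        return afterSubscriberDispose + "," + afterConnectionDispose;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false,true"
    }
}
```

    In this model, disposing the subscriber handle leaves the source running, and only the handle captured through the connection callback stops it. That is the role sd plays in the snippet above.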