Ensure that a Stream is closed after converting to Sequence (or: why do "use" and "finally" not work in a sequence)


I have some Java code which makes heavy use of the Stream API. It is critical that these streams are closed when we are finished consuming them, but we are struggling to come up with a robust solution.

I had an idea: this is already a mixed Java + Kotlin project, so let's try Kotlin's Sequence.

So I came up with this extension function, which looks like it does just what we need:

fun <T> Stream<T>.asCloseableSequence() = sequence {
    this@asCloseableSequence.use {
        yieldAll(it.asSequence())
    }
}

This works okay. The original Stream is closed after we finish processing the Sequence. However, if an exception occurs during processing, then the Stream is not closed.

What am I doing wrong here? My understanding of the use function is that it should close the underlying resource even if an exception occurs. My first thought was that the exception must be occurring before use is even called, but if we add some print statements

sequence {
    println("entering sequence")

    this@asCloseableSequence.use {
        println("entering use")
        yieldAll(it.asSequence())
    }
}

then we can see that "entering use" is indeed printed, and yet the Stream is not closed.

The same thing happens if I use try/finally instead of the use function.

Here is a complete, (close to) minimal, reproducible example. (Note that the built-in asSequence function does not close the stream even if no exception occurs, and use does work if it is not used inside a sequence scope.)

import java.util.stream.Stream
import kotlin.streams.asSequence
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue

class StreamClosingTests {

    /**
     * First, let's see if the built-in function does what we want.
     */
    @Test
    fun `using asSequence`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = stream.asSequence()

        // When we perform a terminal operation on the Sequence,
        sequence.forEach { println(it) }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Since the above test fails, let's try using a sequence builder (SequenceScope) instead.
     */
    @Test
    fun `using SequenceScope and iterator`() {

        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        sequence.forEach { println(it) }

        assertTrue(closed[0]) // Passes!
    }

    @Test
    fun `using SequenceScope and iterator and Exception occurs`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        // When we perform a terminal operation on the Sequence and an exception occurs
        assertFailsWith<RuntimeException> {
            sequence.forEach { throw RuntimeException() }
        }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Let's remove sequence and see if use works with just a plain old stream.
     */
    @Test
    fun `use without sequence`() {

        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

        // When we perform a terminal operation on the Stream and an exception occurs,
        assertFailsWith<RuntimeException> {
            stream.use {
                it.forEach { throw RuntimeException() }
            }
        }

        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
}

(Side note: it is very possible that Streams and Sequences are poorly suited to our use case. But even so, I am very interested in why this doesn't work as I expect.)


Solution

  • Neither Stream nor Sequence has a concept of the consumer stopping. They can't tell whether someone is still reading from them or has already finished, because the two situations look exactly the same. Additionally, Kotlin sequences generally support being consumed multiple times (although this is not required), so we can't assume they should always be closed after all items have been consumed.

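    Your solution doesn't work because it only handles the case where we consume the last item: only then does execution leave use(). The sequence builder runs your block as a coroutine that suspends at every yield and only resumes when the consumer asks for another item. If we stop consuming before the last item, the coroutine stays suspended inside yieldAll(), waiting for a request that never comes, so the end of use() (or the finally block) is not reached. An exception in the consumer is just one way of stopping early. It is thrown in the consumer's code (the forEach lambda), not inside the suspended coroutine, so the coroutine never sees it. We don't even need an exception. Use things like first(), take() or find() and you get the same problem, because we never get to the last item in the stream and therefore never exit use().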
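Here is a small self-contained sketch of this behaviour. It uses a plain try/finally in place of use(), so no Stream is needed. The names trackedSequence, firstOnly, consumerThrows and fullyConsumed are made up for illustration.

```kotlin
// A sequence builder that records how far its coroutine got.
// The finally block stands in for the close() that use() would perform.
fun trackedSequence(events: MutableList<String>): Sequence<Int> = sequence {
    try {
        events += "start"
        yield(1)
        yield(2)
        events += "after last yield"
    } finally {
        events += "finally"
    }
}

// Consumer stops early: the coroutine stays suspended at yield(1).
fun firstOnly(): List<String> {
    val events = mutableListOf<String>()
    trackedSequence(events).first()
    return events
}

// Consumer throws: the exception is raised in the forEach lambda, outside the coroutine.
fun consumerThrows(): List<String> {
    val events = mutableListOf<String>()
    try {
        trackedSequence(events).forEach { throw IllegalStateException("consumer failed") }
    } catch (e: IllegalStateException) {
        // swallowed here so that we can inspect events afterwards
    }
    return events
}

// Consumer reads everything: the coroutine runs to completion and finally runs.
fun fullyConsumed(): List<String> {
    val events = mutableListOf<String>()
    trackedSequence(events).toList()
    return events
}

fun main() {
    println(firstOnly())      // [start]
    println(consumerThrows()) // [start]
    println(fullyConsumed())  // [start, after last yield, finally]
}
```

Each call to iterator() on a sequence created with sequence { } starts a fresh run of the block. That is also why such a sequence can be consumed more than once.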

    Don't count on garbage collection to fix this either. Once we lose all references to the abandoned iterator, the garbage collector can reclaim it together with its suspended coroutine. However, reclaiming is not cancelling. The coroutine is never resumed, so the code after yieldAll() never runs, and that includes the close() performed by use(). The stream is simply never closed.

    Note also that even though Stream implements AutoCloseable, it doesn't call close() automatically: stream.forEach { println(it) } won't close the stream. Streams are AutoCloseable only so that people can close them explicitly, for example with try-with-resources in Java or use() in Kotlin. That is exactly what your "use without sequence" test does.
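Here is a minimal Kotlin sketch of that difference. The function names are illustrative only.

```kotlin
import java.util.stream.Stream

// A terminal operation alone consumes the stream but never calls close().
fun closedAfterForEach(): Boolean {
    var closed = false
    val stream = Stream.of(1, 2, 3).onClose { closed = true }
    stream.forEach { println(it) }
    return closed
}

// use() (Kotlin's try-with-resources) calls close(), which runs the onClose handler.
fun closedAfterUse(): Boolean {
    var closed = false
    Stream.of(1, 2, 3).onClose { closed = true }.use { stream ->
        stream.forEach { println(it) }
    }
    return closed
}

fun main() {
    println(closedAfterForEach()) // false
    println(closedAfterUse())     // true
}
```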

    I suggest taking exactly the same approach with sequences: create a closeable sequence, and expect users of your code to close it explicitly, for example with use().

    @Test
    fun `use with closeable sequence`() {
    
        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
    
        stream.asCloseableSequence().use { seq ->
            seq.take(1).forEach { println(it) }
        }
        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
    
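    // A Sequence that can also be closed; closing it closes the underlying Stream.
    interface CloseableSequence<T> : Sequence<T>, AutoCloseable
    
    // Note: a Stream can only be iterated once, so this sequence can only be consumed once.
    fun <T> Stream<T>.asCloseableSequence() = object : CloseableSequence<T> {
        override fun iterator(): Iterator<T> = this@asCloseableSequence.iterator()
        override fun close() = this@asCloseableSequence.close()
    }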
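The test above covers stopping early with take(1). The scenario from the question, where the consumer throws an exception, can be checked the same way. This sketch uses top-level copies of the declarations above so that it compiles on its own. The name closedAfterConsumerException is illustrative.

```kotlin
import java.util.stream.Stream

// Top-level copies of the declarations above, so this file compiles on its own.
interface CloseableSequence<T> : Sequence<T>, AutoCloseable

fun <T> Stream<T>.asCloseableSequence(): CloseableSequence<T> = object : CloseableSequence<T> {
    override fun iterator(): Iterator<T> = this@asCloseableSequence.iterator()
    override fun close() = this@asCloseableSequence.close()
}

// The scenario from the question: the consumer throws on the first element.
fun closedAfterConsumerException(): Boolean {
    var closed = false
    val stream = Stream.of(1, 2, 3).onClose { closed = true }
    try {
        stream.asCloseableSequence().use { seq ->
            seq.forEach { throw RuntimeException("consumer failed") }
        }
    } catch (e: RuntimeException) {
        // use() has already closed the sequence (and the stream) before rethrowing
    }
    return closed
}

fun main() {
    println(closedAfterConsumerException()) // true
}
```

Because close() now happens in use() on the consumer's side, it no longer depends on the sequence's iterator ever being resumed.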
    

    If you want to force users of your code to close the sequence, a common solution is not to expose the sequence directly, but only inside a lambda, and to close it automatically when the lambda finishes:

    @Test
    fun `use with closing provider`() {
    
        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
    
        stream.asSequenceClosingProvider().use { seq ->
            seq.take(1).forEach { println(it) }
        }
        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
    
    interface ClosingProvider<T> {
        fun <R> use(block: (T) -> R): R
    }
    
    fun <T> Stream<T>.asSequenceClosingProvider() = object : ClosingProvider<Sequence<T>> {
        override fun <R> use(block: (Sequence<T>) -> R): R = this@asSequenceClosingProvider.use { block(it.asSequence()) }
    }
    

    It looks very similar to the first example, but the difference is that the ClosingProvider isn't a sequence itself, so users can't consume it outside of use().
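If the provider object itself doesn't need to be passed around, the same idea could be reduced to a single inline extension function. This is only a sketch: useAsSequence is a made-up name, not part of the Kotlin standard library or the JDK.

```kotlin
import java.util.stream.Stream
import kotlin.streams.asSequence

// Hypothetical helper: exposes the stream as a Sequence only inside block,
// and closes the stream when block returns or throws.
inline fun <T, R> Stream<T>.useAsSequence(block: (Sequence<T>) -> R): R =
    use { block(it.asSequence()) }

// Stops after one element and reports whether the stream was closed.
fun closedAfterTakeOne(): Pair<List<Int>, Boolean> {
    var closed = false
    val firstItems = Stream.of(1, 2, 3)
        .onClose { closed = true }
        .useAsSequence { seq -> seq.take(1).toList() }
    return firstItems to closed
}

fun main() {
    println(closedAfterTakeOne()) // ([1], true)
}
```

The trade-off is that a ClosingProvider can be created in one place and used later, while useAsSequence runs the block immediately. In both designs the sequence can still escape if the block returns it. Iterating it afterwards would mean working with a stream that has already been closed, so the block should finish consuming the sequence before it returns.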