I have some Java code which makes heavy use of the Stream API. It is critical that these streams are closed when we are finished consuming them, but we are struggling to come up with a robust solution.
I had an idea: this is already a mixed Java + Kotlin project, so let's try Kotlin's Sequence. And so I came up with this extension function, which looks like it does just what we need:
fun <T> Stream<T>.asCloseableSequence() = sequence {
    this@asCloseableSequence.use {
        yieldAll(it.asSequence())
    }
}
This works okay. The original Stream is closed after we finish processing the Sequence. However, if an exception occurs during processing, then the Stream is not closed.
What am I doing wrong here? My understanding of the use function is that it should close the underlying resource even if an exception occurs. My thought was that the exception must be occurring even before use is called, but if we add some prints:
sequence {
    println("entering sequence")
    this@asCloseableSequence.use {
        println("entering use")
        yieldAll(it.asSequence())
    }
}
then we can see that "entering use" is indeed printed, and yet the Stream is not closed.
The same thing happens if I use try/finally instead of the use function.
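For reference, the try/finally variant looks roughly like the sketch below. The names asCloseableSequenceTryFinally and closedAfterFailingConsumer are made up for this post, and I pass the Stream's iterator() to yieldAll (as in the tests further down) rather than calling asSequence().

```kotlin
import java.util.stream.Stream

// Hypothetical name; same shape as asCloseableSequence,
// but with an explicit try/finally instead of use.
fun <T> Stream<T>.asCloseableSequenceTryFinally(): Sequence<T> = sequence {
    try {
        yieldAll(this@asCloseableSequenceTryFinally.iterator())
    } finally {
        this@asCloseableSequenceTryFinally.close()
    }
}

// Hypothetical helper: consumes the sequence with a consumer that throws
// on the first element, then reports whether the onClose handler ran.
fun closedAfterFailingConsumer(): Boolean {
    var closed = false
    val stream = Stream.of(1, 2, 3).onClose { closed = true }
    runCatching {
        stream.asCloseableSequenceTryFinally().forEach { throw RuntimeException("boom") }
    }
    return closed
}
```

As far as I can tell, closedAfterFailingConsumer() returns false, which matches what I see with use: the finally block does not appear to run when the consumer throws.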
Here is a complete, (close to) minimal, reproducible example. (Note that the built-in asSequence function does not close the Stream even if no exception occurs, and use does work if it is not used inside a sequence scope.)
import java.util.stream.Stream
import kotlin.streams.asSequence
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue

class StreamClosingTests {

    /**
     * First, let's see if the built-in function does what we want.
     */
    @Test
    fun `using asSequence`() {
        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = stream.asSequence()

        // When we perform a terminal operation on the Sequence,
        sequence.forEach { println(it) }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Since the above test fails, let's try using a sequence scope instead.
     */
    @Test
    fun `using SequenceScope and iterator`() {
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        sequence.forEach { println(it) }

        assertTrue(closed[0]) // Passes!
    }

    @Test
    fun `using SequenceScope and iterator and Exception occurs`() {
        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        // When we perform a terminal operation on the Sequence and an exception occurs,
        assertFailsWith<RuntimeException> {
            sequence.forEach { throw RuntimeException() }
        }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Let's remove sequence and see if use works with just a plain old Stream.
     */
    @Test
    fun `use without sequence`() {
        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

        // When we perform a terminal operation on the Stream and an exception occurs,
        assertFailsWith<RuntimeException> {
            stream.use {
                it.forEach { throw RuntimeException() }
            }
        }

        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
}
(Side note: it is very possible that Streams and Sequences are poorly suited to our use case. But even so, I am very interested in why this doesn't work as I expect.)