java completable-future helidon

DataChunkInputStream causing IOException in Helidon MessageBodyFilter


I am using Helidon SE and trying to create a MessageBodyFilter that modifies certain fields of the incoming request payload. When I override the apply method as shown below, an IOException is thrown stating that the input stream has been closed.

@Override
    public Flow.Publisher<DataChunk> apply(final Flow.Publisher<DataChunk> dataChunkPublisher) {
        final var span = tracer.buildSpan("request-preprocessing")
                .asChildOf(spanContext)
                .start();
        try (final InputStream dataChunkInputStream = new DataChunkInputStream(dataChunkPublisher)) {
            return Single.create(CompletableFuture.supplyAsync(() -> Contexts.runInContext(ctx, () -> {
                try {
                    final var jsonNode = OM.readTree(dataChunkInputStream);

This only happens when the DataChunkInputStream is created in a try-with-resources statement, not when it is created outside of one. The exception is thrown on the last line shown above, where the stream is read with the ObjectMapper.


Solution

  • You are consuming the resource inside CompletableFuture.supplyAsync, so the read runs on a separate thread at some later point chosen by the executor. The try block, however, closes dataChunkInputStream as soon as the enclosed Single.create() returns, which in practice happens before the spawned thread gets a chance to read it. This is a typical mistake when mixing an asynchronous coding style with a synchronous one.

    Something like the following should work: close the stream inside the asynchronous task, after it has been read. (Excuse any syntax errors; I typed this inline in the answer.)

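    public Flow.Publisher<DataChunk> apply(final Flow.Publisher<DataChunk> dataChunkPublisher) {
        final var span = tracer.buildSpan("request-preprocessing")
                .asChildOf(spanContext)
                .start();
        // Create the stream here, but do not let this thread close it.
        final InputStream dataChunkInputStream = new DataChunkInputStream(dataChunkPublisher);

        return Single.create(CompletableFuture.supplyAsync(() -> Contexts.runInContext(ctx, () -> {
            // The try-with-resources now lives inside the async task, so the
            // stream is closed only after it has been read.
            try (dataChunkInputStream) {
                final var jsonNode = OM.readTree(dataChunkInputStream);
                // ... modify jsonNode as in the rest of your original method ...
                // Assumption: the filter emits the re-serialized JSON as one chunk.
                return DataChunk.create(OM.writeValueAsBytes(jsonNode));
            }
        })));
    }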
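    If you want to see the timing problem in isolation, here is a minimal sketch that needs no Helidon at all. StrictStream is a hypothetical stand-in for DataChunkInputStream (I am only assuming that reading after close() throws an IOException, as your error message suggests), and the CountDownLatch just forces the ordering that normally happens by chance.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class CloseTimingDemo {

    /** Hypothetical stand-in for DataChunkInputStream: reading after close() fails. */
    static final class StrictStream extends InputStream {
        private final InputStream delegate =
                new ByteArrayInputStream("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        private volatile boolean closed;

        @Override
        public int read() throws IOException {
            if (closed) {
                throw new IOException("Stream closed");
            }
            return delegate.read();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Reads the whole stream, turning an IOException into a readable message. */
    static String drain(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Same shape as the question: the future is returned from inside the try block. */
    private static CompletableFuture<String> startRead(CountDownLatch streamClosed) {
        try (StrictStream in = new StrictStream()) {
            return CompletableFuture.supplyAsync(() -> {
                awaitQuietly(streamClosed); // read only after the caller has the future
                return drain(in);
            });
        } // in.close() runs here, before the async task has read anything
    }

    /** Broken variant: the stream is closed by the calling thread. */
    public static String closedTooEarly() {
        CountDownLatch streamClosed = new CountDownLatch(1);
        CompletableFuture<String> result = startRead(streamClosed);
        streamClosed.countDown();
        return result.join();
    }

    /** Fixed variant: the async task owns the stream and closes it after reading. */
    public static String closedAfterRead() {
        StrictStream in = new StrictStream();
        return CompletableFuture.supplyAsync(() -> {
            try (in) {
                return drain(in);
            }
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(closedTooEarly());  // prints: IOException: Stream closed
        System.out.println(closedAfterRead()); // prints: {"a":1}
    }
}
```

    The only difference between the two variants is which thread runs the try-with-resources: whichever thread opens the try block is the one that closes the stream, so that block belongs in the code that actually does the reading.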