java spring spring-boot spring-kafka

Failing to handle deserialization exceptions in batch listener


I am migrating a Spring Boot application from version 2 to version 3.

My previous implementation followed the spring-kafka v2.8.4 documentation, and everything worked fine.

ListenerUtils.byteArrayToDeserializationException has since been removed, so I have to use SerializationUtils.byteArrayToDeserializationException instead.

For this I am following the v3.1.x documentation, but I am running into problems.

The example in the documentation does not compile, specifically this part:

DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                    headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));

In theory, what is passed is a byte[], but the method only accepts a Header, so a cast or a wrapper is needed. With ListenerUtils I was casting the value to byte[]. Now, when I cast or wrap it as a Header or RecordHeader, I get the following error: "Foreign deserialization exception header ignored; possible attack?". My attempt looks something like this:

DeserializationException exception = SerializationUtils.byteArrayToDeserializationException(
            new LogAccessor(IndexingService.class),
            new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
                (byte[]) headersOfNullItem.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)));

I have tried to create a DeserializationExceptionHeader, but it is package-private.

Am I missing something? How can I avoid that problem so the errors in deserialization are handled?
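
One plausible explanation for the warning, offered here as an assumption rather than a confirmed reading of the library source, is that the utility only trusts headers of its own package-private type and rejects any other Header, whatever bytes it carries. The sketch below models that idea with JDK-only stand-ins; DemoHeader, PlainHeader, TrustedHeader and ForeignHeaderCheck are hypothetical names, not Spring Kafka classes.

```java
// Hypothetical model of a "trusted header type" check. JDK only.
interface DemoHeader {
    String key();
    byte[] value();
}

// Stand-in for a generic header that any application code can construct.
class PlainHeader implements DemoHeader {
    private final String key;
    private final byte[] value;

    PlainHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    public String key() {
        return key;
    }

    public byte[] value() {
        return value;
    }
}

// Stand-in for a header type that only the library itself would create.
final class TrustedHeader extends PlainHeader {
    TrustedHeader(String key, byte[] value) {
        super(key, value);
    }
}

class ForeignHeaderCheck {

    // Returns the payload only when the header has the trusted type.
    // Assumption: the real utility reports a "foreign header" at this point.
    static byte[] payloadIfTrusted(DemoHeader header) {
        if (!(header instanceof TrustedHeader)) {
            return null;
        }
        return header.value();
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        // Same key and same bytes, but only the trusted type is accepted.
        System.out.println(payloadIfTrusted(new PlainHeader("k", data)) != null);
        System.out.println(payloadIfTrusted(new TrustedHeader("k", data)) != null);
    }
}
```

If the library behaves this way, wrapping the bytes in a new RecordHeader from application code could never pass the check, which would be consistent with the error described above.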

Edit (as Artem Bilan mentioned):

Instead of

@Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers

I use

@Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) Headers headers

And I test it as:

    // Serializes the exception into the byte[] form carried by the header.
    private byte[] header(DeserializationException deserEx)
    {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try
        {
            new ObjectOutputStream(baos).writeObject(deserEx);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }
    
@Test
...
    Headers headers = new RecordHeaders();
    myObjects.forEach(myObject -> headers.add("traceparent", "00-f569e0cc279ef4a0-5d47674154157a2d-01".getBytes(StandardCharsets.UTF_8)));
    myObjects.add(null);
    DeserializationException deserializationException = new DeserializationException("aDeserializationException", null, false, null);
    Header header = new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(deserializationException));
    headers.add(header);
    
    classUnderTest.myMethod(myObjects, headers);
...

But I still get the same error.
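
For reference, the header(...) helper relies on plain Java serialization, and that step can be checked in isolation. The following is a minimal JDK-only sketch; HeaderRoundTrip is a hypothetical name, and IllegalStateException stands in for DeserializationException so that no Kafka dependency is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: round-trips an exception through Java serialization.
class HeaderRoundTrip {

    // Serialize the exception to a byte[] such as a header value would carry.
    static byte[] toBytes(Exception ex) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(ex);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Restore the exception from the serialized bytes.
    static Exception fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Exception) ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Exception restored = fromBytes(toBytes(new IllegalStateException("aDeserializationException")));
        // prints "IllegalStateException: aDeserializationException"
        System.out.println(restored.getClass().getSimpleName() + ": " + restored.getMessage());
    }
}
```

Since the bytes survive the round trip, the serialized content itself is probably not what triggers the error; the way the header object is created seems the more likely cause.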

Also, when I run the application, I get this error:

org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.util.HashMap<?, ?>] to type [@org.springframework.messaging.handler.annotation.Header org.apache.kafka.common.header.Headers]

Solution

  • I found that using the following works:

    List<ConsumerRecord<String, my_object>> my_method()

    This differs from what is given in the documentation, which I was not able to use successfully.