spring-integrationspring-cloud-streamspring-integration-aws

How to send custom header from spring-cloud-stream (aws kinesis binder) to "legacy" spring integration consumer


I have two applications - the first produces messages using spring-cloud-stream/function with the AWS Kinesis Binder, the second is an application that builds off of spring integration to consume messages. Communicating between the two is not a problem - I can send a message from "stream" and handle it easily in "integration".

When I want to send a custom header, then there is an issue. The header arrives at the consumer as an embedded header using the "New" format (Has an 0xff at the beginning, etc.) - See AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable in spring-cloud-stream.

However, the KinesisMessageDrivenChannelAdapter (spring-integration-aws) does not seem to understand the "new" embedded header form. It uses EmbeddedJsonHeadersMessageMapper (See #toMessage) which cannot "decode" the message. It throws a com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') because of the additional information included in the embedded header (0xff and so on).

I need to send the header across the wire (the header is used to route on the other side), so it's not an option to "turn off" headers on the producer. I don't see a way to use the "old" embedded headers.

I'd like to use spring-cloud-stream/function on the producer side - it's awesome. I wish I could redo the consumer, but...

I could write my own embedded header mapper that understands the new format (use EmbeddedHeaderUtils), and wire it into the KinesisMessageDrivenChannelAdapter.

Given the close relationship between spring-cloud-stream and spring-integration, I must be doing something wrong. Does Spring Integration have an OutboundMessageMapper that understands the new embedded form?

Or is there a way to coerce spring cloud stream to use a different embedding strategy?

I could use Spring Integration on the producer side. (sad face).

Any thoughts? Thanks in advance.


Solution

  • understands the new format

    It's not a "new" format, it's a format that Spring Cloud Stream created, originally for Kafka, which only added header support in 0.11.

    I could write my own embedded header mapper that understands the new format (use EmbeddedHeaderUtils), and wire it into the KinesisMessageDrivenChannelAdapter.

    I suggest you do that, and consider contributing it to the core Spring Integration Project alongside the EmbeddedJsonHeadersMessageMapper so that it can be used with all technologies that don't support headers natively.