Tags: spring-cloud-stream, amazon-kinesis, spring-integration-aws, spring-cloud-stream-binder, spring-cloud-stream-binder-kinesis

How to produce a message record that can be consumed by a Spring Cloud Stream consumer?


On the producer side, I'm using the Kinesis PutRecord API to write messages to the stream. On the consumer side, I'm using the Spring Cloud Stream binder for Kinesis (spring-cloud-stream-binder-kinesis).

I adapted the code that builds the PutRecordRequest object from org.springframework.integration.aws.outbound.KinesisMessageHandler:

PutRecordRequest buildPutRecordRequest(Message<?> message) {
        MessageHeaders messageHeaders = message.getHeaders();
        Object payload = message.getPayload();

        ByteBuffer data = null;

        Message<?> messageToEmbed = null;
        logger.info("messageHeaders = {}", messageHeaders);
        logger.info("payload = {}", payload);

        if (payload instanceof ByteBuffer) {
            data = (ByteBuffer) payload;
            if (this.embeddedHeadersMapper != null) {
                messageToEmbed = new MutableMessage<>(data.array(), messageHeaders);
            }
        } else {
            byte[] bytes =
                    (byte[]) (payload instanceof byte[]
                            ? payload
                            : this.messageConverter.fromMessage(message, byte[].class));
            Assert.notNull(bytes, "payload cannot be null");
            if (this.embeddedHeadersMapper != null) {
                messageToEmbed = new MutableMessage<>(bytes, messageHeaders);
            } else {
                data = ByteBuffer.wrap(bytes);
            }
        }

        if (messageToEmbed != null) {
            try {
                byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed);
                Assert.notNull(bytes, "payload cannot be null");
                data = ByteBuffer.wrap(bytes);
            } catch (Exception ex) {
                throw new MessageConversionException(message, "Cannot embedded headers to payload", ex);
            }
        }

        byte[] bytes = (byte[]) this.messageConverter.fromMessage(message, byte[].class);
        data = ByteBuffer.wrap(bytes);


        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setData(data);

        
        String explicitHashKey = explicitHashKeyExtractor(event);

        putRecordRequest.setPartitionKey(partitionKey);
        putRecordRequest.setExplicitHashKey(explicitHashKey);
        putRecordRequest.setStreamName(streamName);
        return putRecordRequest;
    }

void push(Event event) {

        Message<Event> genericMessage = MessageBuilder.withPayload(event).setHeader(AwsHeaders.STREAM, streamName).build();
        PutRecordResult putRecordResult = kinesisClient.putRecord(buildPutRecordRequest(genericMessage));

    }

On the consumer side, I get the following error:

2022-09-12 16:01:38.266  INFO 816470 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[319], headers={skip-input-type-conversion=false, aws_shard=shardId-000000000000, id=d5e996ce-af62-c8b3-850c-3f1ee21c23b6, sourceData={SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}, contentType=application/json, aws_receivedPartitionKey=673354558, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49632961872623851461858864802247332802270083416600870914, timestamp=1662978698237}]'
for the '{SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}'.
Consider to use 'errorChannel' flow for the compensation logic.

Caused by: java.lang.ClassCastException: [B cannot be cast to demo.stream.Event
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:784) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:589) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:435) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]

How should I modify my producer code so that my consumer can consume the message payload properly?


UPDATE

Using EmbeddedHeaderUtils together with the consumer property headerMode set to embeddedHeaders worked for me. The following producer code creates the PutRecordRequest:

        Map<String, Object> map = new HashMap<>();
        map.put(AwsHeaders.STREAM, streamName); // sample header

        byte[] eventAsBytes = SerializationUtils.serialize(event);
        MessageValues messageValues = new MessageValues(eventAsBytes, map);

        byte[] bytes = EmbeddedHeaderUtils.embedHeaders(messageValues, AwsHeaders.STREAM);
        ByteBuffer data = ByteBuffer.wrap(bytes);

        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setData(data);
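
For a producer that cannot call EmbeddedHeaderUtils directly (for example, one that does not run on the JVM), it may help to see the byte layout written out by hand. The sketch below assumes the layout is a 0xff marker byte, a one-byte header count, then for each header a one-byte name length, the name, a four-byte big-endian value length and the JSON-encoded value, followed by the raw payload. EmbeddedHeaderSketch and its methods are hypothetical names, header values are assumed to be JSON-encoded by the caller, and the layout should be checked against the EmbeddedHeaderUtils source of the Spring Cloud Stream version in use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring Cloud Stream.
final class EmbeddedHeaderSketch {

    // Assumed layout: 0xff, count(1), [nameLen(1), name, valueLen(4), value]..., payload.
    // Header values must already be JSON text, e.g. "\"application/json\"".
    static byte[] embed(Map<String, String> jsonHeaders, byte[] payload) {
        if (jsonHeaders.size() > 255) {
            throw new IllegalArgumentException("at most 255 headers fit in one count byte");
        }
        int size = 2 + payload.length;
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            int nameLength = e.getKey().getBytes(StandardCharsets.UTF_8).length;
            if (nameLength > 255) {
                throw new IllegalArgumentException("header name too long: " + e.getKey());
            }
            size += 1 + nameLength + 4 + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size); // big-endian by default
        buffer.put((byte) 0xff);                       // marker: headers are embedded
        buffer.put((byte) jsonHeaders.size());
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
            buffer.put((byte) name.length);
            buffer.put(name);
            buffer.putInt(value.length);
            buffer.put(value);
        }
        buffer.put(payload);
        return buffer.array();
    }

    // Reverses embed(): fills headersOut and returns the original payload.
    // Records without the 0xff marker are returned unchanged.
    static byte[] extractPayload(byte[] record, Map<String, String> headersOut) {
        if (record.length < 2 || (record[0] & 0xff) != 0xff) {
            return record;
        }
        ByteBuffer buffer = ByteBuffer.wrap(record);
        buffer.get();                                  // skip the marker byte
        int count = buffer.get() & 0xff;
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buffer.get() & 0xff];
            buffer.get(name);
            byte[] value = new byte[buffer.getInt()];
            buffer.get(value);
            headersOut.put(new String(name, StandardCharsets.UTF_8),
                    new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return payload;
    }
}
```

On the JVM, calling EmbeddedHeaderUtils.embedHeaders as shown above remains the safer choice, since it tracks whatever format the binder actually expects.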

The consumer function is declared as follows:

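    @Bean
    public Consumer<Message<byte[]>> processOrder(OrderRepository orders) {
        return message -> {
            // The payload is already typed as byte[], so no cast is needed.
            byte[] bytes = message.getPayload();
            // Reverse the producer's Java serialization to recover the Event.
            Event event = (Event) SerializationUtils.deserialize(bytes);
            // do something
        };
    }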
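
Note that this approach ships the payload as Java-serialized bytes, so Event must implement Serializable and both producer and consumer need a compatible Event class on the classpath. Assuming SerializationUtils here is the Spring or Apache Commons Lang utility, both of which use standard Java serialization, the round trip is roughly equivalent to the following JDK-only sketch. JavaSerializationSketch and its nested Event are hypothetical stand-ins, not the classes from the question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration of the serialize/deserialize round trip.
final class JavaSerializationSketch {

    // Stand-in for the question's Event class; it must be Serializable.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        Event(String id) {
            this.id = id;
        }
    }

    // Producer side: turn the object into the bytes that become the record payload.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Consumer side: rebuild the object from the received payload bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

If the stream may also be read by non-Java consumers, a JSON payload would be a more portable choice than Java serialization.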

Solution

  • By default, the Spring Cloud Stream AWS Kinesis binder works in HeaderMode.embeddedHeaders, because AWS Kinesis itself has no notion of record headers.

    So, when you send records to the Kinesis stream manually, make sure you follow the embedded-headers algorithm used by Spring Cloud Stream: org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.

    Alternatively, set headerMode to none for the respective Spring Cloud Stream consumer: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties
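
As a rough illustration of the second option, the consumer binding could be configured as below. The binding name processOrder-in-0 is an assumption: it is the name Spring Cloud Stream would normally derive from a functional bean called processOrder, so adjust it to the actual binding.

```properties
# Assumed binding name, derived from the processOrder consumer bean.
# With headerMode=none the binder treats the record data as the plain payload
# and does not look for embedded headers.
spring.cloud.stream.bindings.processOrder-in-0.consumer.headerMode=none
```

With this setting the producer would put only the payload bytes into the PutRecordRequest, without the embedded-headers prefix.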