From the producer side, I'm using Kinesis's PutRecord API to produce messages to the stream. On the consumer side, I'm using the spring-cloud-stream-binder-kinesis integration.
I have adapted the code for building the PutRecordRequest object from org.springframework.integration.aws.outbound.KinesisMessageHandler:
PutRecordRequest buildPutRecordRequest(Message<?> message) {
    MessageHeaders messageHeaders = message.getHeaders();
    Object payload = message.getPayload();
    ByteBuffer data = null;
    Message<?> messageToEmbed = null;
    logger.info("messageHeaders = {}", messageHeaders);
    logger.info("payload = {}", payload);
    if (payload instanceof ByteBuffer) {
        data = (ByteBuffer) payload;
        if (this.embeddedHeadersMapper != null) {
            messageToEmbed = new MutableMessage<>(data.array(), messageHeaders);
        }
    } else {
        byte[] bytes =
                (byte[]) (payload instanceof byte[]
                        ? payload
                        : this.messageConverter.fromMessage(message, byte[].class));
        Assert.notNull(bytes, "payload cannot be null");
        if (this.embeddedHeadersMapper != null) {
            messageToEmbed = new MutableMessage<>(bytes, messageHeaders);
        } else {
            data = ByteBuffer.wrap(bytes);
        }
    }
    if (messageToEmbed != null) {
        try {
            byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed);
            Assert.notNull(bytes, "payload cannot be null");
            data = ByteBuffer.wrap(bytes);
        } catch (Exception ex) {
            throw new MessageConversionException(message, "Cannot embed headers into payload", ex);
        }
    }
    // note: this unconditionally overwrites 'data', discarding any embedded headers produced above
    byte[] bytes = (byte[]) this.messageConverter.fromMessage(message, byte[].class);
    data = ByteBuffer.wrap(bytes);
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setData(data);
    String explicitHashKey = explicitHashKeyExtractor((Event) message.getPayload()); // the payload is the Event
    putRecordRequest.setPartitionKey(partitionKey); // partitionKey and streamName are fields of this class
    putRecordRequest.setExplicitHashKey(explicitHashKey);
    putRecordRequest.setStreamName(streamName);
    return putRecordRequest;
}
void push(Event event) {
    Message<Event> genericMessage = MessageBuilder.withPayload(event)
            .setHeader(AwsHeaders.STREAM, streamName)
            .build();
    PutRecordResult putRecordResult = kinesisClient.putRecord(buildPutRecordRequest(genericMessage));
}
On the consumer side, I'm getting the following error:
2022-09-12 16:01:38.266 INFO 816470 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[319], headers={skip-input-type-conversion=false, aws_shard=shardId-000000000000, id=d5e996ce-af62-c8b3-850c-3f1ee21c23b6, sourceData={SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}, contentType=application/json, aws_receivedPartitionKey=673354558, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49632961872623851461858864802247332802270083416600870914, timestamp=1662978698237}]'
for the '{SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}'.
Consider to use 'errorChannel' flow for the compensation logic.
Caused by: java.lang.ClassCastException: [B cannot be cast to demo.stream.Event
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:784) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:589) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:435) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
How should I modify my producer code so that my consumer can consume the message payload properly?
UPDATE
Using EmbeddedHeaderUtils along with the configuration consumer.headerMode set to embeddedHeaders worked for me. The following is the producer code to create a PutRecordRequest:
Map<String, Object> map = new HashMap<>();
map.put(AwsHeaders.STREAM, streamName); // sample header
byte[] eventAsBytes = SerializationUtils.serialize(event); // Event must implement java.io.Serializable
MessageValues messageValues = new MessageValues(eventAsBytes, map);
// embed the named headers into the payload, in the format the binder expects
byte[] bytes = EmbeddedHeaderUtils.embedHeaders(messageValues, AwsHeaders.STREAM);
ByteBuffer data = ByteBuffer.wrap(bytes);
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setData(data);
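For completeness, a minimal sketch of the remaining required fields and the send itself, mirroring the original push() method above (partitionKey is an assumed field here, like streamName and kinesisClient):

putRecordRequest.setPartitionKey(partitionKey); // Kinesis requires a partition key
putRecordRequest.setStreamName(streamName);
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);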
And the function signature for the consumer is:
@Bean
public Consumer<Message<byte[]>> processOrder(OrderRepository orders) {
    return message -> {
        byte[] bytes = message.getPayload(); // payload is already byte[]; no cast needed
        Event event = (Event) SerializationUtils.deserialize(bytes);
        // do something
    };
}
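The consumer.headerMode setting mentioned above is a per-binding consumer property. A minimal sketch in application.properties, assuming the binding name processOrder-in-0 that Spring Cloud Stream derives from the processOrder bean by default:

spring.cloud.stream.bindings.processOrder-in-0.consumer.headerMode=embeddedHeaders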
Spring Cloud Stream AWS Kinesis Binder works in HeaderMode.embeddedHeaders by default, simply because there is no notion of record headers in AWS Kinesis itself.
So, when you send records to the Kinesis stream manually, please be sure that you follow the embedded headers algorithm used in Spring Cloud Stream - org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.
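As an illustration (a sketch, not part of the original answer), the envelope produced by embedHeaders can be decoded back with the same utility, which is essentially what the binder does on the consumer side; eventAsBytes is assumed to be a serialized payload as in the producer code above:

MessageValues original = new MessageValues(eventAsBytes, Collections.singletonMap(AwsHeaders.STREAM, "test_stream"));
byte[] wire = EmbeddedHeaderUtils.embedHeaders(original, AwsHeaders.STREAM);
// the binder first checks the leading magic byte, then strips the envelope
if (EmbeddedHeaderUtils.mayHaveEmbeddedHeaders(wire)) {
    MessageValues decoded = EmbeddedHeaderUtils.extractHeaders(wire);
    byte[] originalPayload = (byte[]) decoded.getPayload();
    Object stream = decoded.get(AwsHeaders.STREAM); // "test_stream"
}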
Or consider configuring headerMode to none for the respective Spring Cloud Stream consumer: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties
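For instance, using the same per-binding property shown earlier (the binding name is an assumption based on the processOrder bean):

spring.cloud.stream.bindings.processOrder-in-0.consumer.headerMode=none

In that mode the binder hands the record payload to the consumer as-is, without attempting to strip an embedded-headers envelope.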