I'm using stream bridge in my app since the topic to send is decided at runtime based on URL path params; I build a message from the Request Body, Path Elements and call stream bridge send to a function for publishing to Kafka
@Bean
public RouterFunction<ServerResponse> webhooks() {
return route().POST("/webhooks/v1/{cat}/{mat}/{key}", accept(MediaType.APPLICATION_JSON), (serverRequest) -> {
String cat = serverRequest.pathVariable("cat");
String mat = serverRequest.pathVariable("mat");
String key = serverRequest.pathVariable("key");
String logPrefix = serverRequest.exchange().getLogPrefix();
log.debug("{}Received HTTP Payload for {}:{} with key {}", logPrefix, cat, mat, key);
return serverRequest.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
})
.map(payload -> MessageBuilder.withPayload(payload)
.setHeader(catHeader, cat)
.setHeader(matHeader, mat)
.setHeader(keyHeader, key)
.setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")
.setHeader(KafkaHeaders.TOPIC, String.join("-", cat, mat, namespace))
.setHeader(webhookRequestId, logPrefix)
.build())
.map(message -> streamBridge.send("producer", message))
.flatMap(message -> ServerResponse.accepted().build());
}).build();
}
@Bean
public Function<Flux<Message<?>>, Flux<Message<?>>> producer() {
return mapFlux -> mapFlux.map(m -> MessageBuilder.withPayload(m.getPayload()).copyHeaders(m.getHeaders()).build());
}
I then add the following properties to application yaml
spring:
main:
banner-mode: off
mongodb:
embedded:
version: 3.4.6
data:
mongodb:
port: 29129
host: localhost
database: howler_db
kafka:
binder:
brokers: localhost:9952
cloud:
function:
definition: producer
stream:
bindings:
producer-out-0:
useTopicHeader: true
producer:
configuration:
retry:
topic:
delay: 200
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
max:
block:
ms: 500
enable:
idempotence: true
acks: all
My test however, fails with this exception.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:949) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087) ~[spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) ~[spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513) ~[spring-integration-kafka-5.5.13.jar:5.5.13]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.13.jar:5.5.13]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:235) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:170) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:150) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
at com.gabbar.cloud.sambha.SholayWebFunctionConfiguration.lambda$webhooks$1(ShokayWebFunctionConfiguration.java:67) ~[classes/:na]
logs reveal producer key.serializer
isn't set.
2022-07-11 18:24:15.500 INFO 15424 --- [ctor-http-nio-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9952]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Here's what works for me now.
spring:
embedded:
kafka:
brokers: localhost:9092
cloud:
stream:
kafka:
default:
producer:
useTopicHeader: true
binder:
autoCreateTopics: false
producerProperties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
max.block.ms: 100