Tags: spring-boot, apache-kafka, spring-cloud, spring-cloud-function

Kafka Binder doesn't apply serializer configuration


I'm using StreamBridge in my app because the topic to send to is decided at runtime from the URL path parameters. I build a message from the request body and the path elements, then call StreamBridge.send, targeting a function that publishes to Kafka.

@Bean
public RouterFunction<ServerResponse> webhooks() {
    // POST /webhooks/v1/{cat}/{mat}/{key}: accepts a JSON body and republishes it to
    // Kafka via StreamBridge. The destination topic is derived from path variables at
    // runtime, carried in the KafkaHeaders.TOPIC header.
    return route().POST("/webhooks/v1/{cat}/{mat}/{key}", accept(MediaType.APPLICATION_JSON), (serverRequest) -> {
        String cat = serverRequest.pathVariable("cat");
        String mat = serverRequest.pathVariable("mat");
        String key = serverRequest.pathVariable("key");
        // Per-exchange log prefix, also propagated downstream as a request-id header.
        String logPrefix = serverRequest.exchange().getLogPrefix();
        log.debug("{}Received HTTP Payload for {}:{} with key {}", logPrefix, cat, mat, key);
        // Body is deserialized to a generic Map; no schema validation is applied here.
        return serverRequest.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
                })
                .map(payload -> MessageBuilder.withPayload(payload)
                        .setHeader(catHeader, cat)
                        .setHeader(matHeader, mat)
                        .setHeader(keyHeader, key)
                        // NOTE(review): the Kafka record key is hard-coded to "someKey";
                        // presumably the {key} path variable was intended — confirm.
                        .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")
                        // Dynamic topic selection; requires useTopicHeader=true on the
                        // producer binding for the binder to honor this header.
                        .setHeader(KafkaHeaders.TOPIC, String.join("-", cat, mat, namespace))
                        .setHeader(webhookRequestId, logPrefix)
                        .build())
                // NOTE(review): StreamBridge.send returns a boolean that is discarded here;
                // a 202 Accepted is returned even if the send reports failure — confirm
                // that is the intended fire-and-forget contract.
                .map(message -> streamBridge.send("producer", message))
                .flatMap(message -> ServerResponse.accepted().build());
    }).build();
}

@Bean
public Function<Flux<Message<?>>, Flux<Message<?>>> producer() {
    // Pass-through function: each inbound message is rebuilt with an identical
    // payload and a copy of its headers, then re-emitted unchanged.
    return inbound -> inbound.map(original -> {
        MessageBuilder<?> rebuilt = MessageBuilder
                .withPayload(original.getPayload())
                .copyHeaders(original.getHeaders());
        return rebuilt.build();
    });
}

I then add the following properties to application yaml

spring:
  main:
    banner-mode: off
  mongodb:
    embedded:
      version: 3.4.6
  data:
    mongodb:
      port: 29129
      host: localhost
      database: howler_db
  kafka:
    binder:
      brokers: localhost:9952
  cloud:
    function:
      definition: producer
    stream:
      bindings:
        producer-out-0:
          useTopicHeader: true
          producer:
            configuration:
              retry:
                topic:
                  delay: 200
              key-serializer: org.apache.kafka.common.serialization.StringSerializer
              value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
              retries: 3
              max:
                block:
                  ms: 500
              enable:
                idempotence: true
              acks: all

My test, however, fails with this exception.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:949) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513) ~[spring-integration-kafka-5.5.13.jar:5.5.13]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:235) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:170) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:150) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at com.gabbar.cloud.sambha.SholayWebFunctionConfiguration.lambda$webhooks$1(ShokayWebFunctionConfiguration.java:67) ~[classes/:na]

The logs reveal that the producer's key.serializer is never set — it remains the default ByteArraySerializer despite the configuration above.

2022-07-11 18:24:15.500  INFO 15424 --- [ctor-http-nio-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9952]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Solution

  • Here's what works for me now.

    spring:
      embedded:
        kafka:
          brokers: localhost:9092
      cloud:
        stream:
          kafka:
            default:
              producer:
                useTopicHeader: true
            binder:
              autoCreateTopics: false
              producerProperties:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
                max.block.ms: 100