apache-kafka, spring-kafka, spring-cloud-stream, spring-cloud-dataflow

Spring Cloud StreamBridge to Kafka delivery check


I have a few requirements for the Spring Cloud Stream application I am building:

I have looked into using a Function, but I have not been able to work out how to send multiple messages for a single topic. I have also looked at using a Consumer and Supplier, but I can't see that working very well. The way I am currently sending the messages is with a Consumer, publishing as a side effect via StreamBridge.

    @Bean
    @SuppressWarnings("unchecked")
    public Consumer<KStream<String, String>> generateMessage() {
        return messages -> {
            final Map<String, KStream<String, String>> splitMessages =
                    branchOutput(filterMessages(messages));

            KStream<String, MessageData>[] ksArray = splitMessages
                    .values()
                    .stream()
                    .map(message ->
                            message.mapValues((key, jsonMessage) -> {
                                try {
                                    return new MessageData(dataTransformService
                                            .transformMessage(key, jsonMessage, extractTopic(jsonMessage)),
                                            removeTopic(jsonMessage), "");
                                } catch (ClassNotFoundException e) {
                                    return new MessageData(Collections.singletonList(CLASS_NOT_FOUND_EXCEPTION),
                                            removeTopic(jsonMessage), e.getMessage());
                                }
                            }))
                    .toArray(KStream[]::new);

            ksArray[0].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_1, value.getOriginalMessage(), value.getError()));
            ksArray[1].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_2, value.getOriginalMessage(), value.getError()));
            ksArray[2].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_3, value.getOriginalMessage(), value.getError()));
            ksArray[3].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_4, value.getOriginalMessage(), value.getError()));
        };
    }

    // send message(s) to topic or forward to dlq if there is a message handling exception
    private void sendMessage(String key, List<String> transformedMessages, String binding, String originalMessage, String error) {
        try {
            for (String transformedMessage : transformedMessages) {
                if (!transformedMessage.equals(CLASS_NOT_FOUND_EXCEPTION)) {
                    boolean sendTest = streamBridge.send(binding,
                            new GenericMessage<>(transformedMessage, Collections.singletonMap(
                                    KafkaHeaders.KEY, (extractMessageId(transformedMessage)).getBytes())));

                    log.debug(String.format("message sent = %s", sendTest));

                } else {
                    log.warn(String.format("message transform error: %s", error));
                    streamBridge.send(DLQ_OUTPUT_BINDING,
                            new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
                                    key.getBytes())));
                }
            }

        } catch (MessageHandlingException e) {
            log.warn(String.format("message send error: %s", e));
            streamBridge.send(DLQ_OUTPUT_BINDING,
                    new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
                            key.getBytes())));

        }
    }

What I really need to know is whether there is a better way of carrying out these requirements. If not, is there a way to check for acknowledgements from the external Kafka cluster (I don't manage it) that we are sending to, so that a message can be resent if it is not received?


Solution

  • Kafka Streams does not allow you to receive records from one cluster and publish them to a different cluster after processing. All processing in a single topology must be done on the same cluster. See this related Stack Overflow thread. The way to get around this restriction, as required in your use case, is to manually send the records to the second cluster using something like StreamBridge, KafkaTemplate, etc. Although this is not perfect, it is an acceptable solution in this case. However, with this approach, you lose any end-to-end guarantees provided by Kafka Streams. For example, when the whole topology runs on the same cluster, Kafka Streams gives you certain processing guarantees, such as exactly-once or at-least-once semantics. If you want to preserve those guarantees, you can use the following strategy, provided you are willing to add an extra topic on the first cluster. Here is the basic idea.

    public Function<KStream<String, String>, KStream<...>> generateMessage() 
    

    So the above is an end-to-end Kafka Streams processor that operates entirely on the first cluster: you produce the result into an outbound topic on that cluster (a fuller sketch of what it could look like follows below). Then you use the regular message-channel-based Kafka binder, spring-cloud-stream-binder-kafka, to send the message to the second cluster.
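
    Filled out, that first processor might look roughly like the sketch below. This is only an illustration: it reuses the helpers and constants from the question (branchOutput, filterMessages, dataTransformService, CLASS_NOT_FOUND_EXCEPTION), assumes the transformed values are plain strings, omits the dead-letter handling, and relies on the Kafka Streams binder mapping each element of the returned array to one output binding (generateMessage-out-0 to generateMessage-out-3), all on topics belonging to the first cluster.

    @Bean
    @SuppressWarnings("unchecked")
    public Function<KStream<String, String>, KStream<String, String>[]> generateMessage() {
        return messages -> branchOutput(filterMessages(messages))
                .values()
                .stream()
                // flatMapValues lets a single input record fan out into several output
                // records, which covers the "multiple messages per topic" requirement
                .map(branch -> branch.flatMapValues((key, jsonMessage) -> {
                    try {
                        return dataTransformService
                                .transformMessage(key, jsonMessage, extractTopic(jsonMessage));
                    } catch (ClassNotFoundException e) {
                        return Collections.singletonList(CLASS_NOT_FOUND_EXCEPTION);
                    }
                }))
                // each element of the returned array is written by the binder to its own
                // output binding (generateMessage-out-0 ... -out-3) on the first cluster
                .toArray(KStream[]::new);
    }

    A pass-through processor, one per intermediate topic, then only needs to copy each record across to the second cluster: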

    @Bean
    public Function<String, String> passThroughToSecondCluster() {
        return message -> message; // forward each record from the intermediate topic unchanged
    }
    

    You can leverage the multi-binder capabilities of Spring Cloud Stream to use the first cluster on the inbound and the second one on the outbound. Here is an example. Have a look at the configuration for more details.
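
    A minimal sketch of that configuration might look roughly like the following; the binder names (cluster1, cluster2), topic names and broker addresses are placeholders, not values taken from the question:

    spring.cloud.stream.binders.cluster1.type=kafka
    spring.cloud.stream.binders.cluster1.environment.spring.cloud.stream.kafka.binder.brokers=<first-cluster-brokers>
    spring.cloud.stream.binders.cluster2.type=kafka
    spring.cloud.stream.binders.cluster2.environment.spring.cloud.stream.kafka.binder.brokers=<second-cluster-brokers>
    spring.cloud.stream.bindings.passThroughToSecondCluster-in-0.destination=intermediate-topic
    spring.cloud.stream.bindings.passThroughToSecondCluster-in-0.binder=cluster1
    spring.cloud.stream.bindings.passThroughToSecondCluster-out-0.destination=topic-on-second-cluster
    spring.cloud.stream.bindings.passThroughToSecondCluster-out-0.binder=cluster2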

    This way, you get the end-to-end guarantees of Kafka Streams, and then, through a separate processor, you send the records to the second cluster. The downside, obviously, is that you need an extra topic on the first cluster.