
How do I produce just one message using Spring Cloud Stream without the deprecated @Output, or turn off polling?


I am trying to publish just one message to Kafka, using Spring Cloud without any deprecated classes/methods or annotations. I would also like to be able to easily change the payload.

To be clear, I want to avoid both the deprecated @Output annotation and KafkaTemplate.

My configuration:

spring:
  cloud:
    stream:
      bindings:
        message-out-0:
          destination: ${spring.application.name}
          producer:
            key:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
            value:
              serializer:
                type: string
                format: utf-8
                charset: utf-8

My code - what I have tried so far:

@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
    
    private final MessageService messageService;
    
    @Override
    public void run(String... args) throws Exception {
        messageService.value = "Application started...";
        messageService.message();
    }
}

One attempt:

@Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Message<?>> message () {
          return () -> MessageBuilder.withPayload(value).build();
     }
}

Another attempt:

@Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Flux<?>> message () {
          return () -> Flux.fromStream(Stream.generate(() -> {
               try {
                    Thread.sleep(1000);
                    return value;
               } catch (Exception e) {
                    // ignore
               }
               return null;
          })).subscribeOn(Schedulers.elastic()).share();
     }
}

Output in console consumer for both attempts:

Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second

The documentation states:

The framework provides a default polling mechanism (answering the question of "Who?") that will trigger the invocation of the supplier and by default it will do so every second (answering the question of "How often?").

But what if I don't want it to poll every second?

It is weird how I'm supplying the MessageService with the message... Is it configuration? Or is it a service?

I have yet to find the most basic example of just pushing ONE CUSTOMIZABLE MESSAGE to Kafka.


Solution

  • You can send to a Spring Cloud Stream binding on demand by injecting a StreamBridge:

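    import lombok.RequiredArgsConstructor;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.stereotype.Component;

    @Component
    @RequiredArgsConstructor
    public class ApplicationAnnouncer implements CommandLineRunner {

        private final StreamBridge streamBridge;

        @Override
        public void run(String... args) throws Exception {
            // Sends exactly one message to the binding; nothing is polled.
            streamBridge.send("message-out-0", "Application started...");
        }
    }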
    

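    The first argument is the binding name from the application settings (here message-out-0). By convention that name is derived from the function bean as <function name>-out-<index>, so message-out-0 corresponds to a bean named message.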

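    You don't even need an actual bean to derive the binding name from. StreamBridge creates the output binding on demand, so any name will do. If you keep a Supplier bean like the ones in the question, the framework will keep polling it every second, so remove it.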


    You can find some samples here.
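
On the question of whether MessageService is configuration or a service: with StreamBridge it can be an ordinary service that sends one message per call. The following is a minimal sketch, not a definitive implementation. OneShotAnnouncer is a hypothetical class, and the BiPredicate is an assumed stand-in for StreamBridge.send(String, Object), which returns a boolean, so that the example runs without Spring or Kafka:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical service; the name and shape are illustrative, not part of Spring.
class OneShotAnnouncer {

    // Binding name taken from the configuration in the question.
    static final String BINDING = "message-out-0";

    // Stand-in for StreamBridge.send(String, Object).
    // In a Spring application this could be wired as streamBridge::send.
    private final BiPredicate<String, Object> sender;

    OneShotAnnouncer(BiPredicate<String, Object> sender) {
        this.sender = sender;
    }

    // Sends exactly one message per call with a caller-supplied payload.
    boolean announce(Object payload) {
        return sender.test(BINDING, payload);
    }

    public static void main(String[] args) {
        // In-memory sender that records what would have been published.
        List<String> sent = new ArrayList<>();
        OneShotAnnouncer announcer =
                new OneShotAnnouncer((binding, payload) -> sent.add(binding + " <- " + payload));

        announcer.announce("Application started...");
        System.out.println(sent); // prints [message-out-0 <- Application started...]
    }
}
```

In a real application the constructor argument would be streamBridge::send, and the payload is whatever the caller passes in, so there is no mutable field to set before sending.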