
Is there any way to build an sftpSupplier splitter (batch producer) with Spring Cloud Stream and RabbitMQ binder?

I'm trying to implement a Spring Cloud Stream application to:

I've tried some different approaches, but I must be misunderstanding some things.

I've modeled this case with a simple application, which works perfectly (generated with Spring Initializr, plus these relevant changes):


  <!-- ... -->

Java code

@Configuration
public class CloudFunctionConfiguration {

  private final StreamBridge streamBridge;

  public CloudFunctionConfiguration(final StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }

  @Bean
  Supplier<String> source() {
    return () -> "a,1,b,2,3,c";
  }

  @Bean
  Consumer<String> splitter() {
    return string -> {
      // Publish each comma-separated token as its own message.
      Arrays.asList(string.split(",")).forEach(s -> {
        final var message = MessageBuilder
            .withPayload("{\"attribute\": \"".concat(s).concat("\"}"))
            .setHeader("some-header", "some-content")
            .build();
        streamBridge.send("output", message);
      });
    };
  }
}



This works as expected; all messages are published each and every time the polled supplier is invoked.

But things stop working when I change that simple String supplier for the SFTP supplier function.

I've generated a sample GZipped file with three records, and then I made these changes:

pom.xml (added)

      <version>${spring-function.version}</version><!-- 5.0.1 -->

Java code (new version)

@Configuration
public class CloudFunctionConfiguration {

  protected static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfiguration.class);

  private final GzipMessageParser parser;
  private final StreamBridge streamBridge;

  public CloudFunctionConfiguration(
      final GzipMessageParser parser,
      final StreamBridge streamBridge) {
    this.parser = parser;
    this.streamBridge = streamBridge;
  }

  @Bean
  Consumer<Message<byte[]>> messageParser() {
    return gzipFile -> {
      // Publish every record parsed from the gzipped file as its own message.
      parser.parse(gzipFile).forEach(message -> {
        LOGGER.info("SENDING MESSAGE {}", message);
        streamBridge.send("output", message);
      });
    };
  }
}
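
The GzipMessageParser class itself is not shown in the question. As a rough illustration of the decompress-and-split step it presumably performs, here is a minimal plain-JDK sketch. The class name GzipRecordSplitter, its methods, and the one-record-per-line UTF-8 layout are all assumptions made for this example, not the actual parser:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the parsing step; NOT the real GzipMessageParser.
public class GzipRecordSplitter {

  // Decompresses a gzipped payload and returns one entry per non-blank line.
  // Assumption: the file holds UTF-8 text with one record per line.
  public static List<String> split(final byte[] gzipped) {
    try (var reader = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(new ByteArrayInputStream(gzipped)), StandardCharsets.UTF_8))) {
      return reader.lines()
          .filter(line -> !line.isBlank())
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Helper that gzips a string so the sketch can run without a real file.
  public static byte[] gzip(final String text) {
    final var buffer = new ByteArrayOutputStream();
    try (var out = new GZIPOutputStream(buffer)) {
      out.write(text.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(final String[] args) {
    // Three records, mirroring the sample file described above.
    split(gzip("record-1\nrecord-2\nrecord-3\n")).forEach(System.out::println);
  }
}
```

In the real application each returned record would still need to be wrapped in a Message (for example with MessageBuilder, as in the first example) before being handed to StreamBridge.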


application.properties (changes) [edit: fixed a copy/paste mistake]

# changed
# added

It's actually a little more complicated than that (I've also configured a database and a metadata store), but I'll omit those extra configurations, since they seem to work fine.

When I run this app, all messages are sent to RabbitMQ, but I get some errors. This is what's in the log.

  1. There is a message conversion (the whole compressed file gets converted pretty much to itself; I guess Spring Cloud Stream does this when the internal function pipeline passes the file from the sftpSupplier to the messageParser function):
(...) DEBUG 781757 --- [sftp-demo] [boundedElastic-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}] to: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}]
  2. I see that LOGGER.info("SENDING MESSAGE {}", message); entry, then a preSend hook log, a Publishing message (...) log and a postSend hook log.

  3. All looks nice, but then I see this:

(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function sftpSupplier|messageParser
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Poll resulted in Message: GenericMessage [payload=MonoMap, headers={id=e531d10a-8edc-88fd-fe07-3bba0b950a48, timestamp=1733261637927}]
(...) DEBUG 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler   : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}], headers={id=cb9ce0d7-bac6-0ba2-6e0e-7ac40ad989a1, timestamp=1733263993106}] for original GenericMessage [payload=MonoMap, headers={id=364cff1e-6120-f005-ee63-a34865009323, timestamp=1733263993106}]
(...) ERROR 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}]
Caused by: java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: reactor.core.publisher.MonoMap

That is, Spring Cloud Stream polls the composed function definition (sftpSupplier|messageParser) and automatically binds its output to the RabbitMQ binder. Since the poll results in a message with a Mono payload (I don't know why), the binder tries to convert the message and send it, and then it fails.

Curiously, my first working example does this polling as well, but this is the result:

DEBUG 759414 --- [demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'

And that's why it works, I think.

I've also tried the functional batch producer approach (by creating a Function<Message<String>, List<Message<String>>>), but I couldn't get the application to send the messages individually; RabbitMQ just gets one message with the serialized List.

It looks like I shouldn't be doing this thing this way, but by reading the docs I got the impression I could (and should?) compose functions (Spring Cloud Functions with my own) to easily build integration applications.

So how could I build such an application? Should I just do it with Spring Integration, or is there something I can do in code or configuration?


  • So, here is an answer of sorts to your struggle: https://github.com/spring-cloud/spring-functions-catalog/pull/108.

    Honestly, even I hit some obstacles figuring out how to make everything work together.

    With that sample, you can replace the simple fileSupplier with sftpSupplier to fit your requirements, and so on.

    I didn't try it with StreamBridge, but that is possible too, although it would defeat the purpose of function composition a bit.