spring-integration spring-integration-sftp

Spring Integration: move a file after producing a message to Kafka


First of all, you can find my code at https://github.com/keiseithunder/spring-sftp-xml-to-json/blob/main/src/main/java/com/demo/sftp/SftpApplication.java

I have a Spring Integration flow that reads a file from an SFTP server as a stream and sends it to Kafka. After the message is successfully produced, I try to move the remote file to another directory on the same server. However, the file is not moved at all, and no error is thrown.

I tried the following code to move the file from upload/test.xml to upload/processed/test.xml (the file in upload/file/test.xml is a backup that I copy into the upload directory).

  @Bean
  @ServiceActivator(inputChannel = "success")
  public MessageHandler handler() {
    return new SftpOutboundGateway(sftpSessionFactory(), "mv", "");
  }

I have already set file_renameTo=upload/processed/test.xml in the headers, so I am not sure what I am doing wrong. Alternatively, is there a way to move the file with something like advice.setOnSuccessExpressionString("@template.copy(headers['file_remoteDirectory']+'/'+headers['file_remoteFile'])");?

My message is:

"GenericMessage [payload=Note(to=Toves, from=Jani, heading=Reminder, body=Don't forget me this weekend!!!!!), headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"test.xml","link":false,"modified":1674550080000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":122}, kafka_messageKey=test.xml, file_remoteDirectory=upload, kafka_recordMetadata=test-0@298, file_renameTo=upload/processed/test.xml, id=708d04c4-5abc-9f45-e83b-1fea7ffa5e8d, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@2e0aa05, file_remoteFile=test.xml, timestamp=1674556856288}]"

PS. I stepped through with the debugger to see what goes wrong, and an error seems to be raised at

    private String obtainRemoteFilePath(Message<?> requestMessage) {
        String remoteFilePath = this.fileNameProcessor.processMessage(requestMessage); // <---- error here
        Assert.state(remoteFilePath != null,
                () -> "The 'fileNameProcessor' evaluated to null 'remoteFilePath' from message: " + requestMessage);
        return remoteFilePath;
    }

"class com.demo.sftp.models.Note cannot be cast to class java.lang.String (com.demo.sftp.models.Note is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @387f9ed2; java.lang.String is in module java.base of loader 'bootstrap')"

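The exception above suggests that the file name expression produced the Note payload where a String path was expected. The following is a minimal plain-Java sketch of that failure mode, not the gateway's actual code; the class CastFailureSketch, its nested Note class and the simplified processMessage are hypothetical stand-ins.

```java
class CastFailureSketch {
    // Hypothetical stand-in for com.demo.sftp.models.Note
    static final class Note {}

    // Stand-in for a processor declared to return T that really returns the payload
    @SuppressWarnings("unchecked")
    static <T> T processMessage(Object payload) {
        return (T) payload; // unchecked cast: nothing is verified at this point
    }

    static String obtainRemoteFilePath(Object payload) {
        // The compiler inserts a cast to String on this assignment,
        // so a non-String payload fails here with ClassCastException
        String remoteFilePath = CastFailureSketch.<String>processMessage(payload);
        return remoteFilePath;
    }

    static boolean failsWithClassCast(Object payload) {
        try {
            obtainRemoteFilePath(payload);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast(new Note()));      // prints true
        System.out.println(failsWithClassCast("upload/test.xml")); // prints false
    }
}
```

If that reading is right, an expression that builds a String path from the file_remoteDirectory and file_remoteFile headers would avoid the cast problem.
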
EDIT 1: Added solution

  1. Define the channel of the InboundChannelAdapter as a PublishSubscribeChannel, like
  @Bean
  public MessageChannel streamChannel() {
    return new PublishSubscribeChannel();
  }
  2. Then add the OutboundGateway as a subscriber with LOWEST_PRECEDENCE order, like
  @Bean
  @Order(Ordered.LOWEST_PRECEDENCE)
  @ServiceActivator(inputChannel = "streamChannel")
  public MessageHandler moveFile() {
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
        "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
    sftpOutboundGateway
        .setRenameExpressionString(
            "headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
    sftpOutboundGateway.setRequiresReply(false);
    sftpOutboundGateway.setUseTemporaryFileName(true);
    sftpOutboundGateway.setOutputChannelName("nullChannel");
    sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
    sftpOutboundGateway.setAsync(true);
    return sftpOutboundGateway;
  }
  3. Extra: if we also want to move files that failed (messages arriving on errorChannel), the expressions, including the one passed to setRenameExpressionString, must be changed to match the ErrorMessage structure. E.g.
  @Bean
  @Order(Ordered.LOWEST_PRECEDENCE)
  @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
  public MessageHandler moveErrorFile() {
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
        "payload['failedMessage']['headers']['file_remoteDirectory'] + '/' + payload['failedMessage']['headers']['file_remoteFile']");
    sftpOutboundGateway
        .setRenameExpressionString(
            "payload['failedMessage']['headers']['file_remoteDirectory'] + '/error/' + payload['failedMessage']['headers']['timestamp'] + '-' + payload['failedMessage']['headers']['file_remoteFile']");
    sftpOutboundGateway.setRequiresReply(false);
    sftpOutboundGateway.setUseTemporaryFileName(true);
    sftpOutboundGateway.setOutputChannelName("nullChannel");
    sftpOutboundGateway.setOrder(Ordered.HIGHEST_PRECEDENCE);
    sftpOutboundGateway.setAsync(true);
    return sftpOutboundGateway;
  }
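
To illustrate why the order matters, here is a minimal plain-Java sketch of an ordered broadcast. It assumes the channel has no executor, so subscribers run one after another on the sender's thread in ascending order. It does not use Spring; OrderedBroadcastSketch, Subscriber and the order value 0 for the Kafka handler are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

class OrderedBroadcastSketch {
    // Hypothetical subscriber: an order value plus the work to perform
    static final class Subscriber {
        final int order;
        final Consumer<String> handler;

        Subscriber(int order, Consumer<String> handler) {
            this.order = order;
            this.handler = handler;
        }
    }

    private final List<Subscriber> subscribers = new ArrayList<>();

    void subscribe(int order, Consumer<String> handler) {
        subscribers.add(new Subscriber(order, handler));
        // Keep subscribers sorted so lower order values run first
        subscribers.sort(Comparator.comparingInt(s -> s.order));
    }

    // Invokes subscribers sequentially; an exception stops the loop,
    // so later subscribers never see the message
    void send(String message) {
        for (Subscriber s : subscribers) {
            s.handler.accept(message);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        OrderedBroadcastSketch channel = new OrderedBroadcastSketch();
        // Integer.MAX_VALUE is the value of Ordered.LOWEST_PRECEDENCE
        channel.subscribe(Integer.MAX_VALUE, m -> log.add("move " + m));
        channel.subscribe(0, m -> log.add("send to kafka " + m));
        channel.send("test.xml");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [send to kafka test.xml, move test.xml]
    }
}
```

In this sketch the move step is registered first but still runs last, and it is skipped entirely if the earlier step throws.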

Solution

  • Following artem-bilan's answer in "Spring integration - SFTP rename or move file in remote server after copying", I solved the issue by changing the channel of my InboundChannelAdapter to a PublishSubscribeChannel and adding a new subscriber, like below:

      @Bean
      @Order(Ordered.LOWEST_PRECEDENCE)
      @ServiceActivator(inputChannel = "streamChannel")
      public MessageHandler moveFile() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), Command.MV.getCommand(),
            "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']");
        sftpOutboundGateway
            .setRenameExpressionString("headers['file_remoteDirectory'] + '/processed/' +headers['timestamp'] + '-' + headers['file_remoteFile']");
        sftpOutboundGateway.setRequiresReply(false);
        sftpOutboundGateway.setUseTemporaryFileName(true);
        sftpOutboundGateway.setOutputChannelName("nullChannel");
        sftpOutboundGateway.setOrder(Ordered.LOWEST_PRECEDENCE);
        sftpOutboundGateway.setAsync(true);
        return sftpOutboundGateway;
      }
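
As a sanity check on the two expressions, this plain-Java sketch mirrors their string concatenation with a Map in place of the message headers, using the header values from the message in the question. It does not use SpEL; MovePathSketch and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class MovePathSketch {
    // Mirrors: headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']
    static String sourcePath(Map<String, Object> headers) {
        return headers.get("file_remoteDirectory") + "/" + headers.get("file_remoteFile");
    }

    // Mirrors the rename expression that targets the processed directory
    static String processedPath(Map<String, Object> headers) {
        return headers.get("file_remoteDirectory") + "/processed/"
                + headers.get("timestamp") + "-" + headers.get("file_remoteFile");
    }

    // Header values copied from the message shown in the question
    static Map<String, Object> sampleHeaders() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("file_remoteDirectory", "upload");
        headers.put("file_remoteFile", "test.xml");
        headers.put("timestamp", 1674556856288L);
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = sampleHeaders();
        System.out.println(sourcePath(headers));    // prints upload/test.xml
        System.out.println(processedPath(headers)); // prints upload/processed/1674556856288-test.xml
    }
}
```

Prefixing the timestamp keeps a later test.xml from colliding with one that was already moved.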