javaspring-integrationspring-dsl

Delete File from remote server using SftpRemoteFileTemplate


I'm trying to delete a file after processing it and placing it on a kafka topic using the SftpRemoteFileTemplate. I have it working using a SftpOutboundGateway, but according to EIS patterns we should be using the template. I am currently getting an exception saying end of the flow has been reached.

Exception:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$Lambda$628/0x000000080104ab28@748f93bb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.registerOutputChannelIfCan(BaseIntegrationFlowDefinition.java:3109) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:275) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:243) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.nullChannel(BaseIntegrationFlowDefinition.java:2919) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow.intermediateDeleteFlow(AcousticToKafkaIntegrationFlow.java:95) ~[classes/:na]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.CGLIB$intermediateDeleteFlow$5(<generated>) ~[classes/:na]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$2.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.0.11.jar:6.0.11]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-6.0.11.jar:6.0.11]
    at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.intermediateDeleteFlow(<generated>) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 18 common frames omitted

The code is trying to pull the headers out of the initial message, to create a path to the file on the remote server so the SftpRemoteFileTemplate can call the remove() method on it.

Routing Channel Flow:

                              .split(Files.splitter()
                                          .markers(true)
                                          .charset(StandardCharsets.UTF_8)
                                          .firstLineAsHeader("myHeaders")
                                          .applySequence(true))
                              .filter("!payload.contains('\"mark\":\"START\"')", p -> p.discardFlow(df -> df.channel("logStartOfFile")))
                              .log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, "payload")
                              .route("payload.contains('\"mark\":\"END\"')", r -> r.channelMapping("true", "deleteFileChannel")
                                                                                   .channelMapping("false", "publisherChannel"))
                              .get();
    }

Delete File Flow:

    @Bean
    public IntegrationFlow deleteFileFlow(SftpRemoteFileTemplate remoteFileTemplate, MessageChannel deleteFileChannel) {
        return IntegrationFlow.from(deleteFileChannel)
                              .transform((GenericTransformer<Message, String>) message -> {
                                  var headers = message.getHeaders();
                                  return headers.get(FileHeaders.REMOTE_DIRECTORY) + File.separator + headers.get(FileHeaders.REMOTE_FILE);
                              })
                .handle(path -> remoteFileTemplate.remove(String.valueOf(path.getPayload())))
                              .get();
    }

The exception is thrown on the .nullChannel() method.

Any insight would help thank you.

EDIT Stack Trace with get() instead of nullChannel per Artem's Answer:

2023-09-05T11:03:30.006-05:00 ERROR 32726 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message in bean 'deleteFileFlow.transformer#0' for component 'deleteFileFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/thrivent/enterprisemarketingchannelactivation/engage/config/AcousticToKafkaIntegrationFlow.class]'; from source: 'bean method deleteFileFlow', failedMessage=GenericMessage [payload={"filePath":"/download/dummy_acoustic.csv","lineCount":3,"mark":"END"}, headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"dummy_acoustic.csv","link":false,"modified":1693588146000,"permissions":"rw-r-----","remoteDirectory":"/download/","size":1250}, sequenceNumber=5, file_lineCount=3, file_remoteDirectory=/download/, sequenceSize=0, correlationId=8f9f7566-f176-dfce-b24a-a34e043165b8, id=1914286a-6033-126f-354c-af9dc90b329f, file_marker=END, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@4579f791, file_remoteFile=dummy_acoustic.csv, timestamp=1693929810003}]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:125)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:217)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:199)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.springframework.messaging.Message (java.lang.String is in module java.base of loader 'bootstrap'; org.springframework.messaging.Message is in unnamed module of loader 'app')
    at org.springframework.integration.handler.LambdaMessageProcessor.invokeMethod(LambdaMessageProcessor.java:205)
    at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:121)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:115)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ... 88 more

Solution

  • See that handle(MessageHandler) contract. It is as simple as this:

    @FunctionalInterface
    public interface MessageHandler {
    
        void handleMessage(Message<?> message) throws MessagingException;
    
    }
    

    Pay attention to the void return type. So, this means that there is not going to be a reply message produced from this handler. Therefore no way to configure an output channel on this endpoint. That nullChannel() is still a channel and according to messaging configuration this channel is set as an output channel for that endpoint, but since its handler is void, the configuration is rejected with that error message.

    So, you simply should remove nullChannel() in the end of that flow. Use get() to finish flow configuration - and you are good having that flow as a one-way.