I'm trying to delete a file after processing it and placing it on a Kafka topic, using the SftpRemoteFileTemplate. I have it working using an SftpOutboundGateway, but according to EIS patterns we should be using the template. I am currently getting an exception saying the end of the flow has been reached.
Exception:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$Lambda$628/0x000000080104ab28@748f93bb) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.registerOutputChannelIfCan(BaseIntegrationFlowDefinition.java:3109) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:275) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.channel(BaseIntegrationFlowDefinition.java:243) ~[spring-integration-core-6.1.2.jar:6.1.2]
at org.springframework.integration.dsl.BaseIntegrationFlowDefinition.nullChannel(BaseIntegrationFlowDefinition.java:2919) ~[spring-integration-core-6.1.2.jar:6.1.2]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow.intermediateDeleteFlow(AcousticToKafkaIntegrationFlow.java:95) ~[classes/:na]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.CGLIB$intermediateDeleteFlow$5(<generated>) ~[classes/:na]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$2.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:258) ~[spring-core-6.0.11.jar:6.0.11]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-6.0.11.jar:6.0.11]
at com.test.enterprisemarketingchannelactivation.engage.config.AcousticToKafkaIntegrationFlow$$SpringCGLIB$$0.intermediateDeleteFlow(<generated>) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139) ~[spring-beans-6.0.11.jar:6.0.11]
... 18 common frames omitted
The code is trying to pull the headers out of the initial message to build the path to the file on the remote server, so that the SftpRemoteFileTemplate can call the remove() method on it.
Routing Channel Flow:
            .split(Files.splitter()
                    .markers(true)
                    .charset(StandardCharsets.UTF_8)
                    .firstLineAsHeader("myHeaders")
                    .applySequence(true))
            .filter("!payload.contains('\"mark\":\"START\"')", p -> p.discardFlow(df -> df.channel("logStartOfFile")))
            .log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, "payload")
            .route("payload.contains('\"mark\":\"END\"')", r -> r.channelMapping("true", "deleteFileChannel")
                    .channelMapping("false", "publisherChannel"))
            .get();
}
Delete File Flow:
@Bean
public IntegrationFlow deleteFileFlow(SftpRemoteFileTemplate remoteFileTemplate, MessageChannel deleteFileChannel) {
    return IntegrationFlow.from(deleteFileChannel)
            .transform((GenericTransformer<Message, String>) message -> {
                var headers = message.getHeaders();
                return headers.get(FileHeaders.REMOTE_DIRECTORY) + File.separator + headers.get(FileHeaders.REMOTE_FILE);
            })
            .handle(path -> remoteFileTemplate.remove(String.valueOf(path.getPayload())))
            .get();
}
The exception is thrown on the .nullChannel() method.
Any insight would help. Thank you.
EDIT: Stack trace with get() instead of nullChannel(), per Artem's answer:
2023-09-05T11:03:30.006-05:00 ERROR 32726 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message in bean 'deleteFileFlow.transformer#0' for component 'deleteFileFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/thrivent/enterprisemarketingchannelactivation/engage/config/AcousticToKafkaIntegrationFlow.class]'; from source: 'bean method deleteFileFlow', failedMessage=GenericMessage [payload={"filePath":"/download/dummy_acoustic.csv","lineCount":3,"mark":"END"}, headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"dummy_acoustic.csv","link":false,"modified":1693588146000,"permissions":"rw-r-----","remoteDirectory":"/download/","size":1250}, sequenceNumber=5, file_lineCount=3, file_remoteDirectory=/download/, sequenceSize=0, correlationId=8f9f7566-f176-dfce-b24a-a34e043165b8, id=1914286a-6033-126f-354c-af9dc90b329f, file_marker=END, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@4579f791, file_remoteFile=dummy_acoustic.csv, timestamp=1693929810003}]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:125)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:217)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:199)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.springframework.messaging.Message (java.lang.String is in module java.base of loader 'bootstrap'; org.springframework.messaging.Message is in unnamed module of loader 'app')
at org.springframework.integration.handler.LambdaMessageProcessor.invokeMethod(LambdaMessageProcessor.java:205)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:121)
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:115)
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
... 88 more
Look at the contract behind handle(MessageHandler). It is as simple as this:
@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}
Pay attention to the void return type. It means this handler never produces a reply message, so there is no way to configure an output channel on this endpoint. nullChannel() is still a channel, and the DSL tries to set it as the output channel of that endpoint; since the handler returns void, the configuration is rejected with that error message.
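To make that reasoning concrete, here is a minimal plain-Java sketch of the rule. It is not Spring Integration code: OneWayHandler, FlowBuilder and their methods are hypothetical stand-ins that only mimic the shape of the DSL, assuming a builder that refuses any further step once a void handler has been added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model; none of these types are Spring Integration classes.
final class OneWayFlowSketch {

    /** Same shape as MessageHandler: consumes a message and returns nothing. */
    @FunctionalInterface
    interface OneWayHandler {
        void handleMessage(String message);
    }

    /** Records step names only; it never invokes the functions it is given. */
    static final class FlowBuilder {
        private final List<String> steps = new ArrayList<>();
        private boolean endedByOneWayHandler = false;

        FlowBuilder transform(Function<String, String> transformer) {
            return addStep("transform", false); // a transformer produces a reply
        }

        FlowBuilder handle(OneWayHandler handler) {
            return addStep("handle", true); // void handler: nothing to send onward
        }

        FlowBuilder nullChannel() {
            return addStep("nullChannel", false); // still a channel, so it needs a producer before it
        }

        List<String> get() {
            return new ArrayList<>(steps);
        }

        private FlowBuilder addStep(String name, boolean oneWay) {
            if (endedByOneWayHandler) {
                throw new IllegalStateException(
                        "Cannot add '" + name + "': the previous handler is one-way, this is the end of the flow.");
            }
            steps.add(name);
            endedByOneWayHandler = oneWay;
            return this;
        }
    }

    public static void main(String[] args) {
        // Ending the flow right after the one-way handler is accepted.
        List<String> steps = new FlowBuilder()
                .transform(path -> path.trim())
                .handle(path -> System.out.println("removing " + path))
                .get();
        System.out.println(steps);

        // Adding a channel after the one-way handler is rejected at configuration time.
        try {
            new FlowBuilder().handle(path -> { }).nullChannel();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the rejection happens while the flow is being assembled, before any message is processed, which matches the BeanCreationException in the question.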
So simply remove nullChannel() from the end of that flow. Use get() to finish the flow configuration, and the flow ends there as a one-way flow.
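A side note on the ClassCastException in the question's EDIT: a Java lambda does not retain its generic argument type at runtime, so the framework cannot see that the transformer expects the whole Message and tries to pass it the payload (here a String) instead. The Java DSL has overloads that take the expected type as the first argument for this case, for example transform(Message.class, message -> ...). The plain-Java sketch below only demonstrates the underlying type-erasure effect; LambdaTypeSketch and declaredInputType are hypothetical names, and this is not a copy of how Spring Integration resolves types internally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class; it only shows what reflection can recover from a Function.
final class LambdaTypeSketch {

    /** Returns the input type visible via reflection, or Object.class if none is recorded. */
    static Type declaredInputType(Function<?, ?> fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) iface;
                if (parameterized.getRawType() == Function.class) {
                    return parameterized.getActualTypeArguments()[0];
                }
            }
        }
        return Object.class; // no generic information recorded on this class
    }

    public static void main(String[] args) {
        // Lambda: the generated class implements the raw Function interface.
        Function<StringBuilder, String> lambda = sb -> sb.toString();

        // Anonymous class: the compiler records Function<StringBuilder, String> in the class file.
        Function<StringBuilder, String> anonymous = new Function<StringBuilder, String>() {
            @Override
            public String apply(StringBuilder sb) {
                return sb.toString();
            }
        };

        System.out.println("lambda:    " + declaredInputType(lambda));
        System.out.println("anonymous: " + declaredInputType(anonymous));
    }
}
```

Because the lambda's input type is not recoverable this way, passing the type explicitly (as the Class-taking overloads do) is the usual way to tell a framework what the function expects.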