Spring Integration file: FileSystemException while writing a file using FileWritingMessageHandler


I have created a flow that reads two input files, copies them to a 'work' directory, and then transforms them into a JobRequest. I launch the batch job; the resulting payload is a JobExecution that contains the two files as parameters. I then want to get the two files from the JobExecution and turn them into two GenericMessages with a splitter. Finally, I run a subflow that writes these files to another 'archive' directory. The program throws an exception:

2022-04-25 14:56:39.974 ERROR 4556 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: failed to write Message payload to file in the [bean 'startBatchFlow.subFlow#2.file:outbound-gateway#0' for component 'startBatchFlow.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [/config/FilesReaderIntegrationConfig.class]'; from source: 'bean method startBatchFlow']; nested exception is java.nio.file.FileSystemException: C:\dev\Fichiers\test\work\Fonds.xlsx -> C:\dev\Fichiers\test\archive\Fonds.xlsx: Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus, failedMessage=GenericMessage [payload=C:\dev\Fichiers\test\work\Fonds.xlsx, headers={sequenceNumber=1, file_name=Fonds.xlsx, sequenceSize=0, correlationId=14ecf13d-c99f-eff1-ae60-31400d606c2d, file_originalFile=C:\dev\Fichiers\test\work\Fonds.xlsx, id=16c09968-1833-d430-1542-ea1959364565, file_relativePath=Fonds.xlsx, timestamp=1650889605032}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.file.FileWritingMessageHandler.handleRequestMessage(FileWritingMessageHandler.java:543)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

Here is my code:

    @Bean
    public IntegrationFlow startBatchFlow() throws InterruptedException {
        return IntegrationFlows //
                .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000)))//
                .handle(fileWritingMessageHandler())
                .aggregate(agg -> agg.correlationExpression("payload != null")
                        .releaseStrategy(new ExpressionEvaluatingReleaseStrategy("size == 2"))
                )
                .transform(fileMessageToJobRequest()) //
                .handle(jobLaunchingMessageHandler()) //
                .wireTap(sf -> {
                    sf.handle(jobExecution ->
                            System.out.println("job Execution payload" + jobExecution.getPayload()));
                })
                .split(new MessageSplitter())
                .wireTap(sf -> {
                    sf.handle(jobExecution ->
                            System.out.println("Message after split payload =" + jobExecution.getPayload()));
                })
                .route(Message.class, message -> message.getPayload() != null,
                        mapping -> mapping.subFlowMapping(true, archiveFilesFlow())
                )
                .get();
    }

    public IntegrationFlow archiveFilesFlow() {
        return f -> f
                .handle(fileWritingInArchive())
                .handle(message -> {
                    System.out.println("here in archive flow" + message.getPayload());
                });
    }

    public FileWritingMessageHandler fileWritingInArchive() {
        FileWritingMessageHandler fileWritingMessageHandler = new FileWritingMessageHandler(new File(archivePath));
        fileWritingMessageHandler.setDeleteSourceFiles(true);
        fileWritingMessageHandler.setExpectReply(true);
        return fileWritingMessageHandler;
    }

And the MessageSplitter:

@Service
public class MessageSplitter extends AbstractMessageSplitter {
    
    @Override
    protected Object splitMessage(Message<?> message) {
        List<File> messages = new ArrayList<>();
        ((JobExecution) message.getPayload()).getJobParameters().getParameters().forEach((key, jobParam) -> {
            messages.add(new File((String) jobParam.getValue()));
        });
        Iterator<MessageBuilder<File>> builderIterator = new Iterator<MessageBuilder<File>>() {
            private File next;
            private int index = 0;

            @Override
            public boolean hasNext() {
                if (this.next != null) { // handle multiple hasNext() calls.
                    return true;
                }
                // Advance to the next file, if any; the last element must also report true.
                if (index < messages.size()) {
                    this.next = messages.get(index);
                    index++;
                    return true;
                }
                return false;
            }

            @Override
            public MessageBuilder<File> next() {
                File message = this.next;
                this.next = null;
                return MessageBuilder
                        .withPayload(message).setHeader("file_name", message.getName())
                        .setHeader("file_originalFile",message)
                        .setHeader("file_relativePath",message.getName());

            }
        };
        return builderIterator;
    }
}

Thanks in advance.


Solution

  • There is an exception in your logs:

    Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus

    Which translates from French as:

    The process cannot access the file because this file is in use by another process

    So C:\dev\Fichiers\test\work\Fonds.xlsx has been opened somewhere and never closed.

    Perhaps your job reads the file content but does not close the file when it finishes. Unix-like operating systems are forgiving about this, but Windows will not let you move or delete a file while another handle to it is still open.
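
    As an illustration of the point above, here is a minimal sketch in plain Java NIO. It assumes the job reads the file through an ordinary stream; the class CloseBeforeMove and the methods countBytes and archive are hypothetical stand-ins, not part of your code or of Spring. The idea is only that the reading side releases its handle (try-with-resources) before anything tries to move the file:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class CloseBeforeMove {

    // Hypothetical stand-in for whatever the batch job does with the file.
    // try-with-resources closes the stream even if reading throws.
    static long countBytes(Path source) throws IOException {
        long total = 0;
        try (InputStream in = Files.newInputStream(source)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                total += read;
            }
        }
        return total;
    }

    // Moves the file into the archive directory. On Windows this is the kind
    // of step that fails with FileSystemException if a handle is still open.
    static Path archive(Path source, Path archiveDir) throws IOException {
        Files.createDirectories(archiveDir);
        return Files.move(source, archiveDir.resolve(source.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}
```

    In your flow the equivalent would be to make sure that whatever reads Fonds.xlsx inside the job (a reader, a workbook, a stream) is closed before the JobExecution reaches the archive subflow.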