Tags: java, spring-boot, xml-parsing, spring-batch, spring-batch-stream

Copying the header tag of an XML input file in a Spring Batch application


I am using Spring Batch in a Spring Boot application. The Spring Boot version is 2.3.3.RELEASE.

What I intend to achieve

I have to read an XML file containing thousands of transactions and a header tag (fileInformation). I apply some business logic to each transaction and then write the file back with the updated transaction values. I use StaxEventItemReader to read the file and StaxEventItemWriter to write it, with a couple of ItemProcessors handling the business logic. The XML file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
   <fileInformation>
      <sender>200GH7XZ60</sender>
      <timestamp>2020-12-23T09:05:34Z</timestamp>
      <environment>PRO</environment>
      <version>001.60</version>
   </fileInformation>
   <record>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
   </record>
</reportFile>

The problem I am facing is with the values of the header tags.

I have configured an OmegaXmlHeaderCallBack that generates the desired header tags, but the values inside those tags must be copied from the input file. As far as I know, the StaxWriterCallback is initialized before the reader, processor and writer, so I am not able to inject the values using late binding. This looks like a basic requirement, but I couldn't find a solution on Stack Overflow.

Here are the code snippets that configure the Spring Batch job:

/**
 * Spring Batch configuration of the job that reads a report file, extracts and removes personal
 * data from every transaction, and writes the transactions to an XML output file.
 *
 * <p>NOTE(review): the {@code #{jobExecutionContext['header.*']}} values injected into the
 * step-scoped writer beans below are bound when those beans are created at the start of
 * {@code extractAndReplacePersonalDataStep}. That happens before the step has read the
 * {@code fileInformation} element, so the values can only be non-null if an earlier step stored
 * them in the job execution context.
 */
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Value("${eugateway.batch.chunk.size}")
    private int chunkSize;

    /**
     * Location of the exported XML file. The previously hard-coded path is kept as the default,
     * so behaviour is unchanged unless {@code eugateway.batch.output.file.path} is configured.
     */
    @Value("${eugateway.batch.output.file.path:C:\\Users\\sasharma\\Documents\\TO_BE_DELETED\\eugateway\\outputfile.xml}")
    private String exportFilePath;

    /** Chunk-oriented step: read header/transactions, run the composite processor, write results. */
    @Bean
    public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
            CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> processor,
            CompositeItemWriter<ProcessorWriterDto> writer,
            EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
            StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
                .<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
                .reader(reader)
                .processor(processor)
                .listener(writeListener)
                .writer(writer)
                .build();
    }

    /** Single-step job; a {@link RunIdIncrementer} allows it to be relaunched. */
    @Bean
    public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(jobStep)
                .listener(jobListener)
                .build();
    }

    /**
     * StAX reader that emits both the {@code fileInformation} header and every
     * {@code transaction} fragment of the input file.
     *
     * @param path input file path, taken from the {@code file.path} job parameter
     */
    @Bean
    @StepScope
    public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(
            @Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setClassesToBeBound(FileInformation.class, TransactionPositionReport.class);
        log.info("Generating StaxEventItemReader");

        return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
                .name("headerTransaction")
                // FileSystemResource has no constructor accepting another Resource.
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("fileInformation", "transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }

    /** Header callback populated from the {@code header.*} keys of the job execution context. */
    @Bean
    @StepScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
    }

    @Bean
    @StepScope
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack() {
        return new OmegaXmlFooterCallBack();
    }

    /**
     * StAX writer for the output report. It writes {@code <reportFile>} as the root element,
     * wrapped by the header and footer callbacks.
     */
    @StepScope
    @Bean(name = "staxTransactionWriter")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(
            @Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        Resource exportFileResource = new FileSystemResource(exportFilePath);

        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSupportDtd(true);
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setClassesToBeBound(TransactionPositionReport.class);

        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .version("1.0")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("reportFile")
                .headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
                .footerCallback(getOmegaXmlFooterCallBack())
                .shouldDeleteIfEmpty(true)
                .build();
    }

    @Bean
    @StepScope
    public PIExtractorItemProcessor extractItemProcessor() {
        log.info("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }

    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.info("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }

    /** Chains the extractor and the remover processors, in that order. */
    @Bean
    @StepScope
    @SuppressWarnings("unchecked") // the delegate list is heterogeneous; the chain is type-correct end to end
    CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
        log.info("Generating CompositeItemProcessor");
        CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }

    @Bean
    @StepScope
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.info("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }

    /** File writer that delegates the XML output to {@link #staxTransactionItemWriter}. */
    @Bean
    @StepScope
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter(staxTransactionItemWriter(sender, timestamp, environment, version));
    }

    /** Writes every chunk first to the EDS client and then to the XML output file. */
    @Bean
    @StepScope
    public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(sender, timestamp, environment, version)));
        return compositeItemWriter;
    }
}

Below is the OmegaXmlHeaderCallBack class. Because the values cannot be late-bound, I always end up with empty values in the header tags.

@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {
    private String sender;
    private String timestamp;
    private String environment;
    private String version;
    
    public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
        super();
        this.sender = sender;
        this.timestamp = timestamp;
        this.environment = environment;
        this.version = version;
    }

    @Override
    public void write(XMLEventWriter writer) {
        XMLEventFactory factory = XMLEventFactory.newInstance();
        try {
            writer.add(factory.createStartElement("", "", "fileInformation"));

            writer.add(factory.createStartElement("", "", "sender"));
            writer.add(factory.createCharacters(sender));
            writer.add(factory.createEndElement("", "", "sender"));


            writer.add(factory.createStartElement("", "", "timestamp"));
            writer.add(factory.createCharacters(timestamp));
            writer.add(factory.createEndElement("", "", "timestamp"));

            writer.add(factory.createStartElement("", "", "environment"));
            writer.add(factory.createCharacters(environment));
            writer.add(factory.createEndElement("", "", "environment"));

            writer.add(factory.createStartElement("", "", "version"));
            writer.add(factory.createCharacters(version));
            writer.add(factory.createEndElement("", "", "version"));
            
            writer.add(factory.createEndElement("", "", "fileInformation"));
            
            writer.add(factory.createStartElement("", "", "record"));
            
        } catch (XMLStreamException e) {
            log.error("Error writing OMEGA XML Header: {}", e.getMessage());
            throw new OmegaXmlHeaderWriterException(e.getMessage());
        }
    }
}

The code for the ItemProcessor is below. I put the header data into the ExecutionContext, intending it to be read by the header callback (which, sadly, is not going to happen).

/**
 * First delegate of the composite processor.
 *
 * <p>A {@code FileInformation} header element is stored in the job execution context under the
 * {@code header.*} keys and then filtered out by returning {@code null}. Any other element is
 * treated as a {@code TransactionPositionReport}: its personal data is extracted into a
 * {@code ProcessorWriterDto}.
 */
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {

    @Autowired
    PersonalDataExtractor personalDataExtractor;

    @Value("#{jobParameters['submission.account']}")
    private String subAccntId;

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
        if (headerTransactionElement instanceof FileInformation) {
            FileInformation header = (FileInformation) headerTransactionElement;
            storeHeader(header);
            log.debug("Header {} found.", header);
            // null filters the header out, so it never reaches the writers.
            return null;
        }

        TransactionPositionReport transaction = (TransactionPositionReport) headerTransactionElement;
        Object customerTransactionId = transaction.getProcessingDetails().getCustomerTransactionId();
        log.debug("NO header info found for transaction {}", customerTransactionId);
        log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.", customerTransactionId);

        ProcessorWriterDto transferObject = new ProcessorWriterDto();
        transferObject.setEdsRequestDtoList(personalDataExtractor.extract(transaction, subAccntId));
        transferObject.setTransaction(transaction);
        return transferObject;
    }

    /**
     * Publishes the header fields to the job execution context. The step-scoped writer beans read
     * them via {@code #{jobExecutionContext['header.*']}}, so they must not go into the step
     * execution context.
     */
    private void storeHeader(FileInformation header) {
        putInJobContext("header.sender", header.getSender());
        putInJobContext("header.timestamp", header.getTimestamp());
        putInJobContext("header.environment", header.getEnvironment());
        putInJobContext("header.version", header.getVersion());
    }

    /** Stores one entry in the execution context of the job that owns the current step. */
    private void putInJobContext(String key, Object value) {
        stepExecution.getJobExecution().getExecutionContext().put(key, value);
    }
}

Links I have already consulted:


Solution

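  • Your step is doing too much. I would break it down into two steps. The first step extracts only the header (fileInformation) from the input file and stores it in the job execution context. The second step processes the transactions and writes the output file; its step-scoped header callback reads the header from the job execution context.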

    Here is a quick example:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;
    
    import javax.xml.stream.XMLEventWriter;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.xml.StaxWriterCallback;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Two-step example job.
     *
     * <p>Step 1 extracts the {@code fileInformation} header and stores it in the job execution
     * context under {@code file.information}. Step 2, including any step-scoped bean such as the
     * {@link StaxWriterCallback}, reads it from there.
     */
    @Configuration
    @EnableBatchProcessing
    public class SO67909123 {

        /** Step 1: put the header of the file given by the {@code file} job parameter into the job context. */
        @Bean
        public Step extractHeaderStep(StepBuilderFactory steps) {
            return steps.get("extractHeaderStep")
                    .tasklet((contribution, chunkContext) -> {
                        Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
                        String inputFile = (String) jobParameters.get("file");
                        FileInformation fileInformation = extractFileInformation(inputFile);
                        ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
                        jobExecutionContext.put("file.information", fileInformation);
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        private FileInformation extractFileInformation(String inputFile) {
            // TODO extract header from inputFile
            FileInformation fileInformation = new FileInformation();
            fileInformation.sender = "200GH7XKDGO3GLZ60";
            fileInformation.version = "001.60";
            return fileInformation;
        }

        /** Step 2: consumes the header stored by step 1. */
        @Bean
        public Step processFile(StepBuilderFactory steps) {
            return steps.get("processFile")
                    .tasklet((contribution, chunkContext) -> { // Change this to a chunk-oriented tasklet
                        Map<String, Object> jobExecutionContext = chunkContext.getStepContext().getJobExecutionContext();
                        FileInformation fileInformation = (FileInformation) jobExecutionContext.get("file.information");
                        System.out.println("Step 2: " + fileInformation);
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        /**
         * Step-scoped header callback. It is created when the step using it starts, so the header
         * stored by step 1 is already in the job execution context.
         */
        @Bean
        @StepScope
        public StaxWriterCallback staxWriterCallback(@Value("#{jobExecutionContext['file.information']}") FileInformation fileInformation) {
            return new StaxWriterCallback() {
                @Override
                public void write(XMLEventWriter writer) throws IOException {
                    // use fileInformation as needed here
                }
            };
        }

        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("job")
                    .start(extractHeaderStep(steps))
                    .next(processFile(steps))
                    .build();
        }

        public static void main(String[] args) throws Exception {
            // try-with-resources closes the context (and releases its beans) once the job has run.
            try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SO67909123.class)) {
                JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                Job job = context.getBean(Job.class);
                JobParameters jobParameters = new JobParametersBuilder()
                        .addString("file", "transactions.xml")
                        .toJobParameters();
                jobLauncher.run(job, jobParameters);
            }
        }

        /** Header data; Serializable because it is stored in the job execution context. */
        static class FileInformation implements Serializable {
            private static final long serialVersionUID = 1L;

            private String sender;
            private String version;
            // other fields

            @Override
            public String toString() {
                return "FileInformation{sender='" + sender + '\'' + ", version='" + version + '\'' + '}';
            }
        }

    }
    

    This example shows the idea. You only need to write the snippet that extracts the header tag from the file (only the header; see the TODO). The StaxWriterCallback in this example is a step-scoped bean, so it is created when step 2 starts. By then step 1 has already stored the header in the job execution context, so the callback can read it from there. Other step-scoped components of step 2 (processor, listener, etc.) can be configured in the same way.