spring-bootapache-kafkaspring-batchkafka-producer-apiitemwriter

Spring Batch : One Reader, composite processor (two classes with different entities) and two kafkaItemWriter


ItemReader is reading data from DB2 and gave java object ClaimDto. Now the ClaimProcessor takes in the object of ClaimDto and return CompositeClaimRecord object which comprises of claimRecord1 and claimRecord2 which to be sent to two different Kafka topics. How to write claimRecord1 and claimRecord2 to topic1 and topic2 respectively.


Solution

  • Just write a custom ItemWriter that does exactly that.

    public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
    
      private final ItemWriter<Record1> writer1;
      private final ItemWriter<Record2> writer2;
    
      public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
        this.writer1=writer1;
        this.writer2=writer2;
    }
    
      public void write(List<CompositeClaimRecord> items) throws Exception {
    
        for (CompositeClaimRecord record : items) {
           writer1.write(Collections.singletonList(record.claimRecord1));
           writer2.write(Collections.singletonList(record.claimRecord2));
    
        }
      }
    }
    

    Or instead of writing 1 record at a time convert the single list into 2 lists and pass that along. But error handling might be a bit of a challenge that way. \

    public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
    
      private final ItemWriter<Record1> writer1;
      private final ItemWriter<Record2> writer2;
    
      public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
        this.writer1=writer1;
        this.writer2=writer2;
    }
    
      public void write(List<CompositeClaimRecord> items) throws Exception {
    
        List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
        List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
    
        writer1.write(record1List);
        writer2.write(record2List);
    
    
      }
    }