java google-cloud-platform google-cloud-dataflow apache-beam

Skip header while reading a CSV file in Apache Beam


I want to skip the header line of a CSV file. At the moment I remove the header manually before uploading the file to Google Cloud Storage.

Below is my code:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

I have checked the existing question on Stack Overflow, but I don't find it promising: Skipping header rows - is it possible with Cloud DataFlow?

Any help?

Edit: I tried something like the following and it worked:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Skip the header row, recognised by one of its column names
            if (c.element().contains("Beneficiary")) {
                return;
            }
            // limit -1 keeps trailing empty fields instead of dropping them
            String[] strArr2 = c.element().split(",", -1);
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(parseOrZero(strArr2[2]));
            fin.setrNCS(parseOrZero(strArr2[3]));
            fin.setrCtb(parseOrZero(strArr2[4]));
            fin.setrAC(parseOrZero(strArr2[5]));
            c.output(fin);
        }

        // Treats a blank field as 0. The earlier replace("", "0") inserted a "0"
        // around every character, so "12.5" became "01020.050".
        private double parseOrZero(String field) {
            String s = field.trim();
            return s.isEmpty() ? 0.0 : Double.parseDouble(s);
        }
    }));

Solution

  • The older Stack Overflow post that you shared (Skipping header rows - is it possible with Cloud DataFlow?) does contain the answer to your question.

    This option is currently not available in the Apache Beam SDK, although there is an open feature request for it in the Apache Beam JIRA issue tracker, BEAM-123. As of writing, that request is still unresolved and has been open for about two years. There does seem to be some activity on it, though: the latest update on the issue is from February 2018, and it was last moved to the sdk-java-core component, where it may get more attention. I would advise you to keep an eye on that JIRA issue.

    With that in mind, I would say that the approach you are using (removing the header before uploading the file to GCS) is the best option for you. I would refrain from doing it manually, though, as you can easily script and automate the "remove header, then upload file" process.
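
A minimal sketch of the header-removal half of that automation, assuming the file is small enough to read into memory and the header is always the first line. The class `StripCsvHeader` and its method `dropHeader` are hypothetical names chosen for illustration; the upload itself is left to whatever tooling you already use (for example `gsutil cp`).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: drops the first line of a local CSV before it is uploaded.
final class StripCsvHeader {

    // Returns a copy of the lines without the first one (assumed to be the header).
    static List<String> dropHeader(List<String> lines) {
        if (lines.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(lines.subList(1, lines.size()));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: StripCsvHeader <input.csv> <output.csv>");
            return;
        }
        Path in = Paths.get(args[0]);   // local CSV that still has its header
        Path out = Paths.get(args[1]);  // header-free copy to upload
        List<String> lines = Files.readAllLines(in, StandardCharsets.UTF_8);
        Files.write(out, dropHeader(lines), StandardCharsets.UTF_8);
    }
}
```

Running this as a step before the upload command keeps the pipeline code unchanged, at the cost of one extra pass over each file.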


    EDIT:

    I have been able to come up with a simple filter using a DoFn. It might not be the most elegant solution (I am not an Apache Beam expert myself), but it does work, and you may be able to adapt it to your needs. It requires that you know the header of the uploaded CSV files beforehand, since it filters by element content, so take it as a template to modify:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class RemoveCSVHeader {
      // The Filter class
      static class FilterCSVHeaderFn extends DoFn<String, String> {
        private final String headerFilter;

        public FilterCSVHeaderFn(String headerFilter) {
          this.headerFilter = headerFilter;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
          String row = c.element();
          // Filter out elements that match the header
          if (!row.equals(this.headerFilter)) {
            c.output(row);
          }
        }
      }

      // The main class
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

        // Must match the header line of the input file exactly
        String header = "col1,col2,col3,col4";

        vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
            .apply(TextIO.write().to("out"));

        p.run().waitUntilFinish();
      }
    }
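
One caveat with the exact `equals` comparison above: it would miss a header line that carries a leading byte-order mark or stray whitespace. The following is a sketch of a more tolerant check, written without any Beam dependency so it can be tested on its own. `CsvHeaderMatcher`, `normalize` and `isHeader` are hypothetical names, and whether your files actually contain a BOM is an assumption you would need to verify.

```java
// Hypothetical helper, independent of Beam so it can be unit-tested on its own.
final class CsvHeaderMatcher {
    private static final char BOM = '\uFEFF';

    // Strips a leading byte-order mark, then surrounding whitespace.
    static String normalize(String row) {
        String s = row;
        if (!s.isEmpty() && s.charAt(0) == BOM) {
            s = s.substring(1);
        }
        return s.trim();
    }

    // True when the row equals the expected header after normalization.
    static boolean isHeader(String row, String expectedHeader) {
        return normalize(row).equals(normalize(expectedHeader));
    }

    public static void main(String[] args) {
        String header = "col1,col2,col3,col4";
        System.out.println(isHeader("\uFEFFcol1,col2,col3,col4 ", header)); // prints true
        System.out.println(isHeader("1,2,3,4", header));                    // prints false
    }
}
```

Inside `FilterCSVHeaderFn`, the `row.equals(this.headerFilter)` test could then be swapped for `CsvHeaderMatcher.isHeader(row, this.headerFilter)`. Note that any data row identical to the header would still be dropped, as in the original filter.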