google-cloud-dataflow | apache-beam | apache-beam-io

Getting an "unable to serialize DoFnWithExecutionInformation" error while building a TableRow


I am trying to convert a PCollection of Strings into a PCollection of BigQuery TableRows. My Apache Beam version is 2.41 with Java 11. I tried multiple approaches but could not fix this error. The TableSchema is loaded from an Avro file and provided to the PCollection as a ValueProvider.
Please help me to fix this.

Code:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        options.setTempLocation("data/temp/");
        Pipeline p = Pipeline.create(options);

        BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
        TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
        ValueProvider<TableSchema> ts= ValueProvider.StaticValueProvider.of(tableSchema);

        PCollection<String>   pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
        PCollection<TableRow> pc2 = pc1.apply(MapElements.via(new ConvertStringToTableRow(ts))) ;
        PipelineResult result = p.run();
        result.waitUntilFinish();

SimpleFunction Class

 public static  class ConvertStringToTableRow extends SimpleFunction<String, TableRow> {
        ValueProvider<TableSchema>  tableSchema;
        public  ConvertStringToTableRow(ValueProvider<TableSchema> tableSchema) {
            this.tableSchema = tableSchema;
        }
public TableRow buildTableRow(TableSchema sc,String[] arr) {
            List<TableFieldSchema> fieldSchemaList = sc.getFields();
            List<String> data = Arrays.stream(arr).collect(Collectors.toList());
            TableRow row = new TableRow();
            TableCell record = new TableCell();
            List<TableCell> tc = new ArrayList<TableCell>();

            for ( int i = 0; i < fieldSchemaList.size(); i++  ){
                TableFieldSchema sc2 = fieldSchemaList.get(i);
                String fieldName = sc2.getName();
                String fieldType = sc2.getType();
                String fieldValue = data.get(i);

                if (fieldValue.isEmpty()) {
                    record.set(fieldName,null);
                    tc.add(record);
                }
                else {
                    switch (fieldType) {
                        case "STRING":
                           record.set(fieldName,fieldValue);
                           tc.add(record);
                        case "BYTES":
                            record.set(fieldName,fieldValue.getBytes());
                            tc.add(record);
                        case "INT64":
                            record.set(fieldName,Integer.valueOf(fieldValue));
                            tc.add(record);
                        case "INTEGER":
                            record.set(fieldName,Integer.valueOf(fieldValue));
                            tc.add(record);
                        case "FLOAT64":
                            record.set(fieldName,Float.valueOf(fieldValue));
                            tc.add(record);
                        case "FLOAT":
                            record.set(fieldName,Float.valueOf(fieldValue));
                            tc.add(record);
                        case "BOOL":
                        case "BOOLEAN":
                        case "NUMERIC":
                            record.set(fieldName,Integer.valueOf(fieldValue));
                            tc.add(record);
                        case "TIMESTAMP":
                        case "TIME":
                        case "DATE":
                        case "DATETIME":
                        case "STRUCT":
                        case "RECORD":
                        default:
                            //  row.set(fieldName,fieldValue);
                            //  throw new UnsupportedOperationException("Unsupported BQ Data Type");
                    }
                }

            }
            return  row.setF(tc);

        }
@Override
        public TableRow apply(String element) {
            String[] arr = element.split(",");

           // BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
           // TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
            TableRow row = buildTableRow(tableSchema.get(), arr);
            return row;
        }

Error Messages:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.MapElements$1@270a620, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:737)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:877)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:788)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:803)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
    at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at BuildWriteBQTableRowExample01.main(BuildWriteBQTableRowExample01.java:50)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 26 more

Process finished with exit code 1

Solution

  • I propose a solution; it's not perfect, but I hope it helps.

    You can use your own structure for the table schema: convert each TableFieldSchema into a custom object that implements Serializable. For example:

    /**
     * Serializable mirror of a BigQuery TableFieldSchema, safe to hold as a
     * field inside a Beam DoFn/SimpleFunction. (Note: the keyword is
     * "implements", not "implement".)
     */
    public class MyTableSchemaFields implements Serializable {

        private static final long serialVersionUID = 1L;

        private String fieldName;
        private String fieldType;

        /** No-arg constructor so the class can be populated via setters. */
        public MyTableSchemaFields() {
        }

        public MyTableSchemaFields(String fieldName, String fieldType) {
            this.fieldName = fieldName;
            this.fieldType = fieldType;
        }

        public String getFieldName() {
            return fieldName;
        }

        public void setFieldName(String fieldName) {
            this.fieldName = fieldName;
        }

        public String getFieldType() {
            return fieldType;
        }

        public void setFieldType(String fieldType) {
            this.fieldType = fieldType;
        }
    }
    
    /**
     * Converts every BigQuery TableFieldSchema into its serializable mirror.
     *
     * @param schemaFields the BigQuery field schemas to convert
     * @return a list of MyTableSchemaFields in the same order
     */
    public List<MyTableSchemaFields> toMyTableSchemaFields(final List<TableFieldSchema> schemaFields) {
        final List<MyTableSchemaFields> converted = new ArrayList<>(schemaFields.size());
        for (final TableFieldSchema schemaField : schemaFields) {
            converted.add(toMyTableSchemaField(schemaField));
        }
        return converted;
    }
    
    
    /**
     * Converts a single TableFieldSchema into its serializable mirror.
     * The return type must be a single MyTableSchemaFields — the original
     * declared List&lt;MyTableSchemaFields&gt;, which does not compile and
     * breaks the stream().map(this::toMyTableSchemaField) caller.
     *
     * @param schemaField the BigQuery field schema to convert
     * @return the serializable equivalent carrying name and type
     */
    public MyTableSchemaFields toMyTableSchemaField(final TableFieldSchema schemaField) {
        MyTableSchemaFields field = new MyTableSchemaFields();
        field.setFieldName(schemaField.getName());
        field.setFieldType(schemaField.getType());

        return field;
    }
    

    Then in the rest of your program, use MyTableSchemaFields instead of TableFieldSchema :

    public static  class ConvertStringToTableRow extends SerializableFunction<String, TableRow> {
            List<MyTableSchemaFields>  schemaFields;
            public  ConvertStringToTableRow(List<MyTableSchemaFields> schemaFields) {
                this.schemaFields = schemaFields;
            }
    
    public TableRow buildTableRow(List<MyTableSchemaFields> schemaFields,String[] arr) {
                ...........
    

    For the class ConvertStringToTableRow I used a SerializableFunction in my example instead of a SimpleFunction. Note that SerializableFunction is an interface, so the class must declare "implements SerializableFunction<String, TableRow>" rather than "extends".