I am trying to convert a PCollection of Strings into a PCollection of BigQuery TableRow objects.
I am using Apache Beam 2.41 with Java 11. I have tried several approaches but have not been able to fix the error below.
The TableSchema is loaded from an Avro schema file and passed to the function as a ValueProvider.
Please help me fix this.
Code:
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    options.setTempLocation("data/temp/");
    Pipeline p = Pipeline.create(options);
    BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
    TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
    ValueProvider<TableSchema> ts = ValueProvider.StaticValueProvider.of(tableSchema);
    PCollection<String> pc1 = p.apply(TextIO.read().from("data/ship_data.csv"));
    PCollection<TableRow> pc2 = pc1.apply(MapElements.via(new ConvertStringToTableRow(ts)));
    PipelineResult result = p.run();
    result.waitUntilFinish();
}
SimpleFunction class:
public static class ConvertStringToTableRow extends SimpleFunction<String, TableRow> {
    ValueProvider<TableSchema> tableSchema;

    public ConvertStringToTableRow(ValueProvider<TableSchema> tableSchema) {
        this.tableSchema = tableSchema;
    }

    public TableRow buildTableRow(TableSchema sc, String[] arr) {
        List<TableFieldSchema> fieldSchemaList = sc.getFields();
        List<String> data = Arrays.stream(arr).collect(Collectors.toList());
        TableRow row = new TableRow();
        TableCell record = new TableCell();
        List<TableCell> tc = new ArrayList<TableCell>();
        for (int i = 0; i < fieldSchemaList.size(); i++) {
            TableFieldSchema sc2 = fieldSchemaList.get(i);
            String fieldName = sc2.getName();
            String fieldType = sc2.getType();
            String fieldValue = data.get(i);
            if (fieldValue.isEmpty()) {
                record.set(fieldName, null);
                tc.add(record);
            } else {
                switch (fieldType) {
                    case "STRING":
                        record.set(fieldName, fieldValue);
                        tc.add(record);
                    case "BYTES":
                        record.set(fieldName, fieldValue.getBytes());
                        tc.add(record);
                    case "INT64":
                        record.set(fieldName, Integer.valueOf(fieldValue));
                        tc.add(record);
                    case "INTEGER":
                        record.set(fieldName, Integer.valueOf(fieldValue));
                        tc.add(record);
                    case "FLOAT64":
                        record.set(fieldName, Float.valueOf(fieldValue));
                        tc.add(record);
                    case "FLOAT":
                        record.set(fieldName, Float.valueOf(fieldValue));
                        tc.add(record);
                    case "BOOL":
                    case "BOOLEAN":
                    case "NUMERIC":
                        record.set(fieldName, Integer.valueOf(fieldValue));
                        tc.add(record);
                    case "TIMESTAMP":
                    case "TIME":
                    case "DATE":
                    case "DATETIME":
                    case "STRUCT":
                    case "RECORD":
                    default:
                        // row.set(fieldName,fieldValue);
                        // throw new UnsupportedOperationException("Unsupported BQ Data Type");
                }
            }
        }
        return row.setF(tc);
    }

    @Override
    public TableRow apply(String element) {
        String[] arr = element.split(",");
        // BeamShemaUtil beamShemaUtil = new BeamShemaUtil("data/ship_data_schema.avsc");
        // TableSchema tableSchema = beamShemaUtil.convertBQTableSchema();
        TableRow row = buildTableRow(tableSchema.get(), arr);
        return row;
    }
}
Error Messages:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.MapElements$1@270a620, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:737)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:268)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:877)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:264)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:225)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:191)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:788)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:803)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at BuildWriteBQTableRowExample01.main(BuildWriteBQTableRowExample01.java:50)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
... 26 more
Process finished with exit code 1
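Here is a possible solution. It's not perfect, but I hope it helps.
The stack trace shows the root cause: TableSchema does not implement Serializable, so Beam cannot serialize a function that holds it as a field, and wrapping it in a ValueProvider does not change that.
You can use your own structure for the table schema and convert each TableFieldSchema to a custom object that implements Serializable, for example: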
public class MyTableSchemaFields implements Serializable {
    private String fieldName;
    private String fieldType;
    // Constructor
    .....
    // Getters and setters
    .......
}

// Converts the whole list of BigQuery field schemas.
public List<MyTableSchemaFields> toMyTableSchemaFields(final List<TableFieldSchema> schemaFields) {
    return schemaFields.stream()
        .map(this::toMyTableSchemaField)
        .collect(Collectors.toList());
}

// Converts a single field, so it returns one MyTableSchemaFields, not a List.
public MyTableSchemaFields toMyTableSchemaField(final TableFieldSchema schemaField) {
    MyTableSchemaFields field = new MyTableSchemaFields();
    field.setFieldName(schemaField.getName());
    field.setFieldType(schemaField.getType());
    return field;
}
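To illustrate why this helps, here is a minimal sketch that uses only plain Java serialization, with no Beam or BigQuery classes. `PlainSchema`, `FnWithPlainSchema` and `FieldInfo` are hypothetical stand-ins (for a non-serializable class such as TableSchema, for a function holding it, and for a class like MyTableSchemaFields); they are assumptions for illustration, not part of any library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationCheck {

    // Hypothetical stand-in for a class that does NOT implement Serializable.
    static class PlainSchema {
        String name = "ship_name";
    }

    // Serializable function-like object that holds the non-serializable schema as a field.
    static class FnWithPlainSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        PlainSchema schema = new PlainSchema();
    }

    // Hypothetical serializable replacement carrying only Strings.
    static class FieldInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String fieldName;
        final String fieldType;

        FieldInfo(String fieldName, String fieldType) {
            this.fieldName = fieldName;
            this.fieldType = fieldType;
        }
    }

    // Returns false if Java serialization rejects the object graph.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes and deserializes a FieldInfo, returning the copy.
    static FieldInfo roundTrip(FieldInfo field) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(field);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FieldInfo) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("holder with plain schema serializable? "
                + canSerialize(new FnWithPlainSchema()));
        FieldInfo copy = roundTrip(new FieldInfo("ship_name", "STRING"));
        System.out.println(copy.fieldName + " / " + copy.fieldType);
    }
}
```

The holder fails for the same reason as the function in the question: marking the outer class Serializable is not enough, because every non-transient field it references must be serializable too.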
Then in the rest of your program, use MyTableSchemaFields instead of TableFieldSchema:
// SerializableFunction is an interface, so it is implemented rather than extended.
public static class ConvertStringToTableRow implements SerializableFunction<String, TableRow> {
    List<MyTableSchemaFields> schemaFields;
    public ConvertStringToTableRow(List<MyTableSchemaFields> schemaFields) {
        this.schemaFields = schemaFields;
    }
    public TableRow buildTableRow(List<MyTableSchemaFields> schemaFields, String[] arr) {
        ...........
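For the class ConvertStringToTableRow, I used a SerializableFunction in my example instead of SimpleFunction. With a SerializableFunction, the transform is typically applied as MapElements.into(TypeDescriptor.of(TableRow.class)).via(...), because the output type is no longer inferred from a SimpleFunction subclass.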
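Separately from the serialization error, the switch in the question has no break statements, so every matching case falls through into the ones below it, and a single TableCell instance is reused for all fields. The following is a minimal sketch of the per-field conversion under stated assumptions: a `Map<String, Object>` stands in for TableRow (with the real class one would call `row.set(name, value)` with named fields rather than `setF` as in the question), `FieldInfo` is a hypothetical equivalent of MyTableSchemaFields, and mapping INT64 to Long and FLOAT64 to Double is my choice, not something the original code does.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowBuilderSketch {

    // Hypothetical serializable field descriptor: column name plus BigQuery type name.
    static final class FieldInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String type;

        FieldInfo(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Builds one row as a name -> value map; the Map is a stand-in for TableRow.
    static Map<String, Object> buildRow(List<FieldInfo> fields, String[] values) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            FieldInfo field = fields.get(i);
            // String.split(",") drops trailing empty columns, so guard the index.
            String raw = i < values.length ? values[i] : "";
            if (raw.isEmpty()) {
                row.put(field.name, null);
                continue;
            }
            switch (field.type) {
                case "INT64":
                case "INTEGER":
                    row.put(field.name, Long.valueOf(raw));
                    break; // without break, execution would fall into the next case
                case "FLOAT64":
                case "FLOAT":
                    row.put(field.name, Double.valueOf(raw));
                    break;
                case "BOOL":
                case "BOOLEAN":
                    row.put(field.name, Boolean.valueOf(raw));
                    break;
                default:
                    // STRING and any type not handled above are kept as text.
                    row.put(field.name, raw);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        List<FieldInfo> fields = Arrays.asList(
                new FieldInfo("name", "STRING"),
                new FieldInfo("year", "INT64"),
                new FieldInfo("tonnage", "FLOAT64"),
                new FieldInfo("active", "BOOL"));
        // The limit -1 keeps trailing empty columns in the split result.
        System.out.println(buildRow(fields, "Titanic,1912,46328.5,true".split(",", -1)));
    }
}
```

Because the class only holds Strings and the builder only reads them, the same logic could live inside the SerializableFunction without pulling TableSchema back into the serialized state.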