I am developing a Dataflow pipeline that uses the SqlTransform library together with the Beam aggregation function defined in org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf.
Here is a snippet of the code:
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
@Override
public PCollection<Row> expand(PCollection<Row> input) {
String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
" COUNTIF(event_type = 'track_click_through') as clickthrough " +
" FROM PCOLLECTION " +
" GROUP BY campaign_id, format_id, story_id, browser, device, os";
PCollection<Row> groupedImpressions = input.apply("groupedImpressions",
SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountIf.CountIfFn()));
return groupedImpressions;
}
}
This works fine when I run it locally (I also wrote a test, which passes):
PCollection<Row> results = rowPCollection.apply("SQL TRANSFORM", new EtlSqlTransformations());
// Expected output
Row expectedRow = Row.withSchema(EtlSqlTransformations.outputSchema)
.addValues("1044", ...)
.build();
PAssert.that(results).containsInAnyOrder(expectedRow);
pipeline.run();
The problem appears when I deploy to Google Cloud with Dataflow. I get the following output:
[WARNING]
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]
I searched for this problem but found nothing, only some similar cases where people had developed their own aggregation functions and needed to define the coder.
In this case, I can't work out where to find the coder, or whether I have to create it myself, since I am using a function that ships with Beam (org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf).
So the questions are:
Thanks
OK, I solved this by implementing COUNTIF myself.
public static class CountConditional extends Combine.CombineFn<Boolean, Long, Long> {
// Start every partial count at zero.
@Override
public Long createAccumulator() {
return 0L;
}
// Count the input only when the condition is true; a null (SQL NULL) is skipped.
@Override
public Long addInput(Long accumulator, Boolean input) {
if (Boolean.TRUE.equals(input)) {
++accumulator;
}
return accumulator;
}
// Sum the partial counts produced on different workers.
@Override
public Long mergeAccumulators(Iterable<Long> accumulators) {
long v = 0L;
for (Long partial : accumulators) {
v += partial;
}
return v;
}
// The accumulator already is the final count.
@Override
public Long extractOutput(Long accumulator) {
return accumulator;
}
}
I placed it in the same PTransform that uses it. Since its accumulator is a plain Long, for which Beam has a default coder, I don't have to deal with coders.
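To make the role of each method in the CombineFn above more concrete, here is a minimal plain-Java sketch with no Beam dependency. It assumes a runner that splits the input into bundles, builds one partial count per bundle, and then merges the partial counts. The class name CountIfSketch and the helpers combineBundle and countIf are hypothetical and only serve this illustration; a real runner decides on its own how to bundle and merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountIfSketch {
    // The four CombineFn steps, written as plain static methods (no Beam types).
    static Long createAccumulator() {
        return 0L;
    }

    static Long addInput(Long accumulator, Boolean input) {
        // A null input (SQL NULL) is treated like false and not counted.
        return Boolean.TRUE.equals(input) ? accumulator + 1 : accumulator;
    }

    static Long mergeAccumulators(Iterable<Long> accumulators) {
        long total = 0L;
        for (Long partial : accumulators) {
            total += partial;
        }
        return total;
    }

    static Long extractOutput(Long accumulator) {
        return accumulator;
    }

    // Hypothetical helper: folds one bundle of inputs into a partial count.
    static Long combineBundle(List<Boolean> bundle) {
        Long accumulator = createAccumulator();
        for (Boolean input : bundle) {
            accumulator = addInput(accumulator, input);
        }
        return accumulator;
    }

    // Hypothetical helper: one partial count per bundle, then a single merge.
    static Long countIf(List<List<Boolean>> bundles) {
        List<Long> partials = new ArrayList<>();
        for (List<Boolean> bundle : bundles) {
            partials.add(combineBundle(bundle));
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<Boolean>> bundles = Arrays.asList(
            Arrays.asList(true, false, true),
            Arrays.asList(false, null),
            Arrays.asList(true));
        System.out.println(countIf(bundles)); // prints 3
    }
}
```

The partial counts are the accumulators that move between workers, which is why a distributed runner needs a coder for the accumulator type.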