java | google-cloud-dataflow | apache-beam | dataflow | beam-sql

Dataflow / Beam Accumulator coder


I am developing a Dataflow pipeline that uses the SqlTransform Library and also the beam aggregation function defined in org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf .

Here is a snippet of the code:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/**
 * Aggregates impression rows with Beam SQL: groups by the campaign/format/story/
 * browser/device/os dimensions and counts click-through events per group using a
 * COUNTIF UDAF backed by Beam's built-in {@link CountIf.CountIfFn}.
 */
public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
    @Override
    public PCollection<Row> expand(PCollection<Row> input) {
        // NOTE: no comma after "clickthrough" — the original query had a trailing
        // comma immediately before FROM, which is a SQL syntax error.
        String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
                "       COUNTIF(event_type = 'track_click_through') as clickthrough " +
                "       FROM PCOLLECTION " +
                "       GROUP BY campaign_id, format_id, story_id, browser, device, os";

        // Register the COUNTIF UDAF so Calcite can resolve it inside the query.
        PCollection<Row> groupedImpressions = input.apply("groupedImpressions",
                SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountIf.CountIfFn()));
        return groupedImpressions;
    }
}

This works fine when testing locally (I also created some tests, which pass):


        PCollection<Row> results = rowPCollection.apply("SQL TRANSFORM", new EtlSqlTransformations());

        // Expected output
        Row expectedRow = Row.withSchema(EtlSqlTransformations.outputSchema)
                .addValues("1044", ...)
                .build();

        PAssert.that(results).containsInAnyOrder(expectedRow);
        pipeline.run();

The problem is when I want to deploy in Google cloud with Dataflow, I have the following output:

[WARNING] 
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
    at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
    at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
    at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
    at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
    at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]

I searched for this problem but found nothing, just some similar cases where people had developed their own aggregation functions and needed to define the CODER. In this case, I can't understand where to find the coder, or whether I have to create it on my own, since I am using a built-in Beam function (org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf).

So the questions are:

  1. Is there a way to use this beam function to add the coder that needs to run in Dataflow?
  2. If I need to change the function and create a new one and also the CODER, how can I do it?

Thanks


Solution

  • OK, I solved this by implementing COUNTIF myself.

        /**
         * Drop-in COUNTIF replacement: counts how many input elements are {@code true}.
         *
         * <p>The accumulator type is {@link Long}, for which Beam's CoderRegistry can
         * infer a coder (VarLongCoder) — so, unlike the wrapped CountIf.CountIfFn, no
         * explicit accumulator coder is required when the pipeline is translated for
         * the Dataflow runner.
         */
        public static class CountConditional extends Combine.CombineFn<Boolean, Long, Long> {

            /** Starts each (key, window) count at zero. */
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            /**
             * Increments the running count when the input is true. A {@code null}
             * input is treated as false (SQL comparisons can yield NULL), avoiding
             * the NPE that unboxing a null Boolean would throw.
             */
            @Override
            public Long addInput(Long accumulator, Boolean input) {
                return Boolean.TRUE.equals(input) ? accumulator + 1 : accumulator;
            }

            /** Sums the partial counts produced on different workers. */
            @Override
            public Long mergeAccumulators(Iterable<Long> accumulators) {
                long total = 0L;
                for (Long partial : accumulators) {
                    total += partial;
                }
                return total;
            }

            /** The final output is the accumulated count itself. */
            @Override
            public Long extractOutput(Long accumulator) {
                return accumulator;
            }
        }
    

    I placed it in the same PTransform that uses it, so I don't have to deal with coders.