java apache-beam

How to create an empty PCollection<KV<String, Object>>


I'm trying to create an empty PCollection whose values are instances of a custom class called Incident:

/**
 * Serializable data holder describing a single incident.
 *
 * <p>Every field uses a boxed or reference type, so any of them may be {@code null}.
 */
public class Incident implements Serializable {
    // Identifier of the incident.
    private Integer incidentId;
    // Identifier of the application associated with the incident.
    private String appId;
    // Minute count for the incident (exact meaning is not shown in this snippet).
    private Long minutes;
     
   // getters and setters for the fields above go here (omitted in this snippet)
}

I was trying with the following:

// Attempt from the question. The setTypeDescriptor(...) call below does not compile:
// no TypeDescriptor.of overload accepts (Class<KV>, Class<String>, Class<Incident>).
PCollection<KV<String, Incident>> incidents = pipeline.apply("Create Empty Collection", Create.empty(TypeDescriptor.of(KV.class)))
                .setTypeDescriptor(TypeDescriptor.of(KV.class, String.class, Incident.class));

But it gives me a compilation error:

Cannot resolve method 'of(Class<KV>, Class<String>, Class<Incident>)'


Solution

  • One way to do it:

    1. Create a custom Coder for your custom Object/Class:
    import org.apache.beam.sdk.coders.*;
    import org.checkerframework.checker.initialization.qual.Initialized;
    import org.checkerframework.checker.nullness.qual.NonNull;
    import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    /**
     * Beam {@link Coder} that serializes an {@link Incident} field by field.
     *
     * <p>Wire format, in order: {@code appId}, {@code minutes}, {@code incidentId}. Each field is
     * wrapped in a {@link NullableCoder}, so individual fields may be {@code null}; the
     * {@code Incident} itself may not.
     */
    public class IncidentCoder extends AtomicCoder<Incident> {

        // The field coders are stateless, so they are shared constants rather than per-instance state.
        private static final Coder<String> APP_ID_CODER = NullableCoder.of(StringUtf8Coder.of());
        // VarLongCoder writes a compact variable-length long instead of going through
        // Java serialization, which SerializableCoder would use for a Long.
        private static final Coder<Long> MINUTES_CODER = NullableCoder.of(VarLongCoder.of());
        private static final Coder<Integer> INCIDENT_ID_CODER = NullableCoder.of(BigEndianIntegerCoder.of());

        /**
         * Writes the fields of {@code value} to {@code outStream} in wire-format order.
         *
         * @param value the incident to encode; must not be {@code null}
         * @param outStream the stream to write to
         * @throws CoderException if {@code value} is {@code null} or a field cannot be encoded
         * @throws IOException if writing to {@code outStream} fails
         */
        @Override
        public void encode(Incident value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
            if (value == null) {
                // Fail with a coder-level error instead of a bare NullPointerException.
                throw new CoderException("cannot encode a null Incident");
            }
            APP_ID_CODER.encode(value.getAppId(), outStream);
            MINUTES_CODER.encode(value.getMinutes(), outStream);
            INCIDENT_ID_CODER.encode(value.getIncidentId(), outStream);
        }

        /**
         * Reads one {@link Incident} from {@code inStream}.
         *
         * <p>Fields are read in exactly the order {@link #encode} wrote them.
         *
         * @param inStream the stream to read from
         * @return a new {@code Incident} populated with the decoded fields
         * @throws CoderException if a field cannot be decoded
         * @throws IOException if reading from {@code inStream} fails
         */
        @Override
        public Incident decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
            String appId = APP_ID_CODER.decode(inStream);
            Long minutes = MINUTES_CODER.decode(inStream);
            Integer incidentId = INCIDENT_ID_CODER.decode(inStream);

            Incident incident = new Incident();
            incident.setAppId(appId);
            incident.setMinutes(minutes);
            incident.setIncidentId(incidentId);
            return incident;
        }
    }
    
    
    2. Use the Coder to create the empty PCollection:
                // An empty collection has no elements to infer a coder from, so the
                // KV coder is supplied explicitly to Create.empty(...).
                Coder<Incident> incidentCoder = new IncidentCoder();
                PCollection<KV<String, Incident>> incidents = pipeline.apply("Creating empty PCollection",
                        Create.empty(KvCoder.of(StringUtf8Coder.of(), incidentCoder)));