I'm trying to create an empty PCollection whose values are instances of a custom class called Incident:
/**
 * Serializable data holder describing a single incident.
 *
 * <p>All three fields are reference (boxed) types, so any of them may be {@code null}.
 */
public class Incident implements Serializable {
// Identifier of the incident.
private Integer incidentId;
// Identifier of the application associated with the incident (presumably; confirm with the author).
private String appId;
// A minute count; what it measures is not shown here.
private Long minutes;
// Getters and setters for the fields above are omitted for brevity.
}
I tried the following:
// Attempt from the question: create an empty PCollection from the raw KV type descriptor and
// then try to narrow it to KV<String, Incident> via setTypeDescriptor.
// NOTE: this does not compile. The three-argument call
// TypeDescriptor.of(KV.class, String.class, Incident.class) is rejected with "Cannot resolve method".
PCollection<KV<String, Incident>> incidents = pipeline.apply("Create Empty Collection", Create.empty(TypeDescriptor.of(KV.class)))
.setTypeDescriptor(TypeDescriptor.of(KV.class, String.class, Incident.class));
But it gives me a compilation error:
Cannot resolve method 'of(Class<KV>, Class<String>, Class<Incident>)'
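One way to do it is to write a custom coder for Incident and pass an explicit KvCoder when creating the empty PCollection: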
import org.apache.beam.sdk.coders.*;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class IncidentCoder extends AtomicCoder<Incident> {
private final Coder<Integer> incidentIdCoder = NullableCoder.of(BigEndianIntegerCoder.of());
private final Coder<Long> minutesCoder = NullableCoder.of(SerializableCoder.of(Long.class));
private final Coder<String> appIdCoder = NullableCoder.of(StringUtf8Coder.of());
@Override
public void encode(Incident value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
appIdCoder.encode(value.getAppId(), outStream);
minutesCoder.encode(value.getMinutes(), outStream);
incidentIdCoder.encode(value.getIncidentId(), outStream);
}
@Override
public Incident decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
String appId = appIdCoder.decode(inStream);
Long minutes = minutesCoder.decode(inStream);
Integer incidentId = incidentIdCoder.decode(inStream);
Incident incident = new Incident();
incident.setIncidentId(incidentId);
incident.setMinutes(minutes);
incident.setAppId(appId);
return incident;
}
}
// An empty collection has no elements to infer a coder from, so the coder is supplied explicitly.
Coder<Incident> incidentCoder = new IncidentCoder();
// Create.empty(Coder) builds the empty PCollection directly; no placeholder empty Map is needed.
PCollection<KV<String, Incident>> incidents = pipeline.apply(
        "Creating empty PCollection",
        Create.empty(KvCoder.of(StringUtf8Coder.of(), incidentCoder)));