I have a PCollection which I want to pass as a side-input and access its elements in a ParDo.
I've created a PCollectionView of it as:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
How to access the elements of it in a ParDo when passed as a side-input?
An example would really help.
Thank You
This snippet mainly comes from the Beam programming guide.
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(view);
// Do something with side input
}
}).withSideInputs(view)
);
If you don't want to use an anonymous DoFn, you can also pass the PCollectionView as part of its constructor and access it in the processElement function. Like so:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new MyDoFn(view)).withSideInputs(view));
class MyDoFn extends DoFn<String, String> {
final PCollectionView<List<Foo>> view;
MyDoFn(PCollectionView<List<Foo>> view) {
this.view = view;
}
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(this.view);
// Do something with side input
}
}