I am using Apache Beam to join multiple streams along with some lookups. I have two scenarios: if the lookup table is huge, I want to refresh the side input per record being processed (i.e. query the database with a WHERE clause for just that key), and if the lookup table is small, I want to reload/refresh it once a day.
I want to know the correct approach for this. I don't want a huge side input to eat up all of the workers' memory.
I have used the code below to refresh the side input once a day:
PCollectionView<Map<String, String>> lkp =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1)))
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  private static final long serialVersionUID = 1L;

                  @ProcessElement
                  public void process(
                      @Element Long input, OutputReceiver<Map<String, String>> o) {
                    // Reload the full lookup table on each daily tick.
                    Map<String, String> map =
                        HiveConnection.getHiveConnection("select * from table");
                    o.output(map);
                  }
                }))
        .apply(View.<Map<String, String>>asSingleton());
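For completeness, this is roughly how the refreshed view would be consumed from the main stream. This is a hedged sketch: `mainInput` and the key/value handling are hypothetical, and the pattern is simply `c.sideInput(lkp)` inside a `ParDo` registered with `.withSideInputs(lkp)`.

```java
// Hypothetical main stream of keys to enrich; 'lkp' is the view built above.
PCollection<String> enriched =
    mainInput.apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Each bundle sees the latest fired pane of the daily-refreshed map.
                    Map<String, String> lookup = c.sideInput(lkp);
                    c.output(c.element() + "," + lookup.getOrDefault(c.element(), ""));
                  }
                })
            .withSideInputs(lkp));
```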
Kindly guide me through the best practices for this type of use case and provide some example code for better understanding.
Thanks, Gowtham
You are using the correct recommended pattern for the small, daily-refreshed lookup.
For the large case, rather than using a side input, a per-record callout from the DoFn is normally the recommended pattern. This older blog post contains an example under "Calling external services for data enrichment":
Guide to common Cloud Dataflow use-case patterns, Part 1
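To keep per-record callouts from hammering the database, the usual refinement is a small per-worker cache with a TTL in front of the lookup. A minimal, Beam-independent sketch (the class and names are hypothetical; the loader function would wrap your real Hive query with a WHERE clause) that you could hold in a DoFn field initialized in `@Setup`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical per-worker TTL cache for the "callout" enrichment pattern.
 * The loader stands in for a keyed query, e.g.
 * "select v from table where k = ?".
 */
class EnrichmentCache {
  private static class Entry {
    final String value;
    final long loadedAtMillis;

    Entry(String value, long loadedAtMillis) {
      this.value = value;
      this.loadedAtMillis = loadedAtMillis;
    }
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final Function<String, String> loader;
  private final long ttlMillis;

  EnrichmentCache(Function<String, String> loader, long ttlMillis) {
    this.loader = loader;
    this.ttlMillis = ttlMillis;
  }

  /** Returns the cached value, re-querying the source when the entry is older than the TTL. */
  String get(String key) {
    long now = System.currentTimeMillis();
    Entry e = cache.get(key);
    if (e == null || now - e.loadedAtMillis > ttlMillis) {
      e = new Entry(loader.apply(key), now);
      cache.put(key, e);
    }
    return e.value;
  }
}
```

Because the cache only ever holds the keys a given worker has actually seen, it bounds memory far better than materializing the whole table as a side input.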
I will try and find time to add this pattern to the Beam pattern pages at: