What I want to do: continuously read gzipped files matching a pattern, uncompress them, prepare the contents for a BigQuery import and write the results.
Actual code:
static void run(final BeeswaxDataflowOptions options) {
  final Pipeline pipeline = Pipeline.create(options);

  final PCollection<MatchResult.Metadata> matches =
      pipeline.apply(
          "Read",
          FileIO.match()
              .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
              .continuously(
                  Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

  matches
      .apply(FileIO.readMatches().withCompression(GZIP))
      .apply(
          Window.<FileIO.ReadableFile>into(
                  FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.ZERO)
              .triggering(
                  Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
      .apply(
          "Uncompress",
          MapElements.into(
                  TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
              .via(
                  file -> {
                    final String filePath = file.getMetadata().resourceId().toString();
                    try {
                      return KV.of(filePath, file.readFullyAsUTF8String());
                    } catch (final IOException e) {
                      return KV.of(filePath, "");
                    }
                  }))
      .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
      .apply(
          "Save results",
          FileIO.<String, KV<String, String>>writeDynamic()
              .withCompression(GZIP)
              .by(KV::getKey)
              .withDestinationCoder(StringUtf8Coder.of())
              .via(Contextful.fn(KV::getValue), TextIO.sink())
              .withNumShards(options.getShards())
              .to(options.getOutputPath())
              .withTempDirectory(options.getTempLocation())
              .withNaming(AbsoluteNaming::new));

  pipeline.run().waitUntilFinish();
}
The problem is an OutOfMemory exception (yes, I know readFullyAsUTF8String is a likely suspect for that). How can I deal with this kind of situation?
My observation is that all ~3000 files are read and accumulate in the "Uncompress" step, so before anything reaches "Prepare for BigQuery import" and "Save results" everything has already been read into RAM.
It would be nice to somehow throttle the pipeline, e.g. let at most 50 elements go through all the steps and wait for the results before starting the next batch. Is this possible? If not, how else can I deal with it?
You can do a couple of things here.
1: Use Reshuffle to break fusion and distribute the files more evenly across workers.
final PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        "Read",
        FileIO.match()
            .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
            .continuously(
                Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

matches
    .apply(Reshuffle.viaRandomKey())
    .apply(FileIO.readMatches().withCompression(GZIP))
2: Limit the number of elements processed concurrently on each worker VM by setting --numberOfWorkerHarnessThreads,
but I think the problem should already be resolved by the reshuffle.
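For example, you could set it when constructing the pipeline options (a minimal sketch, assuming the job is launched from a main method with args; the value 16 is only an illustration and should be tuned to your worker memory and typical file sizes):

final BeeswaxDataflowOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BeeswaxDataflowOptions.class);
// Cap how many elements a single Dataflow worker VM processes in parallel
// (numberOfWorkerHarnessThreads lives in the Dataflow runner's debug options).
options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(16);
run(options);

You can also just pass it on the command line as --numberOfWorkerHarnessThreads=16.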