out-of-memory, google-cloud-dataflow, apache-beam, large-files, file-watcher

Dataflow GCP (Apache Beam) - continuously reading a large number of files (OutOfMemory)


What I want to do:

  1. Continuously read and uncompress GZ files matching a pattern (~3000 files); each file is 1.2 MB compressed and 9 MB after unpacking
  2. Replace some sequences of characters in every CSV file
  3. Compress each CSV file back to GZ and save the modified file to its own path.

Current code:

static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        "Read",
        FileIO.match()
            .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
            .continuously(
                Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

matches
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(
        Window.<FileIO.ReadableFile>into(
                FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
            .triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
    .apply(
        "Uncompress",
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(
                file -> {
                  final String filePath = file.getMetadata().resourceId().toString();
                  try {
                    return KV.of(filePath, file.readFullyAsUTF8String());
                  } catch (final IOException e) {
                    return KV.of(filePath, "");
                  }
                }))
    .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
    .apply(
        "Save results",
        FileIO.<String, KV<String, String>>writeDynamic()
            .withCompression(GZIP)
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .withNumShards(options.getShards())
            .to(options.getOutputPath())
            .withTempDirectory(options.getTempLocation())
            .withNaming(AbsoluteNaming::new));

pipeline.run().waitUntilFinish();
}

The problem is an OutOfMemory exception (yes, I know that readFullyAsUTF8String is a likely suspect for that). How do I deal with that kind of situation?
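For reference, a streamed variant of the "Uncompress" step would look roughly like the sketch below (the usual java.io / java.nio imports are assumed). It is only a sketch: it emits one CSV line per element instead of one whole file, so BigQueryDataPreparatorFn and the write step would need to be adapted to per-line input.

// Sketch only: stream the decompressed bytes line by line instead of
// materializing the whole file with readFullyAsUTF8String.
.apply(
    "Uncompress",
    ParDo.of(
        new DoFn<FileIO.ReadableFile, KV<String, String>>() {
          @ProcessElement
          public void process(
              @Element final FileIO.ReadableFile file,
              final OutputReceiver<KV<String, String>> out) throws IOException {
            final String filePath = file.getMetadata().resourceId().toString();
            try (BufferedReader reader =
                new BufferedReader(
                    new InputStreamReader(
                        Channels.newInputStream(file.open()), StandardCharsets.UTF_8))) {
              String line;
              while ((line = reader.readLine()) != null) {
                out.output(KV.of(filePath, line)); // one CSV line per element
              }
            }
          }
        }))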

My observation is that all ~3000 files are read and gathered in the "Uncompress" step. So before anything reaches "Prepare for BigQuery import" and "Save results", it is all somehow accumulated and held in RAM.

It would be nice to somehow throttle this pipeline - e.g. let a maximum of 50 elements go through the steps, wait for their results, and only then start the next batch. Is this possible? If not, how else can I deal with this?


Solution

  • You can do a couple of things here.

    1: Use a reshuffle to distribute the files more evenly across workers. Reshuffle.viaRandomKey() redistributes the matched file metadata and prevents the matching step from being fused with the heavy read/uncompress/transform steps on just a few bundles:

    final PCollection<MatchResult.Metadata> matches =
        pipeline.apply(
            "Read",
            FileIO.match()
                .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
                .continuously(
                    Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

    matches
        .apply(Reshuffle.viaRandomKey())
        .apply(FileIO.readMatches().withCompression(GZIP))
        // ... the rest of the pipeline (Window, Uncompress, etc.) stays the same


    Next, you can limit the number of elements processed concurrently per worker VM by setting --numberOfWorkerHarnessThreads, but I think the problem should be resolved by the reshuffling.
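
    A minimal sketch of setting that option in code, assuming your BeeswaxDataflowOptions runs on the Dataflow runner and a main(String[] args) that calls the run method from the question; the value 8 is just an example and is equivalent to passing --numberOfWorkerHarnessThreads=8 on the command line:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // In main(), before calling run(options):
    final BeeswaxDataflowOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BeeswaxDataflowOptions.class);
    // Cap the number of work items processed in parallel on each worker VM.
    options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(8);
    run(options);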