The Question
How do I best execute memory-intensive pipelines in Apache Beam?
Background
I've written a pipeline that takes the Naemura Bird dataset and converts the images and annotations to TF Records with TF Examples of the required format for the TF object detection API.
I tested the pipeline using DirectRunner with a small subset of images (4 or 5) and it worked fine.
The Problem
When running the pipeline with a bigger data set (day 1 of 3, ~21GB), it crashes after a while with a non-descriptive SIGKILL.
I do see a memory peak before the crash and assume that the process is killed because its memory usage is too high.
I ran the pipeline through strace. These are the last lines in the trace:
[pid 53702] 10:00:09.105069 poll([{fd=10, events=POLLIN}, {fd=11, events=POLLIN}, {fd=12, events=POLLIN}, {fd=13, events=POLLIN}, {fd=14, events=POLLIN}, {fd=15, events=POLLIN}, {fd=16, events=POLLIN}, {fd=17, events=POLLIN}, {fd=18, events=POLLIN}, {fd=19, events=POLLIN}, {fd=20, events=POLLIN}], 11, 100) = 0 (Timeout)
[pid 53702] 10:00:09.205826 poll([{fd=10, events=POLLIN}, {fd=11, events=POLLIN}, {fd=12, events=POLLIN}, {fd=13, events=POLLIN}, {fd=14, events=POLLIN}, {fd=15, events=POLLIN}, {fd=16, events=POLLIN}, {fd=17, events=POLLIN}, {fd=18, events=POLLIN}, {fd=19, events=POLLIN}, {fd=20, events=POLLIN}], 11, 100 <unfinished ...>
[pid 53534] 10:00:09.259806 mmap(NULL, 63082496, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f3aa43d7000
[pid 53694] 10:00:09.297140 <... clock_nanosleep resumed>NULL) = 0
[pid 53694] 10:00:09.297273 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=200000000}, <unfinished ...>
[pid 53702] 10:00:09.306409 <... poll resumed>) = 0 (Timeout)
[pid 53702] 10:00:09.306478 poll([{fd=10, events=POLLIN}, {fd=11, events=POLLIN}, {fd=12, events=POLLIN}, {fd=13, events=POLLIN}, {fd=14, events=POLLIN}, {fd=15, events=POLLIN}, {fd=16, events=POLLIN}, {fd=17, events=POLLIN}, {fd=18, events=POLLIN}, {fd=19, events=POLLIN}, {fd=20, events=POLLIN}], 11, 100) = 0 (Timeout)
[pid 53702] 10:00:09.406866 poll([{fd=10, events=POLLIN}, {fd=11, events=POLLIN}, {fd=12, events=POLLIN}, {fd=13, events=POLLIN}, {fd=14, events=POLLIN}, {fd=15, events=POLLIN}, {fd=16, events=POLLIN}, {fd=17, events=POLLIN}, {fd=18, events=POLLIN}, {fd=19, events=POLLIN}, {fd=20, events=POLLIN}], 11, 100 <unfinished ...>
[pid 53710] 10:03:55.844910 <... futex resumed>) = ?
[pid 53709] 10:03:57.797618 <... futex resumed>) = ?
[pid 53708] 10:03:57.797737 <... futex resumed>) = ?
[pid 53707] 10:03:57.797793 <... futex resumed>) = ?
[pid 53706] 10:03:57.797847 <... futex resumed>) = ?
[pid 53705] 10:03:57.797896 <... futex resumed>) = ?
[pid 53704] 10:03:57.797983 <... futex resumed>) = ?
[pid 53703] 10:03:57.798035 <... futex resumed>) = ?
[pid 53702] 10:03:57.798085 +++ killed by SIGKILL +++
[pid 53701] 10:03:57.798124 <... futex resumed>) = ?
[pid 53700] 10:03:57.798173 <... futex resumed>) = ?
[pid 53699] 10:03:57.798224 <... futex resumed>) = ?
[pid 53698] 10:03:57.798272 <... futex resumed>) = ?
[pid 53697] 10:03:57.798321 <... accept4 resumed> <unfinished ...>) = ?
[pid 53694] 10:03:57.798372 <... clock_nanosleep resumed> <unfinished ...>) = ?
[pid 53693] 10:03:57.798426 <... futex resumed>) = ?
[pid 53660] 10:03:57.798475 <... futex resumed>) = ?
[pid 53641] 10:03:57.798523 <... futex resumed>) = ?
[pid 53640] 10:03:57.798572 <... futex resumed>) = ?
[pid 53639] 10:03:57.798620 <... futex resumed>) = ?
[pid 53710] 10:03:57.798755 +++ killed by SIGKILL +++
[pid 53709] 10:03:57.798792 +++ killed by SIGKILL +++
[pid 53708] 10:03:57.798828 +++ killed by SIGKILL +++
[pid 53707] 10:03:57.798864 +++ killed by SIGKILL +++
[pid 53706] 10:03:57.798900 +++ killed by SIGKILL +++
[pid 53705] 10:03:57.798937 +++ killed by SIGKILL +++
[pid 53704] 10:03:57.798973 +++ killed by SIGKILL +++
[pid 53703] 10:03:57.799008 +++ killed by SIGKILL +++
[pid 53701] 10:03:57.799044 +++ killed by SIGKILL +++
[pid 53700] 10:03:57.799079 +++ killed by SIGKILL +++
[pid 53699] 10:03:57.799116 +++ killed by SIGKILL +++
[pid 53698] 10:03:57.799152 +++ killed by SIGKILL +++
[pid 53697] 10:03:57.799187 +++ killed by SIGKILL +++
[pid 53694] 10:03:57.799245 +++ killed by SIGKILL +++
[pid 53693] 10:03:57.799282 +++ killed by SIGKILL +++
[pid 53660] 10:03:57.799318 +++ killed by SIGKILL +++
[pid 53641] 10:03:57.799354 +++ killed by SIGKILL +++
[pid 53640] 10:03:57.799390 +++ killed by SIGKILL +++
[pid 53639] 10:03:57.910349 +++ killed by SIGKILL +++
10:03:57.910381 +++ killed by SIGKILL +++
Multiple things could cause this behaviour. Because the pipeline runs fine with less data, analysing what changed between the two runs could lead us to a resolution.
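The third line of the log you provided, the mmap(NULL, 63082496, ...) call, is worth a look. Note that NULL there is only the address hint of an anonymous mapping, i.e. the process asking the kernel for about 60 MB of additional memory, so on its own it does not show that | "Get Content" >> beam.Map(lambda x: x.read_utf8()) is trying to read a null value.
It is still worth ruling out unclean data in your bigger data set. Is there an empty file somewhere? Are your files UTF-8 encoded?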
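To answer those two questions, a minimal sketch like the following could flag suspicious files before the real pipeline runs. The names classify_content and build_check_pipeline are hypothetical, and the file pattern is whatever you already pass to your pipeline; only fileio.MatchFiles, fileio.ReadMatches, ReadableFile.read() and metadata.path come from Beam itself.

```python
def classify_content(raw: bytes) -> str:
    """Label raw file bytes as 'empty', 'not-utf8' or 'ok'."""
    if not raw:
        return "empty"
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return "not-utf8"
    return "ok"


def build_check_pipeline(pipeline, file_pattern):
    """Attach the check to an existing Beam pipeline (hypothetical helper)."""
    # Imported lazily so classify_content can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio

    return (
        pipeline
        | "Match" >> fileio.MatchFiles(file_pattern)
        | "Read" >> fileio.ReadMatches()
        # f.read() returns the raw bytes, so decoding errors can be caught here.
        | "Classify" >> beam.Map(
            lambda f: (f.metadata.path, classify_content(f.read())))
        # Keep only the files that look problematic.
        | "Keep problems" >> beam.Filter(lambda kv: kv[1] != "ok")
    )
```

Binary image files will normally be reported as not-utf8, so the pattern should only match the annotation files that you expect to be text.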
I'm guessing that fileio.ReadMatches() followed by read_utf8() will try to load the whole file into memory. If a file is bigger than your available memory, this could lead to errors. Can you split your data into smaller files?
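If splitting the files is not an option, one possible alternative is to read each matched file in pieces instead of all at once. This is only a sketch: iter_chunks is a hypothetical helper and the 1 MiB chunk size is an arbitrary assumption. It relies on ReadableFile.open(), which returns a file-like object.

```python
def iter_chunks(readable_file, chunk_size=1024 * 1024):
    """Yield the file's bytes in pieces of at most chunk_size bytes."""
    stream = readable_file.open()
    try:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                # An empty read means end of file.
                break
            yield chunk
    finally:
        # Close the stream even if the consumer stops early.
        stream.close()
```

In a pipeline this could replace the read_utf8() step, for example with beam.FlatMap(iter_chunks) after fileio.ReadMatches(). It only helps when the downstream steps can work on pieces; a single image that has to be decoded as a whole still needs to fit in memory.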
If the files are too big for your current machine with the DirectRunner, you could try an on-demand infrastructure by using another runner in the cloud, such as the DataflowRunner.
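As a rough sketch of what switching runners could look like, the options below select the DataflowRunner and a high-memory worker type. The helper names dataflow_args and make_options are hypothetical, and the project, region, bucket and machine type are placeholders that you would replace with your own values.

```python
def dataflow_args(project, region, temp_location, machine_type="n1-highmem-4"):
    """Build command-line style flags for a Dataflow run (placeholder values)."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        # Dataflow needs a Cloud Storage location for temporary files.
        f"--temp_location={temp_location}",
        # A machine type with more memory per worker; an assumption, adjust as needed.
        f"--machine_type={machine_type}",
    ]


def make_options(args):
    """Turn the flags into PipelineOptions (requires apache_beam to be installed)."""
    from apache_beam.options.pipeline_options import PipelineOptions

    return PipelineOptions(args)
```

The result of make_options(dataflow_args("my-project", "europe-west1", "gs://my-bucket/tmp")) could then be passed as options= when constructing beam.Pipeline. The input files would also have to live somewhere the workers can read, such as a Cloud Storage bucket.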