python, python-3.x, multiprocessing, cython, python-multiprocessing

"process died unexpectedly" with cythonized version of multiprocessing code


This is an offshoot of this question. The code in python runs fine. When I tried the cythonized version, I started getting "Can't pickle <cyfunction init_worker_processes at 0x7fffd7da5a00>" even though I defined the init_worker_processes at top level. So, I moved it to another module and used the imported init_worker_processes. Now, I get the below error:

error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching.  Some resources might leak.
  warnings.warn('resource_tracker: process died unexpectedly, '

I'm not explicitly using -s or -c as reported in the error. The error is coming from below code in the multiprocessing library (method - ensure_running)

warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

How to resolve this issue?

# updated Python code
# ---------------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import time

import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn

import numpy as np

from mp_utils import init_worker_processes 


@dataclass
class TmpData:
    """Per-task payload passed from the parent process to pool workers."""

    # Task identifier; also used as the dict key in mp_app's map_data.
    name: str
    # Sleep duration in seconds consumed by worker() via time.sleep.
    value: int


def worker(name: str, data: "TmpData") -> None:
    """Log the task, then simulate work by sleeping ``data.value`` seconds.

    Args:
        name: Task identifier (logged only).
        data: Payload whose ``value`` attribute is the sleep time in seconds.

    Returns:
        None. (The original ``NoReturn`` annotation was incorrect:
        ``NoReturn`` means the function never returns; this one does.)
    """
    logger_obj = mp.get_logger()
    logger_obj.info(f"processing : {name}; value: {data.value}")

    time.sleep(data.value)


def get_args(logger: logging.Logger) -> argparse.Namespace:
    """Parse command-line arguments for the MP test app.

    Args:
        logger: Logger used to report parse failures.

    Returns:
        Namespace with ``max_time`` (required, seconds) and ``num_workers``
        (hidden ``-j`` option, defaults to 1).

    Raises:
        argparse.ArgumentError: Re-raised after logging. Note that
            ``parse_args`` normally reports bad input by raising
            ``SystemExit``, not ``ArgumentError``.
    """
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m",
        "--max-time",
        type=int,
        dest="max_time",
        required=True,
        help="max timeout in seconds",
    )

    parser.add_argument(
        "-j",
        dest="num_workers",
        type=int,
        default=1,
        required=False,
        help=argparse.SUPPRESS,
    )

    try:
        args = parser.parse_args()
    except argparse.ArgumentError:
        # Bug fix: the old code did logger.exception(parser.print_help()),
        # which logs the literal "None" because print_help() writes to stdout
        # and returns None. Print the help, then log the traceback.
        parser.print_help()
        logger.exception("failed to parse command-line arguments")
        raise

    return args


def mp_app(options: argparse.Namespace, logger: logging.Logger) -> None:
    """Fan a fixed set of sleep tasks out to a fork-based worker pool.

    Args:
        options: Parsed CLI options (``max_time``, ``num_workers``).
        logger: Logger used to report per-task failures.
    """
    # max(2, ...) keeps randint's half-open range [1, high) non-empty even
    # when the user passes --max-time 1 (randint(1, 1) raises ValueError).
    high = max(2, options.max_time)
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(1, high))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }

    with mp.get_context("fork").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        try:
            for key in map_data:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(key, map_data[key]),
                    )
                )
        except KeyboardInterrupt:
            # Bug fix: the old per-iteration handler swallowed Ctrl-C, kept
            # looping over a terminated pool and then called close()/join()
            # on it. Terminate once and propagate the interrupt.
            pool.terminate()
            raise

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                logger.error(f"{err}")


if __name__ == "__main__":
    # Script entry point: parse CLI options, run the pool app, and convert
    # any failure into exit status 1.
    root_logger = logging.getLogger()

    try:
        cli_options = get_args(root_logger)
        mp_app(options=cli_options, logger=root_logger)
    except Exception as exc:
        root_logger.error(exc)
        raise SystemExit(1) from exc

    sys.exit(0)

# --------------------- mp_utils.py --------------------------

import multiprocessing
import logging
import signal

from typing import NoReturn


def init_worker_processes() -> None:
    """Initialize a pool worker: stderr logging at INFO, SIGINT ignored.

    SIGINT is ignored so that Ctrl-C is delivered only to the parent
    process, which can then terminate the pool cleanly.

    Returns:
        None. (The original ``NoReturn`` annotation was incorrect:
        this initializer returns normally.)
    """
    this_process_logger = multiprocessing.log_to_stderr()
    this_process_logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)

Please note the main issue seems to be "-s" and "-c" being unrecognized option; not sure from where those are coming.

Edit-2:

I'm still trying to decipher the cython build process, as it happens through a complex make system in our environment. However, I believe I have traced the root cause of the -s and -c options: -s seems to come from the _args_from_interpreter_flags method in subprocess.py (module subprocess).

In my python shell I see sys.flags as following -

>>> sys.flags sys.flags(debug=0, inspect=0, interactive=0, optimize=0, dont_write_bytecode=0, no_user_site=1, no_site=0, ignore_environment=0, verbose=0, bytes_warning=0, quiet=0, hash_randomization=1, isolated=0, dev_mode=False, utf8_mode=0, int_max_str_digits=-1)

Since sys.flags.no_user_site is 1, -s seems to get appended.

get_command_line in spawn.py seems to be adding -c. Since this branch comes from the else of if getattr(sys, 'frozen', False), is the spawn approach not supposed to work with a cythonized binary?

EDIT-3:

I tried with both "fork" and "spawn". Both work in Python. But with the cythonized build of the "spawn"-based app I get the "UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak" message along with "unrecognized arguments" for -s and -c. The cythonized version of the "fork"-based app simply hangs at launch, as if it's waiting on some lock. I tried pstack on the process id, but could not spot anything -

# top 20 frames from pstack
#0  0x00007ffff799675d in read () from /usr/lib64/libpthread.so.0
#1  0x00007ffff70c3996 in _Py_read (fd=fd@entry=3, buf=0x7fffbabfdbf0, count=count@entry=4) at Python/fileutils.c:1707
#2  0x00007ffff70ce872 in os_read_impl (module=<optimized out>, length=4, fd=3) at ./Modules/posixmodule.c:9474
#3  os_read (module=<optimized out>, nargs=<optimized out>, args=<optimized out>) at ./Modules/clinic/posixmodule.c.h:5012
#4  os_read (module=<optimized out>, args=<optimized out>, nargs=<optimized out>) at ./Modules/clinic/posixmodule.c.h:4977
#5  0x00007ffff6fc444f in cfunction_vectorcall_FASTCALL (func=0x7ffff7f4aa90, args=0x7fffbae68f10, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/methodobject.c:430
#6  0x00007ffff6f303ec in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=0x7ffff7f4aa90, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#7  PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#8  call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#9  _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3520
#10 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbae68d60, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#11 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac0b750, kwcount=0, kwstep=1, defs=0x7fffbae95298, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae8d230, qualname=0x7fffbae8c210) at Python/ceval.c:4329
#12 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#13 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=0x7fffbabefe50, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#14 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#15 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#16 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3506
#17 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbac0b5b0, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#18 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac08f58, kwcount=0, kwstep=1, defs=0x7fffbae83b08, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae84830, qualname=0x7fffbae8c3f0) at Python/ceval.c:4329
#19 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#20 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac08f48, callable=0x7fffbabeff70, tstate=0x419cb0) at ./Include/cpython/abstract.h:118

I checked the cython build process prints something like following:

cython --3str --embed --no-docstrings -o mp_app.c mp_app.py
gcc -Os -Loss/Python3/lib -DNDEBUG -Wl,--strip-all -IPython-3.9.15/include/python3.9 -LPython-3.9.15/lib/python3.9/config-3.9-x86_64-linux-gnu -LPython-3.9.15/lib -lcrypt -lpthread -ldl -lutil -lm -lm -B /binutils/bin  -static-libgcc -static-libstdc++ -fPIC  -lpython3.9 mp_app.c -o mp_app.pex

PS: I've also edited the source code example


Solution

  • I was able to resolve the issue by switching to the fork start method from the spawn start method. Additionally, I had to move the worker method from mp_app.py to mp_utils.py as otherwise in the cythonized version it was throwing PicklingError for cythonized function worker.

    I am still not sure why the spawn start method did not work for me on CentOS7 machine.

    The final code is approximately following:

    # ------------- mp_app.py ------------------
    import argparse
    import logging
    import signal
    import sys
    
    
    import multiprocessing as mp
    from typing import Dict, NoReturn
    
    import numpy as np
    
    from mp_utils import (init_worker_processes, worker_task, InterProcessData)
    
    
    def get_args(logger: logging.Logger) -> argparse.Namespace:
        """Parse command-line arguments for the MP test app.

        Raises:
            argparse.ArgumentError: Re-raised after logging. Note that
                ``parse_args`` normally reports bad input via ``SystemExit``.
        """
        parser = argparse.ArgumentParser(description="test MP app")
        parser.add_argument(
            "-m",
            "--max-time",
            type=int,
            dest="max_time",
            required=True,
            help="max timeout in seconds",
        )

        parser.add_argument(
            "-j",
            dest="num_workers",
            type=int,
            default=1,
            required=False,
            help=argparse.SUPPRESS,
        )

        try:
            args = parser.parse_args()
        except argparse.ArgumentError:
            # print_help() returns None, so the old
            # logger.exception(parser.print_help()) logged the string "None".
            parser.print_help()
            logger.exception("failed to parse command-line arguments")
            raise

        return args
    
    
    def mp_app(options: argparse.Namespace, logger: logging.Logger) -> None:
        """Fan a fixed set of sleep tasks out to a fork-based worker pool."""
        # max(2, ...) keeps randint's [1, high) range non-empty when the
        # user passes --max-time 1 (randint(1, 1) raises ValueError).
        high = max(2, options.max_time)
        map_data: Dict[str, InterProcessData] = {
            key: InterProcessData(name=key, value=np.random.randint(1, high))
            for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
        }

        with mp.get_context("fork").Pool(
            processes=options.num_workers,
            initializer=init_worker_processes,
        ) as pool:
            results = []
            try:
                for key in map_data:
                    results.append(
                        pool.apply_async(
                            worker_task,
                            args=(key, map_data[key]),
                        )
                    )
            except KeyboardInterrupt:
                # Terminate once and re-raise; the old per-iteration handler
                # kept looping and then called close()/join() on a
                # terminated pool.
                pool.terminate()
                raise

            pool.close()
            pool.join()

            for result in results:
                try:
                    result.get()
                except Exception as err:
                    logger.error(f"{err}")
    
    
    if __name__ == "__main__":
        # Script entry point: parse CLI options, run the pool app, and
        # convert any failure into exit status 1.
        root_logger = logging.getLogger()

        try:
            cli_options = get_args(root_logger)
            mp_app(options=cli_options, logger=root_logger)
        except Exception as exc:
            root_logger.error(exc)
            raise SystemExit(1) from exc

        sys.exit(0)
    
    
    # ---------- mp_utils.py -----------
    import time
    import logging
    import signal
    import multiprocessing
    from dataclasses import dataclass
    from typing import NoReturn
    
    
    @dataclass
    class InterProcessData:
        """Picklable payload sent from the parent process to pool workers."""

        # Task identifier; also the dict key in mp_app's map_data.
        name: str
        # Sleep duration in seconds consumed by worker_task via time.sleep.
        value: int
    
    
    def worker_task(name: str, data: "InterProcessData") -> None:
        """Log the task, then simulate work by sleeping ``data.value`` seconds.

        Returns None; the original ``NoReturn`` annotation was incorrect
        (``NoReturn`` is for functions that never return).
        """
        logger_obj = multiprocessing.get_logger()
        logger_obj.info(f"processing : {name}; value: {data.value}")

        time.sleep(data.value)
    
    
    def init_worker_processes() -> None:
        """Initialize a pool worker: stderr logging at INFO, SIGINT ignored.

        SIGINT is ignored so Ctrl-C is handled only by the parent process,
        which can then terminate the pool cleanly. Returns None (the
        original ``NoReturn`` annotation was incorrect).
        """
        this_process_logger = multiprocessing.log_to_stderr()
        this_process_logger.setLevel(logging.INFO)
        signal.signal(signal.SIGINT, signal.SIG_IGN)