pythonmultiprocessingpytestpython-multiprocessing

How to return the results of a function when using Process in Multiprocessing without Queue?


I have a few tests using Pytest that I want to make sure:

  1. the tests run sequentially (no parallel processing, order does not matter)
  2. Each test runs in a new process.

I am trying to use multiprocessing Process to call on a custom method (which also has additional functionality using a decorator) but I cant seem to find a way to return the result of the decorator + function WITHOUT using queue. All the solutions I have looked up so far, use queue and the q.put(result) instead of return. Except, I don't have the liberty to replace the return in my function with a q.put since the method is called in a lot of other places that I do not have permissions to change.

Any other way I can get the results? or a different workflow maybe?

Dummy Code:

 def timer(func):
    def wrapper(*args, **kwargs):
        t1 = time()
        result = func(*args, **kwargs)
        t2 = time()
        elapsed = t2-t1
        return result, elapsed
    return wrapper

 class do_stats:

    def _solve(self):
       result = solve something
       return result

if __name__ == '__main__':
    p = Process(target=timer, args=(do_stats._solve,))
    p.start()
    p.join()
    time, result = ?`

I am not allowed to change any code for either timer or do_stats.


Solution

  • There are some issues with your post:

    1. It is not a minimal, reproducible example: import statements are missing.
    2. You have an indentation problem.
    3. You have a class named do_stats and you are trying to call a method that beings with an underscore (i.e. _solve). Class names should normally use the CapWords convention, i.e. DoStats in this case. Methods that being with an underscore are considered "private" and should only be called from other class methods. Rename _solve to solve. See PEP 8 – Style Guide for Python Code. I understand that you may not have control over the actual class and method names if they do not follow PEP 8.
    4. Method solve is an instance method that must be called with an instance of your DoStats class. It appears that you are treating solve as a class method, which it is not.

    If I am correct in my assumption that you cannot modify the DoStats definition at all, then I would try the following. Note that it is necessary to define an additional function (I called it worker) that will be the actual target of your Process instance:

    from multiprocessing import Process, Queue
    from time import time, sleep
    
    def timer(func):
        def wrapper(*args, **kwargs):
            t1 = time()
            result = func(*args, **kwargs)
            t2 = time()
            elapsed = t2-t1
            return result, elapsed
        return wrapper
    
    class DoStats:
        def solve(self, x):
            sleep(1.0)  # emulate some work
            result = x ** 2
            return result
    
    def worker(queue, x):
        do_stats = DoStats()  # Create necessary instance
        # Wrap method explicitly and then call:
        results = timer(do_stats.solve)(x)
        queue.put(results)
    
    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=worker, args=(queue, 7))
        p.start()
        results = queue.get()  # Must be called befor p.join()
        p.join()
        print(results)
    

    Prints:

    (49, 1.0005183219909668)
    

    If you are able to modify the DoStats class so that we can decorate the solve method, then we only need:

    from multiprocessing import Process, Queue
    from time import time, sleep
    
    def timer(func):
        def wrapper(*args, **kwargs):
            t1 = time()
            result = func(*args, **kwargs)
            t2 = time()
            elapsed = t2-t1
            return result, elapsed
        return wrapper
    
    class DoStats:
        @timer
        def solve(self, x):
            sleep(1.0)  # emulate some work
            result = x ** 2
            return result
    
    def worker(queue, x):
        do_stats = DoStats()
        results = do_stats.solve(x)
        queue.put(results)
    
    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=worker, args=(queue, 7))
        p.start()
        results = queue.get()  # Must be called befor p.join()
        p.join()
        print(results)