Tags: python, python-3.x, multithreading, multiprocessing

Cannot pickle local function when sending callable Filter objects via multiprocessing Queue


Problem Description

I'm developing a FilterBroker class that manages callable filters for subscriber processes. The broker receives functions wrapped in a Filter object via a message queue. However, I'm encountering a pickling error when trying to send a locally defined function:

AttributeError: Can't get local object 'task.<locals>.function'

The error occurs because the function is defined inside another function (task()) and is therefore not picklable, but I need to support sending lambda and locally defined functions.

Code Example

Here's a minimal reproducible example showing the issue:

from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect

@dataclass
class Service:
    """Record describing a subscriber service (only referenced via the
    ``Filter.subscribers`` annotation in this minimal example)."""
    id: Optional[int] = None  # identifier; None until assigned — TODO confirm semantics
    name: str = ""  # service name
    port: int = 0  # service port — presumably where it listens; not used here

class Filter:
    """Wrap a predicate so it can be invoked directly while tracking the
    services subscribed to it."""

    def __init__(self, filter_function: callable):
        # Predicate delegated to by __call__.
        self.filter_function: callable = filter_function
        # Services currently subscribed to this filter.
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwargs):
        """Forward the call, unchanged, to the wrapped predicate."""
        return self.filter_function(*args, **kwargs)


class FilterBroker(Thread):
    """Thread that serves filter-registration requests arriving on a queue.

    Tasks are tuples ``(response_queue, method, *args)``; ``method`` is
    invoked with this broker as its first argument and the result is posted
    back on ``response_queue``.  A ``None`` task stops the loop.

    NOTE(review): anything sent through a manager queue must be picklable —
    lambdas and locally defined functions are not, which is the error this
    example reproduces.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self.queue = queue  # inbound task queue (typically Manager().Queue())
        self.filters: dict[str, Filter] = {}  # registered filters keyed by name

    def add_filter(self, name: str, filter: "Filter"):
        """Register *filter* under *name*.

        Raises:
            TypeError: if the callable does not take exactly two parameters.
        """
        # NOTE(review): signature() is taken on the Filter wrapper, whose
        # __call__ is (*args, **kwds) and therefore always reports two
        # parameters; to validate the wrapped function, inspect
        # filter.filter_function instead — TODO confirm intent.
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self):
        """Serve tasks until the ``None`` sentinel arrives."""
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            # Pre-bind so `finally` is safe even if queue.get() or the
            # validation below bails out (original code hit NameError here).
            response_queue = None
            response = None
            try:
                task = self.queue.get()
                logging.debug(f"[{class_name}]: Task received: {task}")
                if task is None:
                    break
                # Expect (response_queue, method, *args).  The original test
                # checked callable(task[0]) — but task[0] is the queue — and
                # isinstance(task[1], Queue), which raises TypeError because
                # multiprocessing.Queue is a factory function, not a class.
                # Validate positionally and duck-type the queue instead.
                if (not isinstance(task, tuple) or len(task) < 2
                        or not hasattr(task[0], "put_nowait")
                        or not callable(task[1])):
                    continue
                response_queue, method, *args = task
                response = method(self, *args)
            except Exception:
                logging.exception(f"[{class_name}]: Task failed")
                response = None
            finally:
                # Reply only when a well-formed task supplied a reply queue.
                if response_queue is not None:
                    response_queue.put_nowait(response)

    @staticmethod
    def ask(fb: 'FilterBroker', *task):
        """Send *task* to broker *fb* and block until the response arrives."""
        response_queue = Manager().Queue()
        fb.queue.put((response_queue, *task))
        print("I put in queue")
        result = response_queue.get()
        print("I got result")
        # NOTE(review): the original called response_queue.close(), but a
        # manager proxy for queue.Queue has no close() and raised
        # AttributeError; the proxy is released when it goes out of scope.
        return result


# Create the broker thread in the parent process, fed by a managed queue so
# the queue can be shared with child processes.
manager = Manager()
broker = FilterBroker(manager.Queue())
broker.start()


def task(broker):
    """Worker-process entry point: tries to register a locally defined filter.

    NOTE(review): ``function`` is defined inside ``task()``, so the standard
    pickler cannot serialize it — sending it through the manager queue is
    what raises ``AttributeError: Can't get local object
    'task.<locals>.function'``.
    """
    def function(x):
        return x > 0

    f = Filter(function)
    print(f(2))
    FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
    logging.debug(f"Filter added")


# NOTE(review): creating/starting a Process at module level without an
# `if __name__ == "__main__":` guard breaks under the spawn start method,
# where the child re-imports this module and re-runs these lines.
process = Process(target=task, args=(broker,))

process.start()
process.join()

print("Process finished")

Full Error Traceback

Traceback (most recent call last):
  File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
    fb.ask(broker, fb.add_filter, 'test', f)
    ~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
    fb.queue.put((response_queue, *task))
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in put
  File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
    conn.send((self._id, methodname, args, kwds))
    ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ~~~~~~~~~~~~~~~~~~~~~^^^^^
  File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task.<locals>.function'

Question

How can I modify my code to support sending locally defined functions and lambdas via multiprocessing? I want subscribers to be able to register and retrieve custom filter functions without having to define them at the module level.


Solution

  • You should use cloudpickle; you'll have to call cloudpickle.dumps and cloudpickle.loads yourself.

    from threading import Thread
    from multiprocessing import Queue, Manager, Process
    from dataclasses import dataclass
    from typing import Optional
    import logging
    import inspect
    import cloudpickle
    
    @dataclass
    class Service:
        """Record describing a subscriber service (only referenced via the
        ``Filter.subscribers`` annotation in this example)."""
        id: Optional[int] = None  # identifier; None until assigned — TODO confirm semantics
        name: str = ""  # service name
        port: int = 0  # service port — presumably where it listens; not used here
    
    class Filter:
        """Wrap a predicate so it can be invoked directly while tracking the
        services subscribed to it."""

        def __init__(self, filter_function: callable):
            # Predicate delegated to by __call__.
            self.filter_function: callable = filter_function
            # Services currently subscribed to this filter.
            self.subscribers: list[Service] = []

        def __call__(self, *args, **kwargs):
            """Forward the call, unchanged, to the wrapped predicate."""
            return self.filter_function(*args, **kwargs)
    
    
    class FilterBroker(Thread):
        """Thread that serves filter-registration tasks.

        Tasks are tuples ``(response_queue, pickled_task)`` where
        ``pickled_task`` is a cloudpickle blob of ``(method, *args)`` —
        cloudpickle, unlike the standard pickler, can serialize lambdas and
        locally defined functions.  A ``None`` task stops the loop.
        """

        def __init__(self, queue: Queue) -> None:
            super().__init__()
            self.queue = queue  # inbound task queue
            self.filters: dict[str, Filter] = {}  # registered filters keyed by name

        def add_filter(self, name: str, filter: "Filter"):
            """Register *filter* under *name*; TypeError unless it takes two parameters."""
            # NOTE(review): signature() sees the Filter wrapper's
            # (*args, **kwds) __call__, so this always reports 2 parameters;
            # inspect filter.filter_function to validate the wrapped function.
            if len(inspect.signature(filter).parameters) != 2:
                raise TypeError("Invalid Filter: must have exactly two parameters")
            self.filters[name] = filter

        def run(self):
            """Serve tasks until the ``None`` sentinel arrives."""
            class_name = self.__class__.__name__
            logging.info(f"[{class_name}]: Process started")
            while True:
                # Pre-bind so `finally` is safe even when queue.get() raises
                # or the task tuple fails to unpack (the original guarded on
                # `task`, which is itself unbound if get() raised).
                response_queue = None
                response = None
                try:
                    task = self.queue.get()
                    logging.debug(f"[{class_name}]: Task received: {task}")
                    if task is None:
                        break
                    response_queue, pickled_task = task
                    # cloudpickle restores the callable, including closures.
                    method, *args = cloudpickle.loads(pickled_task)
                    response = method(self, *args)
                except Exception:
                    import traceback
                    traceback.print_exc()
                    response = None
                finally:
                    # Reply only when a well-formed task supplied a reply queue.
                    if response_queue is not None:
                        response_queue.put_nowait(response)
    
    
    class FilterBrokerAsker:
        """Picklable handle to a FilterBroker's task queue.

        Thread objects cannot be pickled, so worker processes receive this
        thin wrapper around the broker's queue instead of the broker itself.
        """

        def __init__(self, queue: Queue) -> None:
            # NOTE(review): dropped the original's super().__init__() — this
            # class has no base beyond object, so it was a no-op left over
            # from copying the Thread subclass.
            self.queue = queue  # the broker's inbound task queue

        @staticmethod
        def ask(fb: 'FilterBrokerAsker', *task):
            """Cloudpickle *task*, send it to the broker, and await the result.

            NOTE(review): each call spins up a fresh Manager process just for
            the response queue; reuse one Manager if ask() is called often.
            """
            pickled_task = cloudpickle.dumps(task)
            response_queue = Manager().Queue()
            fb.queue.put((response_queue, pickled_task))
            print("I put in queue")
            result = response_queue.get()
            print("I got result")
            return result
    
    def task(broker):
        """Worker-process entry point: registers a locally defined filter.

        This works now because the callable is serialized with cloudpickle by
        FilterBrokerAsker.ask rather than pickled by the queue machinery.
        """
        def function(x):
            print("running local function!")
            return x > 0

        f = Filter(function)
        print(f(2))
        FilterBrokerAsker.ask(broker, FilterBroker.add_filter, 'test', f)
        logging.debug(f"Filter added")
    
    if __name__ == "__main__":
        # Guarded entry point: required under the spawn start method, where
        # the child process re-imports this module.
        manager = Manager()
        broker = FilterBroker(manager.Queue())
        broker.start()

        # Children get a picklable asker instead of the (unpicklable) Thread.
        broker_data = FilterBrokerAsker(broker.queue)

        process = Process(target=task, args=(broker_data,))

        process.start()
        process.join()

        print("Process finished")
        # Sentinel stops the broker's run() loop so the program can exit.
        broker.queue.put(None)
    
    running local function!
    True
    I put in queue
    I got result
    Process finished
    

    The split into FilterBrokerAsker is needed to make this work with the spawn start method instead of fork, since Thread objects are not picklable.

    note that cloudpickle has problems with imports, and you may need to re-import things inside your functions, the whole concept is very fragile, and FWIW you should just use threads instead.