I'm developing a FilterBroker
class that manages callable filters for subscriber processes. The broker receives functions wrapped in a Filter
object via a message queue. However, I'm encountering a pickling error when trying to send a locally defined function:
AttributeError: Can't get local object 'task.<locals>.function'
The error occurs because the function is defined inside another function (task()) and is therefore not picklable, but I need to support sending lambdas and locally defined functions.
Here's a minimal reproducible example showing the issue:
from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect
@dataclass
class Service:
    """Plain record describing a subscriber service (id, name, port)."""

    # None until an id is assigned; the other fields default to empty/zero.
    id: Optional[int] = None
    name: str = ""
    port: int = 0
class Filter:
    """Wraps a filter callable together with its subscribed services."""

    def __init__(self, filter_function: callable):
        # The wrapped predicate; invoked through __call__.
        self.filter_function: callable = filter_function
        # Services registered for this filter (not populated in this snippet).
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwds):
        # Delegate directly to the wrapped callable.
        return self.filter_function(*args, **kwds)
class FilterBroker(Thread):
    """Thread that owns the filter registry and serves requests from a queue."""

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        # Request queue shared with client processes.
        self.queue = queue
        # Registered filters, keyed by name.
        self.filters: dict[str, Filter] = {}

    def add_filter(self, name: str, filter: Filter):
        # NOTE(review): inspect.signature on a Filter *instance* reflects
        # Filter.__call__ (*args, **kwds) — always two parameters — so this
        # does not actually validate the wrapped function's arity; confirm
        # the intended check.
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self):
        """Serve tasks until a None sentinel is received."""
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            try:
                task = self.queue.get()
                logging.debug(f"[{class_name}]: Task received: {task}")
                if task is None:
                    break
                # NOTE(review): ask() enqueues (response_queue, method, *args),
                # so task[0] is the queue and task[1] the callable — this test
                # appears to check them in the swapped order; also a
                # Manager().Queue() proxy is presumably not an instance of
                # multiprocessing.Queue, so the isinstance test likely never
                # passes — verify.
                if not isinstance(task, tuple) or not callable(task[0]) or not isinstance(task[1], Queue):
                    continue
                response_queue, method, *args = task
                response = method(self, *args)
            except Exception:
                response = None
            finally:
                # NOTE(review): `continue` inside the try still executes this
                # finally block, so response_queue/response may be unbound here
                # (NameError) when validation fails or queue.get() raises —
                # confirm.
                response_queue.put_nowait(response)

    @staticmethod
    def ask(fb: 'FilterBroker', *task):
        """Submit a task to the broker and block for its single reply."""
        # Per-request reply queue created through a fresh Manager.
        response_queue = Manager().Queue()
        fb.queue.put((response_queue, *task))
        print("I put in queue")
        result = response_queue.get()
        print("I got result")
        # NOTE(review): manager Queue proxies may not expose close() — verify.
        response_queue.close()
        return result
# Module-level setup: the broker thread is created in the parent process.
# NOTE(review): this runs unguarded at import time; under the spawn start
# method the child re-imports the module, so an `if __name__ == "__main__":`
# guard would normally be required here.
manager = Manager()
broker = FilterBroker(manager.Queue())
broker.start()
def task(broker):
    """Child-process target: tries to register a locally defined filter."""
    def function(x):
        return x > 0

    f = Filter(function)
    print(f(2))
    # Fails here: `function` is defined inside task(), so the default pickler
    # used by the manager queue cannot serialize it (see traceback below).
    FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
    logging.debug(f"Filter added")
# Spawn the child process that attempts the registration, then wait for it.
process = Process(target=task, args=(broker,))
process.start()
process.join()
print("Process finished")
Traceback (most recent call last):
File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
self.run()
~~~~~~~~^^
File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
fb.ask(broker, fb.add_filter, 'test', f)
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
fb.queue.put((response_queue, *task))
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 2, in put
File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
conn.send((self._id, methodname, args, kwds))
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task.<locals>.function'
How can I modify my code to support sending locally defined functions and lambdas via multiprocessing? I want subscribers to be able to register and retrieve custom filter functions without having to define them at the module level.
You should use `cloudpickle`; you'll have to call `cloudpickle.dumps` and `cloudpickle.loads` yourself.
from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect
import cloudpickle
@dataclass
class Service:
    """Plain record describing a subscriber service (id, name, port)."""

    # None until an id is assigned; the other fields default to empty/zero.
    id: Optional[int] = None
    name: str = ""
    port: int = 0
class Filter:
    """Wraps a filter callable together with its subscribed services.

    Instances are callable and simply forward to the wrapped function.
    """

    def __init__(self, filter_function: callable):
        # Predicate invoked through __call__.
        self.filter_function: callable = filter_function
        # Services registered for this filter; starts out empty.
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwargs):
        """Forward the call, unchanged, to the wrapped function."""
        return self.filter_function(*args, **kwargs)
class FilterBroker(Thread):
    """Thread that owns the filter registry and serves requests from a queue.

    Tasks arrive as ``(response_queue, pickled_task)`` pairs where the
    payload cloudpickle-decodes to ``(method, *args)``; the method is then
    invoked as ``method(self, *args)`` and the result is pushed back on
    ``response_queue``. A ``None`` task shuts the loop down.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        # Request queue shared with client processes.
        self.queue = queue
        # Registered filters, keyed by name.
        self.filters: dict[str, Filter] = {}

    def add_filter(self, name: str, filter: Filter):
        # NOTE(review): inspect.signature on a Filter instance describes
        # Filter.__call__ (*args, **kwds) — always two parameters — so this
        # does not inspect the wrapped function's own arity; confirm intent.
        if len(inspect.signature(filter).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self):
        """Serve tasks until a None sentinel is received."""
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            # Pre-bind so `finally` is safe even if queue.get() or the
            # tuple unpack below raises (previously a NameError risk).
            response_queue = None
            response = None
            try:
                task = self.queue.get()
                logging.debug(f"[{class_name}]: Task received: {task}")
                if task is None:
                    break
                response_queue, pickled_task = task
                # cloudpickle can deserialize lambdas/locally defined
                # functions that the stock pickler rejects.
                method, *args = cloudpickle.loads(pickled_task)
                response = method(self, *args)
            except Exception:
                import traceback
                traceback.print_exc()
                response = None
            finally:
                # Reply only when a valid request (with its reply queue)
                # was actually received; skips the shutdown sentinel and
                # malformed tasks alike.
                if response_queue is not None:
                    response_queue.put_nowait(response)
class FilterBrokerAsker:
    """Picklable stand-in for the broker: carries only its request queue.

    The broker itself is a Thread and cannot be sent to a spawned process,
    so clients receive this lightweight handle instead.
    """

    def __init__(self, queue: Queue) -> None:
        super().__init__()
        self.queue = queue

    @staticmethod
    def ask(fb: 'FilterBrokerAsker', *task):
        """Serialize *task* with cloudpickle, submit it, and await the reply."""
        payload = cloudpickle.dumps(task)
        # Fresh per-request queue for the single response.
        reply_queue = Manager().Queue()
        fb.queue.put((reply_queue, payload))
        print("I put in queue")
        outcome = reply_queue.get()
        print("I got result")
        return outcome
def task(broker):
    """Child-process target: register a filter built from a local function."""
    def function(x):
        print("running local function!")
        return x > 0

    local_filter = Filter(function)
    print(local_filter(2))
    # Works now: ask() cloudpickles the payload, so the closure-defined
    # function survives the trip through the manager queue.
    FilterBrokerAsker.ask(broker, FilterBroker.add_filter, 'test', local_filter)
    logging.debug(f"Filter added")
if __name__ == "__main__":
    # Guard is required under the spawn start method: the child re-imports
    # this module and must not re-create the broker/manager.
    manager = Manager()
    broker = FilterBroker(manager.Queue())
    broker.start()
    # Hand the child a picklable shim instead of the (unpicklable) Thread.
    broker_data = FilterBrokerAsker(broker.queue)
    process = Process(target=task, args=(broker_data,))
    process.start()
    process.join()
    print("Process finished")
    # None sentinel tells the broker thread's loop to exit.
    broker.queue.put(None)
running local function!
True
I put in queue
I got result
Process finished
The split into FilterBrokerAsker is to get it to work with spawn instead of fork, as threads are not picklable.
Note that cloudpickle has problems with imports — you may need to re-import modules inside your functions. The whole approach is fragile; FWIW, you should just use threads instead.