python, python-3.x, multiprocessing, python-multiprocessing

Cannot subclass multiprocessing Queue in Python 3.5


My eventual goal is to redirect the stdout from several subprocesses to some queues, and print those out somewhere (maybe in a little GUI).

The first step is to subclass Queue into an object that behaves much like stdout. But that is where I got stuck. Subclassing the multiprocessing Queue seems impossible in Python v3.5.

import sys
from multiprocessing import Queue

# This is a Queue that behaves like stdout
# Unfortunately, doesn't work in Python 3.5   :-(
class StdoutQueue(Queue):
    def __init__(self, *args, **kwargs):
        Queue.__init__(self, *args, **kwargs, ctx='')

    def write(self, msg):
        self.put(msg)

    def flush(self):
        sys.__stdout__.flush()

I found this snippet in the following post (Python 3.5 probably did not exist yet when it was written): Python multiprocessing redirect stdout of a child process to a Tkinter Text

In Python v3.5 you stumble on strange error messages when subclassing the multiprocessing Queue class. I found two bug reports describing the issue:

https://bugs.python.org/issue21367

https://bugs.python.org/issue19895

I have 2 questions:

  1. Suppose I want to stick to Python v3.5 - going to a previous version is not really an option. What workaround can I use to subclass the multiprocessing Queue somehow?
  2. Is the bug still around if I upgrade to Python v3.6?

Solution

  • >>> import multiprocessing
    >>> type(multiprocessing.Queue)
    <class 'method'>
    >>> type(multiprocessing.queues.Queue)
    AttributeError: module 'multiprocessing' has no attribute 'queues'
    >>> import multiprocessing.queues
    >>> type(multiprocessing.queues.Queue)
    <class 'type'>
    

    So as you can see, multiprocessing.Queue is just a constructor method for the multiprocessing.queues.Queue class. If you want to make a child class, just do class MyQueue(multiprocessing.queues.Queue).
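
    A minimal sketch of that idea (the name MyQueue comes from the sentence above; note that, as explained below, instantiating it still needs a ctx argument):

    import multiprocessing.queues

    # Subclass the real class, not the bound factory method multiprocessing.Queue
    class MyQueue(multiprocessing.queues.Queue):
        pass

    # Calling MyQueue() as-is still fails, because Queue.__init__ has a
    # required keyword-only 'ctx' parameter.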

    You can see the source of the multiprocessing.Queue method here.

    As you can see at the link above, multiprocessing.Queue passes a ctx argument to Queue, so I managed to get it working by doing the same myself in the __init__ method. I don't completely understand where the BaseContext object is supposed to get its _name attribute, so I set it manually.

    def __init__(self, *args, **kwargs):
        from multiprocessing.context import BaseContext
        ctx = BaseContext()
        ctx._name = "Name"   # normally the start method name, e.g. 'fork'
        super(StdoutQueue, self).__init__(*args, **kwargs, ctx=ctx)
    

    It turns out the docs have some information about contexts here. So instead of creating one manually like I did, you can do:

    import multiprocessing
    ctx = multiprocessing.get_context()
    

    It will create a proper context with _name set (to 'fork' in your particular case), and you can pass it to your queue.
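
    Putting the pieces together, a full version of the StdoutQueue from the question might look like the sketch below. The write/flush methods are taken from the question; defaulting to multiprocessing.get_context() inside __init__ is my own choice rather than something stated in the original answer.

    import sys
    import multiprocessing
    import multiprocessing.queues

    class StdoutQueue(multiprocessing.queues.Queue):
        """A multiprocessing Queue that looks enough like a file
        to stand in for sys.stdout."""

        def __init__(self, *args, **kwargs):
            # get_context() returns the default context with _name already set
            # (e.g. 'fork' on Linux).
            ctx = multiprocessing.get_context()
            super(StdoutQueue, self).__init__(*args, **kwargs, ctx=ctx)

        def write(self, msg):
            self.put(msg)

        def flush(self):
            sys.__stdout__.flush()

    A child process can then assign an instance to sys.stdout so that its print() output lands in the queue, where the parent (or a GUI thread) can pick it up:

    def worker(q):
        sys.stdout = q              # redirect print() inside the child
        print("hello from the child")

    if __name__ == '__main__':
        q = StdoutQueue()
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        p.join()
        print(q.get())              # -> hello from the child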