pythonmultiprocessingdisruptor-patternlmax

AttributeError: Can't get attribute 'journalerReader' on <module '__mp_main__


I tried to implement Lmax in python .I tried to handle data in 4 processes

import disruptor  
import multiprocessing
import random

if __name__ == '__main__':

    cb = disruptor.CircularBuffer(5)

    def receiveWriter():
        while(True):
            n = random.randint(5,20)
            cb.receive(n)

    def ReplicatorReader():
        while(True):
            cb.replicator()

    def journalerReader():
        while(True):
            cb.journaler()

    def unmarshallerReader():
        while(True):
            cb.unmarshaller()

    def consumeReader():
        while(True):
            print(cb.consume())
    


  
    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()

    p0 = multiprocessing.Process(name="p0",target=receiveWriter)
    p0.start()
    
    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()

    p2 = multiprocessing.Process(name="p2",target=journalerReader)
    p2.start()

    p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
    p3.start()

    p4 = multiprocessing.Process(name="p4",target=consumeReader)
    p4.start()
   

but I get this Error in my code :

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>

Solution

  • Your first problem is that the target of a Process call cannot be within the if __name__ == '__main__': block. But:

    As I mentioned in an earlier post of yours, the only way I see that you can share an instance of CircularBuffer across multiple processess is to implement a managed class, which surprisingly is not all that difficult to do. But when you create a managed class and create an instance of that class, what you have is actually a proxy reference to the object. This has two implications:

    1. Each method call is more like a remote procedure call to a special server process created by the manager you will start up and therefore has more overhead than a local method call.
    2. If you print the reference, the class's __str__ method will not be called; you will be printing a representation of the proxy pointer. You should probably rename method __str__ to something like dump and call that explicitly whenever you want a representation of the instance.

    You should also explicitly wait for the completion of the processes you are creating so that the manager service does not shutdown prematurely, which means that each process should be assigned to a unique variable and have a unique name.

    import disruptor
    import multiprocessing
    from multiprocessing.managers import BaseManager
    import random
    
    class CircularBufferManager(BaseManager):
        pass
    
    def receiveWriter(cb):
        while(True):
            n = random.randint(5,20)
            cb.receive(n)
    
    def ReplicatorReader(cb):
        while(True):
            cb.replicator()
    
    def journalerReader(cb):
        while(True):
            cb.journaler()
    
    def unmarshallerReader(cb):
        while(True):
            cb.unmarshaller()
    
    def consumeReader(cb):
        while(True):
            print(cb.consume())
    
    if __name__ == '__main__':
        # Create managed class
        CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
        # create and start manager:
        with CircularBufferManager() as manager:
            cb = manager.CircularBuffer(5)
    
            p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
            p1.start()
    
            p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
            p0.start()
    
            p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
            p1a.start()
    
            p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
            p2.start()
    
            p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
            p3.start()
    
            p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
            p4.start()
    
            p1.join()
            p0.join()
            p1a.join()
            p2.join()
            p3.join()
            p4.join()