I tried to implement the LMAX Disruptor pattern in Python, handling the data in 4 processes:
import disruptor
import multiprocessing
import random

if __name__ == '__main__':
    cb = disruptor.CircularBuffer(5)

    def receiveWriter():
        while(True):
            n = random.randint(5,20)
            cb.receive(n)

    def ReplicatorReader():
        while(True):
            cb.replicator()

    def journalerReader():
        while(True):
            cb.journaler()

    def unmarshallerReader():
        while(True):
            cb.unmarshaller()

    def consumeReader():
        while(True):
            print(cb.consume())

    p1 = multiprocessing.Process(name="p1", target=ReplicatorReader)
    p1.start()
    p0 = multiprocessing.Process(name="p0", target=receiveWriter)
    p0.start()
    p1 = multiprocessing.Process(name="p1", target=ReplicatorReader)
    p1.start()
    p2 = multiprocessing.Process(name="p2", target=journalerReader)
    p2.start()
    p3 = multiprocessing.Process(name="p3", target=unmarshallerReader)
    p3.start()
    p4 = multiprocessing.Process(name="p4", target=consumeReader)
    p4.start()
but I get this error in my code:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
exitcode = _main(fd, parent_sentinel)
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>
Your first problem is that the target of a Process call cannot be defined within the if __name__ == '__main__': block. Under the spawn start method (the default on Windows), each child process re-imports your script as __mp_main__ and looks up its target function by name; functions defined inside that block never exist in the child, which is exactly what the AttributeError in your traceback is telling you.
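A minimal sketch of that structural point, using a hypothetical worker function rather than your disruptor code, just to show where the target has to live:

import multiprocessing

# Defined at module level, so a spawned child process can find it by name
# when it re-imports this file.
def worker(msg):
    print(msg)

if __name__ == '__main__':
    # Functions defined inside this block would not be visible to the child.
    p = multiprocessing.Process(target=worker, args=("hello",))
    p.start()
    p.join()

But moving the functions to module level is not enough on its own: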
As I mentioned in an earlier post of yours, the only way I see that you can share an instance of CircularBuffer across multiple processes is to implement a managed class, which surprisingly is not all that difficult to do. But when you create a managed class and then create an instance of that class, what you have is actually a proxy reference to the object. This has two implications: first, if you print the instance, its __str__ method will not be called; you will just be printing a representation of the proxy object. You should probably rename method __str__ to something like dump and call that explicitly whenever you want a representation of the instance. Second, you should explicitly wait for the completion of the processes you are creating so that the manager service does not shut down prematurely, which means that each process should be assigned to a unique variable and have a unique name.
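To make the __str__-to-dump point concrete, here is a rough sketch of what such a method might look like on your CircularBuffer class; the attributes shown are placeholders, since the disruptor module itself is not posted here:

class CircularBuffer:
    def __init__(self, size):
        # Placeholder internals; your real class presumably has more state.
        self.size = size
        self.slots = [None] * size

    def dump(self):
        # Call this explicitly through the proxy, e.g. print(cb.dump()).
        # print(cb) would only show the proxy object's own representation.
        return f"CircularBuffer(size={self.size}, slots={self.slots})"

With that in mind, the complete restructured program looks like this: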
import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random

class CircularBufferManager(BaseManager):
    pass

def receiveWriter(cb):
    while(True):
        n = random.randint(5,20)
        cb.receive(n)

def ReplicatorReader(cb):
    while(True):
        cb.replicator()

def journalerReader(cb):
    while(True):
        cb.journaler()

def unmarshallerReader(cb):
    while(True):
        cb.unmarshaller()

def consumeReader(cb):
    while(True):
        print(cb.consume())

if __name__ == '__main__':
    # Create managed class
    CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
    # create and start manager:
    with CircularBufferManager() as manager:
        cb = manager.CircularBuffer(5)
        p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
        p1.start()
        p0 = multiprocessing.Process(name="p0", target=receiveWriter, args=(cb,))
        p0.start()
        p1a = multiprocessing.Process(name="p1a", target=ReplicatorReader, args=(cb,))
        p1a.start()
        p2 = multiprocessing.Process(name="p2", target=journalerReader, args=(cb,))
        p2.start()
        p3 = multiprocessing.Process(name="p3", target=unmarshallerReader, args=(cb,))
        p3.start()
        p4 = multiprocessing.Process(name="p4", target=consumeReader, args=(cb,))
        p4.start()
        p1.join()
        p0.join()
        p1a.join()
        p2.join()
        p3.join()
        p4.join()