python python-3.x multiprocessing multiprocessing-manager

How to run parallel processes with multiprocessing, terminate one of them from another, and then restart it


I get the following error when running the code below:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

I am very new to this. You are welcome to post a completely new solution instead of adapting my code (for example, there is no need to store processes in lists for termination), as long as it meets these goals:

1. Run multiple processes in parallel.
2. When a certain condition is met in statecheck, kill the processes that were appended to a specific list (in this case main_processes and normal_processes).
3. Terminate them from another process (I attempt to do so from reset(), called inside statecheck()).
4. Restart the same process (randomx) from statecheck.

import multiprocessing
import time
from multiprocessing import Process

data_dict = {'y': 4,
             'h': 5,
             'j': 6,
             'o': 7,
             'p': 3,
             'b': 11}


def randomx(key):
    for i in range(0, 2, 1):
        print(key)
        time.sleep(data_dict[key])


def move():
    print("Starting move2obj")
    for i in range(0, 5, 1):
        print(i)

def statecheck(np, cp):
    print("statecheck started")
    reset(np, cp)
    time.sleep(10)
    randomx()
    time.sleep(20)


def run():
    for key in data_dict:
        key = Process(target=randomx, args=key)
        print(main_processes, key)
        main_processes.append(key)
        all_processes.append(key)
        key.start()

    process24 = multiprocessing.Process(target=move)
    process24.start()
    process11 = Process(target=statecheck, args=[normal_processes, main_processes])
    process11.start()

    normal_processes.append(str(process24))
    all_processes.append(str(process24))
    all_processes.append(str(process11))


def reset(normal_processes, main_processes, *args):
    time.sleep(1)
    print("killing normal :", normal_processes)
    for process in normal_processes:
        process.terminate()
        process.join()
    time.sleep(1)
    print("killing combat :", main_processes)
    for process in main_processes:
        process.terminate()
        process.join()


def processes():
    print(main_processes, normal_processes, all_processes)
    return normal_processes, main_processes, all_processes


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    normal_processes = manager.list()
    main_processes = manager.list()
    all_processes = manager.list()
    run()

Solution

  • Here is a simpler way, relevant to your code, to reproduce the error you are getting:

    import time
    from multiprocessing import Manager, Process
    
    def a():
        time.sleep(4)
        print('done')
    
    
    
    if __name__ == "__main__":
        manager = Manager()
        l = manager.list()
    
        b = Process(target=a)
        b.start()
        l.append(b)
        b.join()
    

    Output

    TypeError: Pickling an AuthenticationString object is disallowed for security reasons
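
The same error can be triggered without a manager or a child process. Appending to a managed list pickles the Process object to send it to the manager's server process, and that object carries the process's authkey. The sketch below pickles the current process's authkey directly; the helper name try_pickle_authkey is made up for illustration.

```python
import pickle
from multiprocessing import current_process


def try_pickle_authkey():
    """Return the error message raised when pickling the authkey, or None."""
    try:
        # authkey is an AuthenticationString, which refuses to be pickled
        # outside of the process-spawning machinery.
        pickle.dumps(current_process().authkey)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    # Expected to print the same "Pickling an AuthenticationString ..." message.
    print(try_pickle_authkey())
```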
    

    The reason this happens is explained in this answer. In short, you are trying to pickle something that is explicitly meant not to be pickled. That does not mean it cannot be worked around, however. You just need to use multiprocess (a third-party fork of multiprocessing that serializes with dill) and make the authkey of the process picklable:

    import time
    from multiprocess import Manager, Process
    
    
    class MyProcess(Process):
    
        def _prepare(self):
            self._config['authkey'] = bytes(self._config['authkey'])
    
        def start(self):
            self._prepare()
            return super().start()
    
    
    def a():
        time.sleep(4)
        print('done')
    
    
    
    if __name__ == "__main__":
        manager = Manager()
        l = manager.list()
    
        b = MyProcess(target=a)
        b.start()
        l.append(b)
        b.join()
    

    Output

    done
    

    So, for your code, instead of using the Process class, use the above MyProcess class to create picklable processes.

    With that being said, unless you know what you are doing and are aware of the possible consequences, it's always better not to try and work around such "issues". They are there, by design, for a reason. So, instead of creating processes and adding them to a managed list (and then also passing that list to other processes), you should only join/terminate processes from the process that created them.
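
As a rough sketch of that advice applied to the goals in the question: the parent creates, terminates, and restarts the workers, while statecheck only requests a reset through an Event. The names DELAYS, start_workers, stop_workers, and the wait parameter are made up for illustration, and the delays are shortened so the demo finishes quickly.

```python
import time
from multiprocessing import Event, Process

# Hypothetical stand-in for the question's data_dict, with short delays.
DELAYS = {"y": 0.4, "h": 0.5}


def randomx(key, delay):
    # Same shape as the question's randomx(): print the key, then sleep.
    for _ in range(2):
        print(key)
        time.sleep(delay)


def statecheck(reset_requested, wait=0.2):
    # Runs in its own process. It only *requests* a reset; it never
    # touches the other Process objects.
    time.sleep(wait)
    reset_requested.set()


def start_workers(delays):
    # A Process object can be started only once, so a restart means
    # building fresh Process objects.
    workers = [Process(target=randomx, args=(k, d)) for k, d in delays.items()]
    for w in workers:
        w.start()
    return workers


def stop_workers(workers):
    # Called only from the process that created the workers.
    for w in workers:
        if w.is_alive():
            w.terminate()
        w.join()


if __name__ == "__main__":
    reset_requested = Event()
    workers = start_workers(DELAYS)
    checker = Process(target=statecheck, args=(reset_requested,))
    checker.start()

    reset_requested.wait()           # block until statecheck asks for a reset
    stop_workers(workers)            # terminate from the creating process
    workers = start_workers(DELAYS)  # restart with new Process objects

    checker.join()
    for w in workers:
        w.join()
```

No Process object ever crosses a process boundary here, so nothing has to be pickled and the AuthenticationString error cannot occur.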

    Edit

    Also, how is data passed between processes? Your example does not demonstrate any data transfer between processes. If I, for example, write print(l) inside def a, it returns an error.

    What do you mean by "demonstrate" data-transfer? Also, what you are doing will obviously raise an error since a() does not know what l is. You need to pass the managed list as an argument to a for that to work.
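
A minimal sketch of passing the managed list as an argument, using only the standard library; the parameter name shared is arbitrary.

```python
from multiprocessing import Manager, Process


def a(shared):
    # The list proxy arrives as an argument, so the name is defined here.
    shared.append("hello from child")
    print(list(shared))


if __name__ == "__main__":
    with Manager() as manager:
        l = manager.list()
        p = Process(target=a, args=(l,))
        p.start()
        p.join()
        # The parent sees the item appended by the child.
        print(list(l))
```

Unlike a Process object, the list proxy itself is picklable, which is why it can be handed to the child this way.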

    Now, about terminating processes from other processes: it's possible, but I can't really think of a use case for it that couldn't be handled by other, more efficient methods. In short, if you are thinking along these lines, you probably need to re-evaluate your whole approach.
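
One such alternative, sketched under the assumption that the worker runs a loop and can check a flag: a sibling process sets a shared Event and the worker exits on its own, so no process has to terminate another. The names worker, controller, interval, and after are made up for illustration.

```python
import time
from multiprocessing import Event, Process


def worker(stop, interval=0.1):
    # Loop until some other process sets the stop flag.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        stop.wait(interval)  # sleeps, but wakes early once stop is set
    print("worker exiting cleanly")
    return ticks


def controller(stop, after=0.5):
    # A sibling process that decides when the worker should stop.
    time.sleep(after)
    stop.set()


if __name__ == "__main__":
    stop = Event()
    w = Process(target=worker, args=(stop,))
    c = Process(target=controller, args=(stop,))
    w.start()
    c.start()
    c.join()
    w.join()
    print("worker exit code:", w.exitcode)
```

Because the worker returns normally instead of being killed, it gets a chance to release resources before exiting.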

    If you still want to, however, you can do this by creating the processes inside a manager's server process and passing their proxies, instead of the actual Process objects, to other processes. You won't need multiprocess for this, nor will you need to alter the authkey of the processes themselves. Example:

    import time
    from multiprocessing import Manager, Process
    from multiprocessing.managers import NamespaceProxy, BaseManager
    import inspect
    
    
    
    class ObjProxy(NamespaceProxy):
        """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
        functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
        picklable and its state can be shared among different processes. """
    
        @classmethod
        def populate_obj_attributes(cls, real_cls):
            DISALLOWED = set(dir(cls))
            ALLOWED = ['__sizeof__', '__eq__', '__ne__', '__le__', '__repr__', '__dict__', '__lt__',
                       '__gt__']
            DISALLOWED.add('__class__')
            new_dict = {}
            for (attr, value) in inspect.getmembers(real_cls, callable):
                if attr not in DISALLOWED or attr in ALLOWED:
                    new_dict[attr] = cls._proxy_wrap(attr)
            return new_dict
    
        @staticmethod
        def _proxy_wrap(attr):
            """ This method creates a function that calls the proxied object's method."""
    
            def f(self, *args, **kwargs):
                return self._callmethod(attr, args, kwargs)
    
            return f
    
    attributes = ObjProxy.populate_obj_attributes(Process)
    ProcessProxy = type("ProcessProxy", (ObjProxy,), attributes)
    
    
    def func1():
        time.sleep(7)
        print('done')
    
    def func2(l):
        proc = l[0]
        proc.terminate()
        print('terminated')
    
    
    if __name__ == "__main__":
        m = Manager()
        l = m.list()
    
        BaseManager.register('Process', Process, ProcessProxy, exposed=tuple(dir(ProcessProxy)))
        manager = BaseManager()
        manager.start()
    
        p1 = manager.Process(target=func1)
        p2 = manager.Process(target=func2, args=(l,))
        print(p1, p2)
    
        l.append(p1)
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    

    Output

    <Process name='Process-2:1' parent=115344 initial> <Process name='Process-2:2' parent=115344 initial>
    terminated
    

    Just use manager.Process instead of the Process class to create processes. Check this answer if you want more details as to how the manager and proxy above work.