
Forced Slowdown of Multiprocessing Generation vs OSError Too Many Open Files


I have the following code. In testing, I found that once several hundred child processes are running concurrently (somewhere around 400), I get "OSError: Too many open files". Any idea why?

I can solve the problem with the time.sleep(.005) call, but I shouldn't have to.

This is a part of a larger program. A typical call will set a server string, token string, and a list of many thousands of devices. For the REST API call used, the server can only handle a single device at a time. In testing, this resulted in a 20 min execution time, but indications are that using a multiprocessing approach can reduce it to around 30 sec.

import urllib, requests, json, sys, pprint, time, multiprocessing as mp

assert sys.version_info >= (3, 6), "Must use Python 3.6+"

###########################
### handler function for multiprocessing worker
###########################

def getAttributesOneDevice(server, device, token, q):
    """Handler function for getting a single device"""
    serverURL = server + "/ServicesAPI/API/V1/CMDB/Devices/Attributes"
    headers = { "Accept" : "application/json",
                "Content-Type" : "application/json",
                "token" : token }
    query = { "hostname" : device }

    response = requests.get(serverURL, headers = headers, params = query, verify = False)
    q.put(response.json())
# end getAttributesOneDevice()

def GetDeviceAttributes(server = "", token = "", devices = []):
    """
    See this URL for explanation of what this function does
    https://github.com/NetBrainAPI/NetBrain-REST-API-V8.02/blob/master
                      /REST APIs Documentation/Devices Management
                      /Get Device Attributes API.md

    To summarize the URL: will acquire detailed device attributes for a single
    device.

    This subroutine therefore queries for all devices provided, and assembles the
    results into a single list of dicts.

    Server queries are relatively expensive.  A single query is not a big deal,
    but accumulated across a massive device list this can take excessive
    time to execute (20min, etc). Therefore, this procedure is parallelized
    through multi-processing to complete in a reasonable amount of time.

    'server' should be a string that is just the http(s)://<FQDN>.  Do not
    include the trailing '/'.

    'token' should be an authentication token that was generated by
    GetLoginToken and SetWorkingDomain modules in this directory.

    'devices' should be a list of strings, where each entry is a device.

    return a single dictionary:
        "Table" a list of dicts, each dict the detailed attributes of a device
        "Missed" a list of devices that had no result

    Note that failure to capture a device is distinct from function failure.
    """

    resultsTable = []
    MissedDevices = []
    procList = []

    for dev in devices:
        q = mp.Queue()
        proc = mp.Process(target=getAttributesOneDevice,
                          args=(server, dev, token, q))
        proc.start()
        procList += [ {"proc" : proc, "dev" : dev, "queue" : q} ]

        # If I don't do this as I'm going, I *always* get "OSError too many open files" 
        updatedProcList = []
        for proc in procList:
            if proc["proc"].is_alive():
                updatedProcList += [proc]
            else:
                # kill zombies
                if proc["queue"].empty():
                    MissedDevices += [ proc["dev"] ]
                else:
                    queueData = proc["queue"].get()
                    resultsTable += [ queueData ]

                while not proc["queue"].empty():
                    # drain whatever's left before we closeout the process
                    proc["queue"].get()
                proc["proc"].join()

        procList = updatedProcList

        # if I don't do this, I get "OSError too many open files" at somewhere
        # around 375-400 child processes
        time.sleep(.005)

    # I could instead embed the list comprehension in the while statement,
    # but that would hinder troubleshooting
    remainingProcs = [ 1 ]
    while len(remainingProcs) > 0:
        remainingProcs = [ proc for proc in procList if proc["proc"].is_alive()]
        time.sleep(1)

    for proc in procList:
        # kill zombies
        if proc["queue"].empty():
            MissedDevices += [ proc["dev"] ]
        else:
            queueData = proc["queue"].get()
            resultsTable += [ queueData ]

        while not proc["queue"].empty():
            # drain whatever's left before we closeout the process
            proc["queue"].get()
        proc["proc"].join()

    return { "Table" : resultsTable, "Missed" : MissedDevices }

Solution
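
Some context on the error itself, offered as a likely explanation rather than a confirmed diagnosis: on Unix-like systems every process has a limit on open file descriptors. Each mp.Queue is backed by a pipe, and each started mp.Process keeps descriptors open in the parent until it is cleaned up, so one queue plus one process per device uses several descriptors per child. The sketch below reads the current soft limit and estimates how many children fit under it. The function names and the figure of four descriptors per child are assumptions for illustration, not measurements from the original program.

```python
import resource  # Unix-only; this module does not exist on Windows


def estimate_max_children(soft_limit, fds_per_child, already_open=3):
    """Estimate how many children fit under a descriptor limit.

    already_open defaults to 3 for stdin, stdout, and stderr.
    fds_per_child is an assumed figure supplied by the caller.
    """
    return max(0, (soft_limit - already_open) // fds_per_child)


def current_soft_limit():
    """Return this process's soft limit on open file descriptors."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft


if __name__ == "__main__":
    soft = current_soft_limit()
    print("soft limit:", soft)
    if soft != resource.RLIM_INFINITY:
        # Four descriptors per child is an assumption, not a measured value.
        print("estimated children:", estimate_max_children(soft, fds_per_child=4))
```

The cleanup loop in the question releases descriptors as children finish, so the ceiling observed in practice can be higher than this static estimate.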

  • Because getAttributesOneDevice spends almost all of its time waiting for a network request to complete, you should use multithreading with a thread pool (which can easily handle up to 500 threads) rather than one process per device. You should also use a requests.Session object for the GET requests because, according to the documentation:

    The Session object allows you to persist certain parameters across requests. It also persists cookies across all requests made from the Session instance, and will use urllib3’s connection pooling. So if you’re making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase (see HTTP persistent connection).

    The worker function, getAttributesOneDevice, should be modified to raise an exception if it fails to capture a device.

    import urllib, requests, json, sys, pprint, time
    from multiprocessing.pool import ThreadPool
    from functools import partial
    
    assert sys.version_info >= (3, 6), "Must use Python 3.6+"
    
    ###########################
    ### handler function for thread pool worker
    ###########################
    
    def getAttributesOneDevice(session, serverURL, token, device):
        """Handler function for getting a single device"""
        query = { "hostname" : device }
    
        response = session.get(serverURL, params = query, verify = False)
        # Raise an exception if unable to capture a device
        response.raise_for_status()
        # Should the response itself be checked to ensure a device was captured
        # and an exception be raised if not?
        return response.json()
    
    def GetDeviceAttributes(server = "", token = "", devices = []):
        """
        See this URL for explanation of what this function does
        https://github.com/NetBrainAPI/NetBrain-REST-API-V8.02/blob/master
                          /REST APIs Documentation/Devices Management
                          /Get Device Attributes API.md
    
        To summarize the URL: will acquire detailed device attributes for a single
        device.
    
        This subroutine therefore queries for all devices provided, and assembles the
        results into a single list of dicts.
    
        Server queries are relatively expensive.  A single query is not a big deal,
        but accumulated across a massive device list this can take excessive
        time to execute (20min, etc). Therefore, this procedure is parallelized
        through multithreading to complete in a reasonable amount of time.
    
        'server' should be a string that is just the http(s)://<FQDN>.  Do not
        include the trailing '/'.
    
        'token' should be an authentication token that was generated by
        GetLoginToken and SetWorkingDomain modules in this directory.
    
        'devices' should be a list of strings, where each entry is a device.
    
        return a single dictionary:
            "Table" a list of dicts, each dict the detailed attributes of a device
            "Missed" a list of devices that had no result
    
        Note that failure to capture a device is distinct from function failure.
        """
    
        # ThreadPool needs at least one thread, so handle an empty list up front:
        if not devices:
            return { "Table" : [], "Missed" : [] }
    
        with requests.Session() as session, \
             ThreadPool(min(len(devices), 500)) as pool:
            # Set the shared headers once on the session:
            session.headers.update({ "Accept" : "application/json",
                                     "Content-Type" : "application/json",
                                     "token" : token })
            # Compute this once here:
            serverURL = server + "/ServicesAPI/API/V1/CMDB/Devices/Attributes"
            # The session, serverURL and token arguments never vary:
            worker = partial(getAttributesOneDevice, session, serverURL, token)
            resultsTable = []
            MissedDevices = []
            results = pool.imap(worker, devices)
            # imap yields results in input order, so each call to next()
            # corresponds to the next device (devices must be re-iterable):
            for device in devices:
                try:
                    resultsTable.append(next(results))
                except Exception:
                    # This is the device that caused the exception:
                    MissedDevices.append(device)
    
        return { "Table" : resultsTable, "Missed" : MissedDevices }
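
The result-collection pattern can be tried without a server. The following is a minimal sketch, assuming a stand-in worker: fake_fetch and collect are hypothetical names, and fake_fetch simply raises for any device whose name starts with "bad" to simulate a failed request. It shows that the iterator returned by ThreadPool.imap yields results in input order and re-raises a worker's exception on the matching call to next(), so each failure can be attributed to its device.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def fake_fetch(source, device):
    """Hypothetical stand-in for the HTTP call; fails for names starting with 'bad'."""
    if device.startswith("bad"):
        raise ValueError(f"no data for {device}")
    return {"hostname": device, "source": source}


def collect(devices, max_threads=500):
    """Run fake_fetch over devices in a thread pool, separating hits from misses."""
    table, missed = [], []
    if not devices:
        # ThreadPool rejects a size of zero, so return early.
        return {"Table": table, "Missed": missed}
    worker = partial(fake_fetch, "demo")
    with ThreadPool(min(len(devices), max_threads)) as pool:
        results = pool.imap(worker, devices)
        # One next() call per device, in the same order as the input list.
        for device in devices:
            try:
                table.append(next(results))
            except Exception:
                missed.append(device)
    return {"Table": table, "Missed": missed}


if __name__ == "__main__":
    print(collect(["r1", "bad-sw", "r2"]))
```

Because all results are consumed inside the with block, the pool is only torn down after every device has been accounted for.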