python, parallel-processing, request, osrm

Error while multi-threading API queries in Python


I'm performing queries against a server with Open Source Routing Machine (OSRM) deployed. I send a set of coordinates and obtain an n x n matrix of network distances over a street network.
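
For context, the _urls list (used further down) is built roughly like the sketch below; the host and the coordinate_chunks name are placeholders, assuming the standard OSRM table service with annotations=distance:

OSRM_HOST = 'http://localhost:5000'  # assumed local OSRM instance

def build_table_url(coords):
    'coords: list of (lon, lat) tuples'
    coord_str = ';'.join('{},{}'.format(lon, lat) for lon, lat in coords)
    return '{}/table/v1/driving/{}?annotations=distance'.format(OSRM_HOST, coord_str)

# coordinate_chunks is assumed to be a list of coordinate lists
_urls = [build_table_url(chunk) for chunk in coordinate_chunks]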

In order to speed up the computations, I want to use ThreadPoolExecutor to parallelize the queries.

So far, I've set up the connection in two ways, both of which give me the same error:


import requests

def osrm_query(url_input):
    'Send request'
    response = requests.get(url_input)
    r = response.json()

    return r


from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def osrm_query_2(url_input):
    'Send request'
    s = requests.Session()
    retries = Retry(total=3,
                    backoff_factor=0.1,
                    status_forcelist=[500, 502, 503, 504])

    s.mount('https://', HTTPAdapter(max_retries=retries))
    response = s.get(url_input)
    r = response.json()

    return r

I generate the set of URLs (in the _urls list) that I want to send as requests and parallelize this way:

from concurrent.futures import ThreadPoolExecutor

r = []
with ThreadPoolExecutor(max_workers=5) as executor:
    for each in executor.map(osrm_query_2, _urls):
        r.append(each)

So far, everything works fine, but when processing more than 40,000 URLs I get this error back:

OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

As far as I understand, the problem is that I send too many requests from my machine, exhausting the ephemeral ports available for outgoing connections (it seems this has nothing to do with the machine I'm sending the requests to).
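
To check this hypothesis, here is a rough diagnostic sketch (assuming the third-party psutil package is available, and sufficient privileges to list system connections) that counts local TCP sockets stuck in TIME_WAIT while the queries run:

import psutil

# A count in the tens of thousands while the queries run would point to
# ephemeral-port exhaustion from opening a new connection per request.
time_wait = sum(1 for c in psutil.net_connections(kind='tcp')
                if c.status == psutil.CONN_TIME_WAIT)
print('TCP sockets in TIME_WAIT:', time_wait)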

How can I fix this? Is there a way to tell the ThreadPoolExecutor to re-use the connections?


Solution

  • I was guided in the right direction by someone outside Stack Overflow.

    The trick was to point the workers of the pool to a shared requests Session. The function that sends the queries was re-worked as follows:

    def osrm_query(url_input, session):
        'Send request'
        response = session.get(url_input)
        r = response.json()
    
        return r
    

    And the parallelization would be:

    from itertools import repeat

    r = []
    with ThreadPoolExecutor(max_workers=50) as executor:
        with requests.Session() as s:
            for each in executor.map(osrm_query, _urls, repeat(s)):
                r.append(each)
    

    This way I reduced the execution time from 100 minutes (not parallelized) to 7 minutes with max_workers=50, for 200,000 URLs.
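
    One follow-up worth noting: by default a requests Session keeps at most 10 connections per host in its pool, so with 50 workers urllib3 may still discard and reopen some connections (logging "Connection pool is full, discarding connection" warnings). A sketch of sizing the pool to the worker count; the numbers here are assumptions tied to max_workers=50:

    from requests.adapters import HTTPAdapter

    # Match the connection pool size to max_workers so every worker can keep
    # its own persistent connection to the OSRM host.
    s = requests.Session()
    adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)
    s.mount('http://', adapter)
    s.mount('https://', adapter)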