Tags: python, concurrent.futures

Interrupt ThreadPoolExecutor


How can I immediately stop a concurrent.futures.ThreadPoolExecutor and discard any pending operations when a script is interrupted, as in the example below:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]

            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print (response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)

As the code is now, when I run it from a terminal and press ^C, it stops printing results but hangs for roughly the same time as an uninterrupted run would take, and finally prints KeyboardInterrupt 2.


Solution

  • You can't signal individual threads in Python, and even if you could, there's no guarantee that requests didn't create worker threads of its own. You can kill processes, though, so you could delegate each request to a subprocess and kill it when you get a keyboard interrupt. The cleanest way to do that is to manage your own subprocesses and a queue of work items (a sketch of that approach follows the code below). But if you don't mind a little hacking, concurrent.futures.ProcessPoolExecutor keeps a list of its pool processes and you can hijack that. It is hacking, though, so it may break at some point in the future. Since the response is pickled and sent back to the parent process, it's best to have an intermediate worker function that extracts the useful data from the response object rather than returning the response object itself.

    import concurrent.futures
    import os
    import requests
    import signal
    
    def worker(arguments_list: dict):
        """use requests to get web page and return url, status, json args"""
        resp = requests.get(**arguments_list)
        # todo: when status_code not 200 and json decode fails, do...???
        return resp.url, resp.status_code, resp.json()['args']
    
    def get_parallel(arguments_list: list[dict]):
        with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
            try:
                futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]
    
                for future in concurrent.futures.as_completed(futures_buffer):
                    url, status_code, args = future.result()
                    print(url)
                    yield url, status_code, args
            except KeyboardInterrupt:
                # _processes is a private attribute of the executor; killing its
                # pids directly is the hack described above (POSIX only, and it
                # may break in a future version)
                for pid in executor._processes:
                    os.kill(pid, signal.SIGKILL)
                yield 'KeyboardInterrupt 2'
    
    if __name__ == '__main__':
        arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
        for t in get_parallel(arguments):
            print(t)