How can I immediately stop a concurrent.futures.ThreadPoolExecutor
and discard any pending operations when a script is interrupted, as in the example below:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
As the code is now, when I run it from a terminal and press ^C, it stops printing results but hangs for roughly as long as an uninterrupted run would take, and finally prints KeyboardInterrupt 2.
You can't signal individual threads in Python, and even if you could, there's no guarantee that requests didn't create worker threads of its own. You can kill processes, though, so you could delegate each request to a subprocess and kill it when you get a keyboard interrupt. The cleanest way to do this is to manage your own subprocesses and a queue of work items (a rough sketch of that is at the end of this answer). But if you don't mind a little hacking, concurrent.futures.ProcessPoolExecutor keeps an internal table of its pool processes and you can hijack that. It is hacking, though, so it may break in some future release.
Since the result is pickled and sent back to the parent process, it's best to use an intermediate worker function that extracts the useful data from the response, rather than returning the response object itself.
import concurrent.futures
import os
import requests
import signal


def worker(arguments_list: dict):
    """use requests to get web page and return url, status, json args"""
    resp = requests.get(**arguments_list)
    # todo: when status_code not 200 and json decode fails, do...???
    return resp.url, resp.status_code, resp.json()['args']


def get_parallel(arguments_list: list[dict]):
    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
        try:
            futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]
            for future in concurrent.futures.as_completed(futures_buffer):
                url, status_code, args = future.result()
                print(url)
                yield url, status_code, args
        except KeyboardInterrupt:
            # hijack the executor's private process table and kill the workers outright
            for pid in executor._processes:
                os.kill(pid, signal.SIGKILL)
            yield 'KeyboardInterrupt 2'


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
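For completeness, here is a rough sketch of the "manage your own subprocesses and a queue of work items" route mentioned at the top. The worker layout, the sentinel handling, and the worker count are my own choices for illustration, not something the original code prescribes; the point is that because we start the processes ourselves, we can terminate() them the moment a KeyboardInterrupt arrives.
import multiprocessing as mp

import requests


def worker(work_queue, result_queue):
    """Pull requests.get kwargs off the queue until a None sentinel arrives."""
    while True:
        kwargs = work_queue.get()
        if kwargs is None:  # sentinel: no more work
            return
        try:
            resp = requests.get(**kwargs)
            # hand back plain data, not the Response object, since it has to be pickled
            result_queue.put((resp.url, resp.status_code, resp.json()['args']))
        except Exception as exc:
            # mirror the original code: pass the exception back instead of crashing
            result_queue.put(exc)


def get_parallel(arguments_list: list[dict], workers: int = 25):
    work_queue = mp.Queue()
    result_queue = mp.Queue()
    for kwargs in arguments_list:
        work_queue.put(kwargs)
    for _ in range(workers):
        work_queue.put(None)  # one sentinel per worker

    processes = [mp.Process(target=worker, args=(work_queue, result_queue), daemon=True)
                 for _ in range(workers)]
    for p in processes:
        p.start()

    try:
        for _ in range(len(arguments_list)):
            yield result_queue.get()
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        # we own these processes, so we can stop them immediately; a terminal ^C
        # may already have reached the children, terminate() just makes sure
        for p in processes:
            p.terminate()
        yield 'KeyboardInterrupt'


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
The trade-off is more plumbing: you hand out sentinels, collect exactly as many results as you submitted jobs, and accept that terminating a worker mid-put can leave the result queue in an odd state, which doesn't matter here because we're bailing out anyway.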