python, multithreading, concurrent.futures

How do you kill Futures once they have started?


I am using the new concurrent.futures module (which also has a Python 2 backport) to do some simple multithreaded I/O. I am having trouble understanding how to cleanly kill tasks started using this module.

Check out the following Python 2/3 script, which reproduces the behavior I'm seeing:

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time


def control_c_this():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future1 = executor.submit(wait_a_bit, name="Jack")
        future2 = executor.submit(wait_a_bit, name="Jill")
        
        for future in concurrent.futures.as_completed([future1, future2]):
            future.result()
        
        print("All done!")


def wait_a_bit(name):
    print("{n} is waiting...".format(n=name))
    time.sleep(100)


if __name__ == "__main__":
    control_c_this()

While this script is running, it appears impossible to kill it cleanly using the regular Ctrl+C keyboard interrupt. I am running on OS X.

Most documentation I've found online talks about how to cleanly kill threads with the old threading module. None of it seems to apply here.

And all the methods provided within the concurrent.futures module to stop stuff (like Executor.shutdown() and Future.cancel()) only work when the Futures haven't started yet or are complete, which is pointless in this case. I want to interrupt the Future immediately.
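To make that limitation concrete, here is a minimal sketch of what Future.cancel() does to a running versus a queued Future. The names demo_cancel and blocker are made up for this illustration, and the pool is limited to one worker so that the second task is guaranteed to stay queued.

```python
import concurrent.futures
import threading


def demo_cancel():
    started = threading.Event()   # set once the first task is really running
    release = threading.Event()   # lets the running task finish

    def blocker():
        started.set()
        release.wait(5)           # stand-in for long-running work
        return "finished"

    # One worker only, so the second submission has to wait in the queue.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        queued = executor.submit(blocker)
        started.wait(5)

        running_cancelled = running.cancel()  # already running: refused
        queued_cancelled = queued.cancel()    # still pending: cancelled

        release.set()
        result = running.result(5)

    return running_cancelled, queued_cancelled, result
```

Calling demo_cancel() should show that cancel() is refused for the task that is already executing and accepted only for the one still waiting in the queue, which is why it does not help with interrupting work in progress.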

My use case is simple: When the user hits Ctrl+C, the script should exit immediately like any well-behaved script does. That's all I want.

So what's the proper way to get this behavior when using concurrent.futures?


Solution

  • It's kind of painful. Essentially, your worker threads have to finish before your main thread can exit; the interpreter will not exit until they do. The typical workaround is to keep some global state that each thread checks to decide whether it should do more work.

    Here's a quote from the comments in the concurrent.futures thread pool source explaining why:

    Workers are created as daemon threads. This is done to allow the interpreter to exit when there are still idle threads in a ThreadPoolExecutor's thread pool (i.e. shutdown() was not called). However, allowing workers to die with the interpreter has two undesirable properties:

    • The workers would still be running during interpreter shutdown, meaning that they would fail in unpredictable ways.
    • The workers could be killed while evaluating a work item, which could be bad if the callable being evaluated has external side-effects e.g. writing to a file.

    To work around this problem, an exit handler is installed which tells the workers to exit when their work queues are empty and then waits until the threads finish.

    In essence, if worker threads were killed the moment the interpreter exits, bad things could happen.

    Here's a working example. Note that Ctrl+C takes at most 1 second to propagate because of the sleep duration of the child threads.

    #!/usr/bin/env python
    from __future__ import print_function
    
    import concurrent.futures
    import time
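    
    # Module-level flag polled by the workers; set to True on Ctrl+C.
    # (Note: this name shadows the interactive quit() helper.)
    quit = False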
    def wait_a_bit(name):
        while not quit:
            print("{n} is doing work...".format(n=name))
            time.sleep(1)
    
    def setup():
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        future1 = executor.submit(wait_a_bit, "Jack")
        future2 = executor.submit(wait_a_bit, "Jill")
    
        # main thread must be doing "work" to be able to catch a Ctrl+C 
        # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
        while (not (future1.done() and future2.done())):
            time.sleep(1)
    
    if __name__ == "__main__":
        try:
            setup()
        except KeyboardInterrupt:
            quit = True
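
A variation on the example above, offered as a sketch rather than the one true way: replace the global boolean with a threading.Event. The worker then waits on the event instead of calling time.sleep(), so it wakes up as soon as the event is set rather than finishing its sleep first. The stop_event and interval parameters and the main function are names invented for this illustration.

```python
import concurrent.futures
import threading
import time


def wait_a_bit(name, stop_event, interval=1.0):
    """Do periodic work until stop_event is set, then return name."""
    while not stop_event.is_set():
        print("{n} is doing work...".format(n=name))
        # Unlike time.sleep(), this returns as soon as the event is set.
        stop_event.wait(interval)
    return name


def main(stop_event):
    names = ["Jack", "Jill"]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(wait_a_bit, n, stop_event) for n in names]
        try:
            # Keep the main thread in a short sleep loop so Ctrl+C is noticed.
            while not all(f.done() for f in futures):
                time.sleep(0.1)
        except KeyboardInterrupt:
            stop_event.set()  # ask the workers to stop
        # Workers return promptly once the event is set.
        return [f.result() for f in futures]
```

To use it as a script, call main(threading.Event()) from the usual `if __name__ == "__main__":` guard. Passing the event in as an argument, rather than reading a global, also lets other code (a timer, a signal handler, a test) request the shutdown.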