This Python program:
    import concurrent.futures
    import multiprocessing
    import time


    class A:

        def __init__(self):
            self.event = multiprocessing.Manager().Event()

        def start(self):
            try:
                while True:
                    if self.event.is_set():
                        break
                    print("processing")
                    time.sleep(1)
            except BaseException as e:
                print(type(e).__name__ + " (from pool thread):", e)

        def shutdown(self):
            self.event.set()


    if __name__ == "__main__":
        try:
            a = A()
            pool = concurrent.futures.ThreadPoolExecutor(1)
            future = pool.submit(a.start)
            while not future.done():
                concurrent.futures.wait([future], timeout=0.1)
        except BaseException as e:
            print(type(e).__name__ + " (from main thread):", e)
        finally:
            a.shutdown()
            pool.shutdown()

outputs:
    processing
    processing
    processing
    KeyboardInterrupt (from main thread):
    BrokenPipeError (from pool thread): [WinError 232] The pipe is being closed
    Traceback (most recent call last):
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 788, in _callmethod
        conn = self._tls.connection
    AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File ".\foo.py", line 34, in <module>
        a.shutdown()
      File ".\foo.py", line 21, in shutdown
        self.event.set()
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 1067, in set
        return self._callmethod('set')
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 792, in _callmethod
        self._connect()
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 779, in _connect
        conn = self._Client(self._token.address, authkey=self._authkey)
      File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 490, in Client
        c = PipeClient(address)
      File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 691, in PipeClient
        _winapi.WaitNamedPipe(address, 1000)
    FileNotFoundError: [WinError 2] The system cannot find the file specified

when it is run and a SIGINT signal is sent after three seconds (by pressing Ctrl+C).
Analysis. — The SIGINT signal is sent to the main thread of each process. In this case there are two processes: the main process and the manager's child process.
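- When the main thread of the main process receives the SIGINT signal, the default SIGINT signal handler raises the KeyboardInterrupt exception, which is caught and printed.
- When the main thread of the manager's child process receives the SIGINT signal, the default SIGINT signal handler raises a KeyboardInterrupt exception, which terminates the child process. Consequently all subsequent uses of the manager's shared objects by other processes raise a BrokenPipeError exception.
- In the pool thread of the main process, this BrokenPipeError exception is raised at the line if self.event.is_set():.
- In the main thread of the main process, the finally clause calls a.shutdown(), which raises the AttributeError and FileNotFoundError exceptions: the main thread has no connection to the manager yet, and it cannot open one because the child process is gone.

How to prevent this BrokenPipeError exception?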
One solution is to replace the default SIGINT signal handler in the manager's child process with a handler that ignores the signal, for instance the standard signal.SIG_IGN handler. This can be done by passing an initializer that calls the signal.signal function to the manager's start method, so that it runs when the child process starts:
    import concurrent.futures
    import multiprocessing.managers
    import signal
    import time


    def init():
        # Runs in the manager's child process at startup: ignore SIGINT there,
        # so that Ctrl+C no longer terminates the manager.
        signal.signal(signal.SIGINT, signal.SIG_IGN)


    class A:

        def __init__(self):
            # Start the manager explicitly in order to pass it the initializer.
            manager = multiprocessing.managers.SyncManager()
            manager.start(init)
            self.event = manager.Event()

        def start(self):
            try:
                while True:
                    if self.event.is_set():
                        break
                    print("processing")
                    time.sleep(1)
            except BaseException as e:
                print(type(e).__name__ + " (from pool thread):", e)

        def shutdown(self):
            self.event.set()


    if __name__ == "__main__":
        try:
            a = A()
            pool = concurrent.futures.ThreadPoolExecutor(1)
            future = pool.submit(a.start)
            while not future.done():
                concurrent.futures.wait([future], timeout=0.1)
        except BaseException as e:
            print(type(e).__name__ + " (from main thread):", e)
        finally:
            # The manager is still alive here, so the event can be set.
            a.shutdown()
            pool.shutdown()

Note. — This program also works with a concurrent.futures.ProcessPoolExecutor.
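
For the process pool case, here is a minimal sketch of how the same idea might look. It is not the program above: the names work, run, period and duration are hypothetical, and passing initializer=init to the pool is an extra assumption, made because the pool's worker processes would normally receive the SIGINT from Ctrl+C as well. The initializer parameter of ProcessPoolExecutor requires Python 3.7 or later.

```python
import concurrent.futures
import multiprocessing.managers
import signal
import time


def init():
    # Runs once at the start of each child process: ignore SIGINT there.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def work(event, period=0.05):
    """Loop until the shared event is set; return the number of iterations."""
    count = 0
    while not event.is_set():
        count += 1
        time.sleep(period)
    return count


def run(duration=0.3):
    """Let one worker process loop for about `duration` seconds, then stop it."""
    manager = multiprocessing.managers.SyncManager()
    manager.start(init)  # the manager's child process ignores SIGINT
    try:
        event = manager.Event()
        # Assumption: the pool workers are shielded from SIGINT the same way.
        with concurrent.futures.ProcessPoolExecutor(1, initializer=init) as pool:
            future = pool.submit(work, event)
            try:
                concurrent.futures.wait([future], timeout=duration)
            finally:
                # Always release the worker, even if Ctrl+C interrupts the wait.
                event.set()
            return future.result()
    finally:
        manager.shutdown()


if __name__ == "__main__":
    print("iterations:", run())
```

Because the worker only stops once the event is set, the main process stays responsible for the shutdown. That is why event.set() sits in a finally clause here.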