I am starting a daemon thread from a context manager that should send a heartbeat every second, but since it is running in a thread it will not terminate the context manager if an exception occurs. How do I raise an exception in the context manager when the heartbeats stops?
from contextlib import contextmanager
from threading import Thread, Event
from time import sleep
@contextmanager
def plc():
stop_event = Event()
try:
# Send heartbeat every second
hb_t = Thread(target=heartbeat_task,
args=(stop_event,),
daemon=True)
hb_t.start()
yield
except Exception:
raise
finally:
stop_event.set()
hb_t.join()
print("Heartbeat stopped")
def heartbeat_task(stop_event):
value = False
while not stop_event.is_set():
value = not value
print("Heartbeat: " + str(value))
sleep(1)
def main():
with plc():
while True:
print("Program running")
sleep(5)
if __name__ == '__main__':
main()
I have a hard time finding examples of this.
Thanks for helping out!
Update
I have modified the code to be more closely aligned with the code you posted. But:
Your code as presented has an inconsistency: heartbeat_task
is passed an event that if set will cause the function to return. But it is only set when the context manager in function main
created with with plc():
exits, which is never. If you are hoping that any exception thrown by heartbeat_task
will force the context manager to exit and then caught in function plc
, then what is the point of it calling stop_event.set()
if by definition we only arrive here if heartbeat_task
is no longer running due to an exception?
So either you want heartbeat_task
to run indefinitely until it raises an exception (in which case there is no point in having a "stop" event) or you want to be able to stop heartbeat_task
when some condition exists, but there is no code for doing that. I will assume for demo purposes that main
will be given access to the stop_event
event and will set it under some circumstance. Otherwise, it runs until it detects that heartbeat_task
is no longer running presumably because it raised an exception (it is executing an infinite loop, so how else could it terminate if the stop event has not been set?). What remains is why you need to be using a context manager at all. I will present an alternative later on.
If you use a multithreading pool (we only need one thread in the pool), it become simple for the main thread to catch exceptions thrown by a task submitted to the pool: When multiprocessing.pool.ThreadPool.apply_async
is called a multiprocessing.pool.AsyncResult
instance is returned that represents a future completion. When method get
is called on this instance you either get the return value from the worker function (heartbeat_task
) or any exception thrown by the worker function is re-raised. But we can also use method wait
to wait for either the completion of the submitted task or an elapsed time. We can then test whether after waiting 5 seconds whether the submitted task actually finished (due to an exception or return) with method ready
. If the task is still running, then we can tell it to stop. In this demo I force the task to raise an exception after approximately 7 seconds:
from contextlib import contextmanager
from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep
@contextmanager
def plc():
stop_event = Event()
pool = ThreadPool(1)
# Send heartbeat every second
async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
yield stop_event, async_result
# We only return here if the task is no longer running
try:
# See if task threw an exception and if so, catch it:
async_result.get()
except Exception as e:
print("Got exception:", e)
finally:
pool.close()
pool.join()
print("Heartbeat stopped")
def heartbeat_task(stop_event):
# For demo purposes, we will force an exception to occur
# after approximately 7 seconds:
value = False
n = 0
while not stop_event.is_set():
value = not value
print("Heartbeat: " + str(value))
sleep(1)
n += 1
if n == 7:
raise Exception('Oops!')
def main():
with plc() as tpl:
stop_event, async_result = tpl
# This function could forcibly cause the heartbeat_task
# to complete by calling stop_event.set()
# Loop while the task is still running
while not async_result.ready():
"""
if some_condition:
stop_event.set()
break
"""
print("Program running")
# Sleep for 5 seconds or until heartbeat_task terminates:
async_result.wait(5)
if __name__ == '__main__':
main()
Prints:
Program running
Heartbeat: True
Heartbeat: False
Heartbeat: True
Heartbeat: False
Heartbeat: True
Program running
Heartbeat: False
Heartbeat: True
Got exception: Oops!
Heartbeat stopped
Alternative to Using a Context Manager
from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep
def heartbeat_task(stop_event):
value = False
n = 0
while not stop_event.is_set():
value = not value
print("Heartbeat: " + str(value))
sleep(1)
n += 1
if n == 7:
raise Exception('Oops!')
def main():
stop_event = Event()
pool = ThreadPool(1)
async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
# Run as long as heartbeat_task is running:
while not async_result.ready():
"""
if some_condition:
stop_event.set()
break
"""
print("Program running")
# Sleep for 5 seconds or until heartbeat_task terminates:
async_result.wait(5)
# Any exception thrown in heartbeat_task will be rethrown and caught here:
try:
async_result.get()
except Exception as e:
print("Got exception:", e)
finally:
pool.close()
pool.join()
if __name__ == '__main__':
main()