pythonmultiprocess

Python multiprocess script doesnt exit on ctrl+c


I have a script, that runs various multiprocesses. These multiprocesses does not open child processes themself!

I now try to close the script with the Keyboardinterrupt. I see that all processes are terminated and joined. But the main process with not die. Why is that? I dont understand it...

Here is the code for the main process:

from position_pipeline import Pipeline as PositionPipeline
from relay_control import RelayControl
from server import run_webserver
from multiprocessing import Process, Value, Queue
import numpy as np
import cv2
import sys



def run_pipeline(pipeline, position_queue):
    pipeline.load_calibration()

    cap = cv2.VideoCapture("sample-video.mp4")

    ret, frame = cap.read()
    pipeline.load_image(frame) 
    pipeline.preprocess_image()

    pipeline.detect_circles()
    pipeline.calibrate()
    pipeline.store_calibration()

    while True:
        ret, frame = cap.read()
        if np.sum(frame) == None:
            break
        pipeline.load_image(frame)
        pipeline.preprocess_image()

        #pipeline.calibrate() maybe try to recalibrate very 100 iterations brcause of af distortion

        pipeline.detect_circles()
        pipeline.draw_circles()

        large_circles = []
        small_circles = []

        for circle in pipeline.circles[1:]:
            pipeline.draw_distance(circle)

            # split circles in large and small
            if circle[2] > 35 and circle[2] < 50:
                large_circles.append(pipeline.calculate_h_v_distance((circle[0], circle[1])))
            elif circle[2] > 15 and circle[2] < 25:
                small_circles.append(pipeline.calculate_h_v_distance((circle[0], circle[1])))

        position_queue.put({"large": large_circles, "small": small_circles, "image": pipeline.image})

    pipeline.store_calibration()
    cv2.destroyAllWindows()
    cap.release()

def run_relay_controller(relay_controller, relay_control_queue, relay_status_queue):
    while True:
        if not relay_control_queue.empty():
            commands = relay_control_queue.get()
            for command in commands:
                if command[0] == "on":
                    relay_controller.on(command[1])
                elif command[0] == "off":
                    relay_controller.off(command[1])
            relay_status_queue.put(relay_controller.status)


if __name__ == "__main__":
    position_pipeline = PositionPipeline()
    relay_controller = RelayControl([1, 2, 3, 4, 5, 6, 7, 8])

    relay_control_queue = Queue()
    relay_status_queue = Queue()
    position_queue = Queue()
    frame_queue = Queue()


    position_process = Process(target=run_pipeline, args=(position_pipeline, position_queue))
    relay_process = Process(target=run_relay_controller, args=(relay_controller, relay_control_queue, relay_status_queue))
    server_process = Process(target=run_webserver, args=())
    position_process.start()
    relay_process.start()
    server_process.start()

    
    relay_status = None
    circles = None

    while True:
        try: 
            if not position_process.is_alive() or not relay_process.is_alive():
                print("Process died")
                print(f"Position process alive: {position_process.is_alive()}")
                print(f"Relay process alive: {relay_process.is_alive()}")
                break
            
            relay_control_queue.put([("on", 1), ("off", 2)])
            
            if not position_queue.empty(): circles = position_queue.get(block=False)
            if not relay_status_queue.empty(): relay_status = relay_status_queue.get(block=False)

            if circles is not None:
            # print(f"Circles large: {circles['large']}")
                #print(f"Circles small: {circles['small']}")
                frame_queue.put(circles["image"])
                # check if there are still circles in the image, and if not --> initialize refill
            
            if relay_status is not None:
                None
                #print(f"Relay status: {relay_status}")
        except KeyboardInterrupt:
            print("Keyboard interrupt")
            position_process.terminate()
            position_process.join()
            print("Position process killed")
            relay_process.terminate()
            relay_process.join()
            print("Relay process killed")
            server_process.terminate()
            server_process.join()
            print("Server process killed")
            break

    print("Processes alive: ", position_process.is_alive(), relay_process.is_alive(), server_process.is_alive())
    sys.exit()

And this is the output:

Keyboard interrupt
Position process killed
Relay process killed
Server process killed
Processes alive:  False False False

But the main process never gets terminated.

Here is a Minimal Reproducable Example:

from multiprocessing import Process, Queue
import sys
import time

def run_pipeline():
    while True:
        print("pipeline running")
        time.sleep(5)

def run_relay_controller():
    while True:
        print("relay running")
        time.sleep(5)

def run_webserver():
    while True:
        print("webserver running")
        time.sleep(5)

if __name__ == "__main__":
    position_process = Process(target=run_pipeline, args=())
    relay_process = Process(target=run_relay_controller, args=())
    server_process = Process(target=run_webserver, args=())
    position_process.start()
    relay_process.start()
    server_process.start()

    frame_queue = Queue()

    while True:
        try:
            frame_queue.put("placeholder") # with this line the main process does not terminate
        except KeyboardInterrupt:
            position_process.terminate()
            relay_process.terminate()
            server_process.terminate()
            break
    position_process.join()
    relay_process.join()
    server_process.join()
    sys.exit()
    

    


Solution

  • The error stems from something with Queue maintaining a reference and not joining. https://github.com/python/cpython/issues/91185

    A solution would be as follows :

    if __name__ == "__main__":
    position_process = Process(target=run_pipeline, args=())
    relay_process = Process(target=run_relay_controller, args=())
    server_process = Process(target=run_webserver, args=())
    position_process.start()
    relay_process.start()
    server_process.start()
    
    frame_queue = Queue()
    
    while True:
        try:
            frame_queue.put("placeholder") # with this line the main process does not terminate
        except KeyboardInterrupt:
            position_process.terminate()
            relay_process.terminate()
            server_process.terminate()
            break
    
    position_process.join()
    relay_process.join()
    server_process.join()
    frame_queue.get() #Forcefully flush the queue
    frame_queue.close() #Close the queue
    sys.exit()
    

    Although not ideal, I would recommend you utilize something like a kill command through the queue to terminate gracefully through a kill command etc. as recommended by the docs.