pythonmultiprocessingpython-multithreading

Issues with Multithreading and Multiprocessing in OpenCV Multi-Stream System


import cv2
import threading, multiprocessing


class Video:
    def __init__(self, video_path: str, window_name: str):
        self.mutex = threading.Lock()
        self.__VD = cv2.VideoCapture(video_path)
        self.__queue = multiprocessing.Queue(maxsize=30)
        self.start_switch = threading.Barrier(2)
        self.close_switch = threading.Event()
        self.__window_name = window_name

    def __read_video(self):
        self.start_switch.wait()
        while not self.close_switch.is_set():
            ret, frame = self.__VD.read()
            if ret:
                self.__queue.put(frame)
            else:
                break
        self.__close()
        return

    def __show_video(self):
        self.start_switch.wait()
        while not self.close_switch.is_set():
            if self.__queue.qsize() > 0:
                frame = self.__queue.get()
                if cv2.waitKey(1) & 0xff == ord('c'):
                    self.__VD.read()
                    break
                cv2.imshow(f"{self.__window_name}", frame)
        self.__close()
        return

    def __close(self):
        if not self.close_switch.is_set():
            self.close_switch.set()
            self.__clear_queue()

    def __clear_queue(self):
        while self.__queue.qsize() > 0:
            self.__queue.get()
        print(f"{self.__window_name} closed")
        return

    def start(self):
        print(f"{self.__window_name} start")
        read_video = threading.Thread(target=self.__read_video)
        show_video = threading.Thread(target=self.__show_video)
        read_video.start()
        show_video.start()
        return


class VideoAgent:
    def __init__(self, number_of_video: list, video_name: list, name: str):
        self.__number_of_video = number_of_video
        self.__video_name = video_name
        self.__name = name

    def start_video(self):
        print(f"Video Agent {self.__name} start")
        videos = [Video(video_path=i, window_name=f"{self.__video_name[idx]}") for idx, i in enumerate(self.__number_of_video)]
        for i in videos:
            i.start()
        return

    def start(self):
        multiprocessing.Process(target=self.start_video).start()
        return


class Controller:
    def __init__(self, video_list: list, num_of_process: int):
        self.__video_list = video_list
        self.__num_of_process = num_of_process
        self.__video_split_list, self.__video_name_split_list = self.__split_video()
        self.video_agent_list = [
            VideoAgent(number_of_video=self.__video_split_list[i], video_name=self.__video_name_split_list[i],
                       name=f"Agent {i}") for i in range(len(self.__video_split_list))]

    def __split_video(self) -> tuple:
        video_split_list = self.split_list(self.__video_list, self.__num_of_process)
        video_name_split_list = [[f"Process : {i} Video : {j}" for j in range(len(video_list))]
                                 for i, video_list in enumerate(video_split_list)]

        handle_videos_per_process = len(self.__video_list) / self.__num_of_process
        print(f"number of video : {len(self.__video_list)}")
        print(f"number of process : {self.__num_of_process}")
        print(f"each process handle : {int(handle_videos_per_process)} Videos")
        if isinstance(handle_videos_per_process, float):
            print(f"last process handle : {len(video_split_list[-1])} video")
        return video_split_list, video_name_split_list

    def split_list(self, input_list, num_splits):
        split_length = len(input_list) // num_splits
        remainder = len(input_list) % num_splits
        splits = []
        start = 0

        for i in range(num_splits):
            end = start + split_length + (1 if i < remainder else 0)
            splits.append(input_list[start:end])
            start = end

        return splits

    def start(self):
        for i in self.video_agent_list:
            i.start()


if __name__ == "__main__":
    video = ["test.mp4","test.mp4","test.mp4","test.mp4","test.mp4","test.mp4"]
    number_of_process = 3

    a = Controller(video, number_of_process)
    a.start()

Hi everyone,

I am a beginner in Python multithreading and multiprocessing. I am trying to build a multi-stream system using OpenCV. However, when I run the code or close one of the videos, I get the following

errors:

Error traceback:

Exception in thread Thread-2 (__show_video):
Traceback (most recent call last):
  File "C:\Python 3.10.6\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Python 3.10.6\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Desktop\Opencv_multidisplay\main.py", line 28, in __show_video
    frame = self.__queue.get()
  File "C:\Python 3.10.6\lib\multiprocessing\queues.py", line 103, in get
    res = self._recv_bytes()
  File "C:\Python 3.10.6\lib\multiprocessing\connection.py", line 217, in recv_bytes
    self._check_closed()
  File "C:\Python 3.10.6\lib\multiprocessing\connection.py", line 141, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

Can someone help me understand and resolve these issues? Or is there a better method to write the code?


Solution

  • The error occurred because the thread tried to execute show_video before read_video had run, so the queue had no reference in show_video.

    Solution:
    Before processing show_video, make sure the thread has completed running read_video.