import cv2
import threading, multiprocessing
class Video:
def __init__(self, video_path: str, window_name: str):
self.mutex = threading.Lock()
self.__VD = cv2.VideoCapture(video_path)
self.__queue = multiprocessing.Queue(maxsize=30)
self.start_switch = threading.Barrier(2)
self.close_switch = threading.Event()
self.__window_name = window_name
def __read_video(self):
self.start_switch.wait()
while not self.close_switch.is_set():
ret, frame = self.__VD.read()
if ret:
self.__queue.put(frame)
else:
break
self.__close()
return
def __show_video(self):
self.start_switch.wait()
while not self.close_switch.is_set():
if self.__queue.qsize() > 0:
frame = self.__queue.get()
if cv2.waitKey(1) & 0xff == ord('c'):
self.__VD.read()
break
cv2.imshow(f"{self.__window_name}", frame)
self.__close()
return
def __close(self):
if not self.close_switch.is_set():
self.close_switch.set()
self.__clear_queue()
def __clear_queue(self):
while self.__queue.qsize() > 0:
self.__queue.get()
print(f"{self.__window_name} closed")
return
def start(self):
print(f"{self.__window_name} start")
read_video = threading.Thread(target=self.__read_video)
show_video = threading.Thread(target=self.__show_video)
read_video.start()
show_video.start()
return
class VideoAgent:
def __init__(self, number_of_video: list, video_name: list, name: str):
self.__number_of_video = number_of_video
self.__video_name = video_name
self.__name = name
def start_video(self):
print(f"Video Agent {self.__name} start")
videos = [Video(video_path=i, window_name=f"{self.__video_name[idx]}") for idx, i in enumerate(self.__number_of_video)]
for i in videos:
i.start()
return
def start(self):
multiprocessing.Process(target=self.start_video).start()
return
class Controller:
def __init__(self, video_list: list, num_of_process: int):
self.__video_list = video_list
self.__num_of_process = num_of_process
self.__video_split_list, self.__video_name_split_list = self.__split_video()
self.video_agent_list = [
VideoAgent(number_of_video=self.__video_split_list[i], video_name=self.__video_name_split_list[i],
name=f"Agent {i}") for i in range(len(self.__video_split_list))]
def __split_video(self) -> tuple:
video_split_list = self.split_list(self.__video_list, self.__num_of_process)
video_name_split_list = [[f"Process : {i} Video : {j}" for j in range(len(video_list))]
for i, video_list in enumerate(video_split_list)]
handle_videos_per_process = len(self.__video_list) / self.__num_of_process
print(f"number of video : {len(self.__video_list)}")
print(f"number of process : {self.__num_of_process}")
print(f"each process handle : {int(handle_videos_per_process)} Videos")
if isinstance(handle_videos_per_process, float):
print(f"last process handle : {len(video_split_list[-1])} video")
return video_split_list, video_name_split_list
def split_list(self, input_list, num_splits):
split_length = len(input_list) // num_splits
remainder = len(input_list) % num_splits
splits = []
start = 0
for i in range(num_splits):
end = start + split_length + (1 if i < remainder else 0)
splits.append(input_list[start:end])
start = end
return splits
def start(self):
for i in self.video_agent_list:
i.start()
if __name__ == "__main__":
video = ["test.mp4","test.mp4","test.mp4","test.mp4","test.mp4","test.mp4"]
number_of_process = 3
a = Controller(video, number_of_process)
a.start()
Hi everyone,
I am a beginner in Python multithreading and multiprocessing. I am trying to build a multi-stream system using OpenCV. However, when I run the code or close one of the videos, I get the following
errors:
Error traceback: Exception in thread Thread-2 (__show_video): Traceback (most recent call last): File "C:\Python 3.10.6\lib\threading.py", line 1016, in _bootstrap_inner self.run() File "C:\Python 3.10.6\lib\threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "D:\Desktop\Opencv_multidisplay\main.py", line 28, in __show_video frame = self.__queue.get() File "C:\Python 3.10.6\lib\multiprocessing\queues.py", line 103, in get res = self._recv_bytes() File "C:\Python 3.10.6\lib\multiprocessing\connection.py", line 217, in recv_bytes self._check_closed() File "C:\Python 3.10.6\lib\multiprocessing\connection.py", line 141, in _check_closed raise OSError("handle is closed") OSError: handle is closed
Can someone help me understand and resolve these issues? Or is there a better method to write the code?
The error occurred because the thread tried to execute show_video
before read_video
had run, so the queue
had no reference in show_video
.
Solution:
Before processing show_video
, make sure the thread has completed running read_video
.