I'm working on real-time image processing using a Raspberry Pi camera. To optimize processing times, I'm trying to move some image transformations and the video encoding into a separate Process:
import time
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process, shared_memory

import cv2
import numpy as np
from picamera2 import MappedArray, Picamera2
from picamera2.request import CompletedRequest


class VideoEncoder(Process):
    def __init__(self):
        super(VideoEncoder, self).__init__()
        self.loop = ProcessEvent()
        self.loop.set()
        self.new_frame = ProcessEvent()
        self.new_frame.clear()

        # Doesn't work
        self.video_writer = cv2.VideoWriter(
            "my_very_long_gstreamer_pipeline",
            fourcc=0,
            fps=30,
            frameSize=(960, 720),
            isColor=True,
        )

        # # Works
        # self.video_writer = cv2.VideoWriter(
        #     "filename.avi",
        #     cv2.VideoWriter_fourcc(*"MJPG"),
        #     fps=30,
        #     frameSize=(960, 720),
        # )

        data = np.zeros((1232, 1640, 3), dtype=np.uint8).flatten()
        self.frame = shared_memory.SharedMemory(create=True, size=data.nbytes)

    def run(self):
        while self.loop.is_set():
            if self.new_frame.wait():
                self.process_frame()
                self.new_frame.clear()
        self.video_writer.release()

    def write(self, frame: np.ndarray):
        sha = np.ndarray(frame.shape, dtype=frame.dtype, buffer=self.frame.buf)
        sha[:] = frame[:]
        self.new_frame.set()
        print("Requested new frame")

    def process_frame(self):
        frame = np.ndarray((1232, 1640, 3), dtype=np.uint8, buffer=self.frame.buf)
        # Transform image in different ways...
        # Draw some stuff on image...
        # Takes some time...
        frame = cv2.resize(frame, (960, 720))
        print("Writing frame to video writer...")
        self.video_writer.write(frame)
        print(" > Written frame!")

    def stop(self):
        self.loop.clear()
        self.new_frame.set()


encoder = VideoEncoder()
encoder.start()


def cam_callback(request: CompletedRequest):
    with MappedArray(request, "main") as m:
        # Do some image processing...
        # Takes some time...
        # Once finished, send file to video stream
        encoder.write(m.array)


if __name__ == "__main__":
    picam = Picamera2(1)
    picam.configure(
        picam.create_video_configuration(
            main={"format": "RGB888", "size": (1640, 1232)}
        )
    )
    picam.pre_callback = cam_callback
    picam.start()

    time.sleep(10)

    picam.stop()
    encoder.stop()
However, while this works fine when writing to video files, it fails when writing to a GStreamer pipeline.
Here's the result for self.video_writer.write(frame). Note that it works just fine outside of a Process, and I don't get any GStreamer warnings or errors:
Requested new frame
Writing frame to video writer...
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
...
Requested new frame
Writing frame to video writer...
deprecated pixel format used, make sure you did set range correctly
Requested new frame
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
...
I'm not really sure how I could debug this. Any suggestions are appreciated!
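For reference, one sanity check I can think of is verifying, inside the child process, whether the writer ever opened at all, roughly like this (just a sketch; cv2.CAP_GSTREAMER, isOpened() and the GST_DEBUG environment variable are standard OpenCV/GStreamer facilities, and the pipeline string below is a placeholder):

import os

import cv2

# Placeholder for my real (much longer) pipeline string.
pipeline = "my_very_long_gstreamer_pipeline"

# GST_DEBUG makes GStreamer explain why a pipeline fails to start; easiest to
# set in the shell before launching, e.g. GST_DEBUG=3 python script.py
print("GST_DEBUG =", os.environ.get("GST_DEBUG"))

# Explicitly request the GStreamer backend so OpenCV can't silently fall back
# to a different one.
writer = cv2.VideoWriter(pipeline, cv2.CAP_GSTREAMER, 0, 30.0, (960, 720), True)

# If this prints False inside the child process, the pipeline never started there.
print("Writer opened:", writer.isOpened())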
I'm not completely sure why, but I had to move the VideoWriter initialization into run() so that it gets created in the correct process context, and that's it!
def run(self):
    self.video_writer = cv2.VideoWriter(
        "gstreamer_pipeline",
        fourcc=0,
        fps=self.input_framerate,
        frameSize=(960, 720),
        isColor=True,
    )
    while self.loop.is_set():
        if self.new_frame.wait():
            self.process_frame()
            self.new_frame.clear()
    self.video_writer.release()
    self.frame.unlink()
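To make the "correct process context" part concrete: __init__ runs in the parent process when the object is constructed, while run() only executes in the child process after start(). Here's a tiny standalone sketch (the class name is mine, not from the code above) that shows this:

import os
from multiprocessing import Process


class PidDemo(Process):
    def __init__(self):
        super().__init__()
        # Executed in the parent: anything created here has to survive the
        # transfer to the child (fork or pickling), which a live GStreamer
        # pipeline generally won't.
        print("__init__ runs in PID", os.getpid())

    def run(self):
        # Executed in the child: resources created here live in the right process.
        print("run() runs in PID", os.getpid())


if __name__ == "__main__":
    p = PidDemo()
    p.start()
    p.join()

So anything that holds live OS resources (GStreamer pipelines, sockets, file handles) is best created inside run() rather than in __init__.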