pythonffmpegsubprocessffprobeffmpeg-python

How to gracefully terminate ffmpeg process alongside with ffprobe process?


I was able to terminate ffmpeg process gracefully when it's the only ongoing process. Now I also have ffprobe process alongside with ffmpeg process that tracks the progress of the ffmpeg process. It throws the following exception when I try to cancel the process. I tried to put the statements inside try & except blocks but I don't think it's a good solution. What's the proper way to achieve this job?

P.S. The code might be a bit confusing, but I tried to create a small executable form out of my actual code, sorry about that.

import subprocess as sp
import shlex
import json
import time
import threading

def start_ffmpeg_thread(audio_part, video_part, path):

    global ffmpeg_process
    if (ffmpeg_process is None) or ffmpeg_process.poll():
        data = sp.run(shlex.split(f'ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 -of json "{video_part}"'), stdout=sp.PIPE).stdout
        dict = json.loads(data)
        tot_n_frames = float(dict['streams'][0]['nb_read_packets'])

        ffmpeg_process = sp.Popen(shlex.split(f'ffmpeg -y -loglevel error -i "{video_part}" -i "{audio_part}" -progress pipe:1 "{path}"'), stdout=sp.PIPE, stdin=sp.PIPE)

        q = [0]

        ffmpeg_progress_reader_thread = threading.Thread(target=ffmpeg_progress_reader, args=(ffmpeg_process, q))
        ffmpeg_progress_reader_thread.start()

        while True:
            if ffmpeg_process.poll() is not None:
                break

            n_frame = q[0]
            progress_percent = (n_frame/tot_n_frames)*100
            print(f"Progress: [%] {progress_percent}", end="\r")
        ffmpeg_progress_reader_thread.join()

def ffmpeg_progress_reader(procs, q):

    while True:
        if procs.poll() is not None:
            break

        progress_text = procs.stdout.readline()
        progress_text = progress_text.decode("utf-8")
        if progress_text.startswith("frame="):
            frame = int(progress_text.partition('=')[-1])
            q[0] = frame

def cancel_ffmpeg():

    time.sleep(10)
    global ffmpeg_process
    if (ffmpeg_process is not None) and (ffmpeg_process.poll() is None):
            ffmpeg_process.stdin.write('q'.encode("GBK"))
            ffmpeg_process.communicate()
            ffmpeg_process.wait()
            ffmpeg_process = None


ffmpeg_process = None

threading.Thread(target=cancel_ffmpeg).start()
start_ffmpeg_thread(<<AUDIO_FILE_FULL_PATH>>, <<VIDEO_FILE_FULL_PATH>>, <<OUTPUT_FULL_PATH>>)
Exception in thread Thread-2 (ffmpeg_progress_reader):
Traceback (most recent call last):
  File "D:\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()9.354796147248976
  File "D:\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "d:\Python Projects\main.py", line 82, in ffmpeg_progress_reader
    progress_text = procs.stdout.readline()
                    ^^^^^^^^^^^^^^^^^^^^^^^
ValueError: PyMemoryView_FromBuffer(): info->buf must not be NULL
Traceback (most recent call last):
  File "d:\Python Projects\main.py", line 101, in <module>
    start_ffmpeg_thread("aud.mp3", "vid.mp4", "output.mp4")
  File "d:\Python Projects\main.py", line 69, in start_ffmpeg_thread
    if ffmpeg_process.poll() is not None:
       ^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'poll'

Solution

  • The exception occurs because FFmpeg is closed during progress_text = procs.stdout.readline().

    Since readline() is "blocking" command, there is no option to avoid the exception.
    Using try & except is a valid solution in this case.


    In case we want to avoid the exception, we have to avoid closing FFmpeg while readline() is waiting for data.
    The "price" is that closing is not going to respond immediately, but waits until readline() returns.

    For closing gracefully, we may use threading.Event() object.


    Code sample:

    import subprocess as sp
    import shlex
    import json
    import time
    import threading
    
    def start_ffmpeg_thread(audio_part, video_part, path):
    
        global ffmpeg_process
        if (ffmpeg_process is None) or ffmpeg_process.poll():
            data = sp.run(shlex.split(f'ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 -of json "{video_part}"'), stdout=sp.PIPE).stdout
            dict = json.loads(data)
            tot_n_frames = float(dict['streams'][0]['nb_read_packets'])
    
            ffmpeg_process = sp.Popen(shlex.split(f'ffmpeg -y -loglevel error -i "{video_part}" -i "{audio_part}" -progress pipe:1 "{path}"'), stdout=sp.PIPE, stdin=sp.PIPE)
    
            q = [0]
    
            ffmpeg_progress_reader_thread = threading.Thread(target=ffmpeg_progress_reader, args=(ffmpeg_process, q))
            ffmpeg_progress_reader_thread.start()
    
            while True:
                if (ffmpeg_process is None) or (ffmpeg_process.poll() is not None):
                    break
    
                n_frame = q[0]
                progress_percent = (n_frame/tot_n_frames)*100
                print(f"Progress: [%] {progress_percent}", end="\r")
            ffmpeg_progress_reader_thread.join()
    
    
    def ffmpeg_progress_reader(procs, q):
        while True:
            if procs.poll() is not None:
                break
    
            progress_text = procs.stdout.readline()
            progress_text = progress_text.decode("utf-8")
            if progress_text.startswith("frame="):
                frame = int(progress_text.partition('=')[-1])
                q[0] = frame
    
            if not event.is_set():
                break  # Break the loop when event is clear
    
        event.set()  # Marks that the thread is finished.
    
    
    def cancel_ffmpeg():
        time.sleep(10)
    
        # Clear event - causes ffmpeg_progress_reader loop to break
        event.clear()
    
        # Wait for ffmpeg_progress_reader thread to end (the last command of ffmpeg_progress_reader is event.set()).
        event.wait(3)  # Wait with timeout of 3 seconds (just in case...).
    
        # Close FFmpeg only after ffmpeg_progress_reader is ended
        global ffmpeg_process
        if (ffmpeg_process is not None) and (ffmpeg_process.poll() is None):
                ffmpeg_process.stdin.write('q'.encode("GBK"))
                ffmpeg_process.communicate()
                ffmpeg_process.wait()
                ffmpeg_process = None
    
    
    event = threading.Event()
    event.set()  # Initialize event to "set" state.
    ffmpeg_process = None
    
    threading.Thread(target=cancel_ffmpeg).start()
    start_ffmpeg_thread('BigBuckBunny.mp4', 'BigBuckBunny.mp4', 'tmp0.mp4')