python multithreading ffmpeg rtsp multiple-input

Multiple RTSPs receive method


I am working on a project in which I need to access at least 20 RTSP CCTV URLs at the same time.

I tried to use ffmpeg's multiple-input method to achieve this. However, there is a problem.

ffmpeg -i URL_1 -i URL_2 -

The command above is the example I tried. I would like to access two RTSP streams via ffmpeg and output them into two different queues for future use. If I use this command and read bytes afterwards, I cannot distinguish which bytes belong to which input RTSP stream.

Is there any other way in which I can access multiple RTSP streams at the same time?

Edit: Adding Code

import ffmpeg
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    """Thread that runs one FFmpeg process and queues the bytes it writes to stdout.

    Args:
        q: Queue that receives the chunks read from FFmpeg's stdout.
        in_stream: Value passed to FFmpeg's ``-i`` option (e.g. an RTSP URL or a file).
        name: Thread name; also the base name of the ``<name>.h264`` test output file.
    """

    def __init__(self, q, in_stream, name):
        super().__init__()
        self.name = name
        self.q = q
        self.command = ["ffmpeg",
                        "-c:v", "h264",     # Tell ffmpeg that input stream codec is h264
                        "-i", in_stream,    # Input stream (RTSP URL or file)
                        "-c:v", "copy",     # Tell ffmpeg to copy the video stream as is (without decoding and encoding)
                        "-an", "-sn",       # No audio and no subtitles
                        "-f", "h264",       # Define pipe format to be h264
                        "-"]                # Output is a pipe

    def run(self):
        """Read chunks from FFmpeg into the queue, stop FFmpeg, then dump the queue to a file.

        The FFmpeg process is always reaped and its stdout pipe closed, even if
        reading raises, so no zombie process or open file descriptor is left behind.
        """
        # Don't use shell=True (you don't need to execute the command through the shell).
        # NOTE: bufsize=1024**3 requests a 1 GB buffer for every process.
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)

        try:
            # while True:
            for _ in range(1024*10):  # Read at most 10240 chunks of 1024 bytes (about 10 MBytes) for testing
                data = pipe.stdout.read(1024)  # Read data from pipe in chunks of 1024 bytes
                self.q.put(data)

                # Break loop if less than 1024 bytes read (not going to work with CCTV, but works with input file)
                if len(data) < 1024:
                    break
        finally:
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).
                pipe.wait()           # Reap the killed process so it does not stay around as a zombie.
            pipe.stdout.close()       # Release the read end of the pipe.

        # NOTE: the queue is drained here, inside the thread (for testing only).
        if self.q.empty():
            print("There is a problem (queue is empty)!!!")
        else:
            # Write data from queue to file <name>.h264 (for testing)
            with open(self.name+".h264", "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())


# Build synthetic video, for testing begins:
################################################
# width, height = 1280, 720
# in_stream = "vid.264"
# sp.Popen("ffmpeg -y -f lavfi -i testsrc=size=1280x720:duration=5:rate=1 -c:v libx264 -crf 23 -pix_fmt yuv420p " + in_stream).wait()
################################################

#Use public RTSP Streaming for testing
readers = {}
queues = {}

# One entry per camera: name -> {"ip": RTSP URL}
streams = {
    "name1": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name2": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name3": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name4": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name5": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name6": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name7": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name8": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name9": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name10": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name11": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name12": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name13": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name14": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name15": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
}

for key in streams:
    ip = streams[key]["ip"]
    name = key
    q = queue.Queue()           # Each camera gets its own queue
    queues[name] = q
    cctv_reader = CCTVReader(q, ip, name)
    readers[name] = cctv_reader
    cctv_reader.start()
    # NOTE: join() blocks until this reader has finished, so the next
    # reader is only started after the current one has ended.
    cctv_reader.join()

Solution

  • You already have all the infrastructure in your previous question.

    All you need to do is create multiple objects of your 'CCTVReader' class.

    Here is a working code sample, reading two streams:

    import numpy as np
    import subprocess as sp
    import threading
    import queue
    import time
    
    class CCTVReader(threading.Thread):
        """Thread that runs one FFmpeg process and queues the bytes it writes to stdout.

        Args:
            q: Queue that receives the chunks read from FFmpeg's stdout.
            in_stream: Value passed to FFmpeg's ``-i`` option (e.g. an RTSP URL or a file).
            chunk_size: Number of bytes requested from the pipe on every read.
        """

        def __init__(self, q, in_stream, chunk_size):
            super().__init__()
            self.q = q
            self.chunk_size = chunk_size
            self.command = ["ffmpeg",
                            "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                            "-i", in_stream,    # Input stream (RTSP URL or file)
                            "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                            "-an", "-sn",       # No audio and no subtitles
                            "-f", "h264",       # Define pipe format to be h264
                            "-"]                # Output is a pipe

        def run(self):
            """Read up to 100 chunks from FFmpeg into the queue, then stop FFmpeg.

            The FFmpeg process is always reaped and its stdout pipe closed, even if
            reading raises, so no zombie process or open file descriptor is left behind.
            """
            # Don't use shell=True (you don't need to execute the command through the shell).
            # A 100 MB buffer is requested instead of 1 GB, to keep memory use per process bounded.
            pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)

            try:
                # while True:
                for _ in range(100):  # Read at most 100 chunks (100 * chunk_size bytes) for testing
                    data = pipe.stdout.read(self.chunk_size)  # Read data from pipe in chunks of self.chunk_size bytes
                    self.q.put(data)

                    # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
                    if len(data) < self.chunk_size:
                        break
            finally:
                try:
                    pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
                except sp.TimeoutExpired:
                    pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).
                    pipe.wait()           # Reap the killed process so it does not stay around as a zombie.
                pipe.stdout.close()       # Release the read end of the pipe.
    
    
    
    # Example of a real camera URL:
    # in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast"

    # Public RTSP test stream (the same URL is used for both readers)
    in_stream1 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"
    in_stream2 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

    # One queue per stream, so the bytes of the two streams never mix
    q1, q2 = queue.Queue(), queue.Queue()

    cctv_reader1 = CCTVReader(q1, in_stream1, 1024)  # First stream, 1024-byte chunks
    cctv_reader2 = CCTVReader(q2, in_stream2, 2048)  # Second stream, 2048-byte chunks

    cctv_reader1.start()
    time.sleep(5)  # Start the second reader 5 seconds later (for testing).
    cctv_reader2.start()

    # Wait until both readers have finished
    for reader in (cctv_reader1, cctv_reader2):
        reader.join()

    # Dump each queue to its own file (for testing)
    dump_plan = (
        (q1, "vid_from_q1.264", "There is a problem (q1 is empty)!!!"),
        (q2, "vid_from_q2.264", "There is a problem (q2 is empty)!!!"),
    )
    for src_queue, out_name, empty_msg in dump_plan:
        if src_queue.empty():
            print(empty_msg)
            continue
        with open(out_name, "wb") as queue_save_file:
            while not src_queue.empty():
                queue_save_file.write(src_queue.get())
    

    Update:

    I don't think you can use a syntax like ffmpeg -i URL_1 -i URL_2 -...

    The code you have posted has a few issues:

    1. The cctv_reader.join() call must be in a second loop, because it waits for the thread to end and blocks execution.
    2. Saving the data to files should be done after all threads have ended (it's just for testing).
      In case you want to record the data, try saving each chunk right after grabbing it.
    3. Reduce the size of bufsize=1024**3; try bufsize=1024**2*100.
      If the OS actually allocates a 1GB buffer per process, you might run out of memory.

    Note: Python multi-threading performance is not so good, check the CPU load.

    Here is a code sample (reading from files):

    import numpy as np
    import subprocess as sp
    import threading
    import queue
    
    class CCTVReader(threading.Thread):
        """Thread that runs one FFmpeg process and queues the bytes it writes to stdout.

        Args:
            q: Queue that receives the chunks read from FFmpeg's stdout.
            in_stream: Value passed to FFmpeg's ``-i`` option (e.g. an RTSP URL or a file).
            chunk_size: Number of bytes requested from the pipe on every read.
        """

        def __init__(self, q, in_stream, chunk_size):
            super().__init__()
            self.q = q
            self.chunk_size = chunk_size
            self.command = ["ffmpeg",
                            "-c:v", "h264",     # Tell FFmpeg that input stream codec is h264
                            "-i", in_stream,    # Input stream (RTSP URL or file)
                            "-c:v", "copy",     # Tell FFmpeg to copy the video stream as is (without decoding and encoding)
                            "-an", "-sn",       # No audio and no subtitles
                            "-f", "h264",       # Define pipe format to be h264
                            "-"]                # Output is a pipe

        def run(self):
            """Read up to 100 chunks from FFmpeg into the queue, then stop FFmpeg.

            The FFmpeg process is always reaped and its stdout pipe closed, even if
            reading raises, so no zombie process or open file descriptor is left behind.
            """
            # Don't use shell=True (you don't need to execute the command through the shell).
            pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)

            try:
                # while True:
                for _ in range(100):  # Read at most 100 chunks (100 * chunk_size bytes) for testing
                    data = pipe.stdout.read(self.chunk_size)  # Read data from pipe in chunks of self.chunk_size bytes
                    self.q.put(data)

                    # Break loop if less than self.chunk_size bytes read (not going to work with CCTV, but works with input file)
                    if len(data) < self.chunk_size:
                        break
            finally:
                try:
                    pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
                except sp.TimeoutExpired:
                    pipe.kill()           # Kill subprocess in case of a timeout (there should be a timeout because input stream still lives).
                    pipe.wait()           # Reap the killed process so it does not stay around as a zombie.
                pipe.stdout.close()       # Release the read end of the pipe.

        def save_q_to_file(self, vid_file_name):
            """Drain the queue into ``vid_file_name`` (for testing).

            Prints a warning and creates no file when the queue is empty.
            """
            if self.q.empty():
                print("There is a problem (q is empty)!!!")
                return

            with open(vid_file_name, "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())
    
    # Example of a real camera URL:
    # in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast"

    # Example of a public RTSP test stream:
    # in_stream = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

    readers = {}
    queues = {}

    # Read from files (for testing): name1..name15 map to vid1.264..vid15.264,
    # and each stream is saved to vid_from_q1.264..vid_from_q15.264.
    # For real cameras, put the RTSP URL in "ip".
    # (Named `streams` so that the builtin `dict` is not shadowed.)
    streams = {
        f"name{i}": {"ip": f"vid{i}.264", "fname": f"vid_from_q{i}.264"}
        for i in range(1, 16)
    }

    # Start one reader thread (with its own queue) per stream
    for name, info in streams.items():
        q = queue.Queue()
        queues[name] = q
        cctv_reader = CCTVReader(q, info["ip"], 8192)
        readers[name] = cctv_reader
        cctv_reader.start()

    # Wait for all threads to end (a separate loop, so that all readers run concurrently)
    for cctv_reader in readers.values():
        cctv_reader.join()

    # Save data for testing
    for name, cctv_reader in readers.items():
        file_name = streams[name]["fname"]
        cctv_reader.save_q_to_file(file_name)