python, ffmpeg, rabbitmq, video-streaming, h.264

Issues with Publishing and Subscribing Rates for H.264 Video Streaming over RabbitMQ


I am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.

However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming. I need help to diagnose and resolve these performance bottlenecks.

Here is my code:

# RabbitMQ setup (publisher side)
# Broker hostname. NOTE(review): presumably passed to rabbitmq_channel() by the
# script's entry point, which is not shown here.
RABBITMQ_HOST = 'localhost'
# Exchange that frames are published to; declared as a 'direct' exchange.
EXCHANGE = 'DRONE'
# Camera identifier embedded in both the routing key and the queue name.
CAM_LOCATION = 'Out_Front'
# Routing key used when binding the queue and when publishing frames.
KEY = f'DRONE_{CAM_LOCATION}'
# Queue that is bound to EXCHANGE with KEY.
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file that capture_video() reads frames from
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging: emit INFO and above through the root logger
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
    """Context manager that yields a channel on a fresh blocking connection.

    Args:
        host: Hostname of the RabbitMQ broker to connect to.

    Yields:
        A channel opened on a new ``pika.BlockingConnection``.

    The connection is closed on exit, including when opening the channel or
    the body of the ``with`` block raises.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        # Open the channel inside the try so that a failure here cannot leak
        # the connection that was just established.
        yield connection.channel()
    finally:
        # Closing an already-closed connection raises in pika, which would
        # mask the original error, so only close while it is still open.
        if connection.is_open:
            connection.close()

def initialize_rabbitmq(channel):
    """Declare the direct exchange and the video queue, then bind the two.

    Args:
        channel: Open channel on which the declarations are issued.
    """
    # The exchange and the queue are both declared before the binding is made.
    channel.exchange_declare(exchange_type='direct', exchange=EXCHANGE)
    channel.queue_declare(queue=QUEUE_NAME)

    # Route messages published with KEY on EXCHANGE into QUEUE_NAME.
    channel.queue_bind(
        routing_key=KEY,
        queue=QUEUE_NAME,
        exchange=EXCHANGE,
    )

def send_frame(channel, frame):
    """Encode one video frame to H.264 with FFmpeg and publish it to RabbitMQ.

    The published body is the wall-clock publish timestamp packed with struct
    format ``'d'``, immediately followed by the encoded H.264 bytes.

    Args:
        channel: Open channel used for ``basic_publish``.
        frame: Image array; ``frame.shape[0]`` and ``frame.shape[1]`` are used
            as height and width, and ``frame.tobytes()`` is piped to FFmpeg as
            rgb24 raw video.

    Raises:
        RuntimeError: If the FFmpeg process exits with a non-zero status.
    """
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'rgb24',
        '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
        '-i', 'pipe:0',
        '-f', 'h264',
        '-vcodec', 'libx264',
        '-pix_fmt', 'yuv420p',
        '-preset', 'ultrafast',
        'pipe:1'
    ]

    # The measured encoding time includes spawning the FFmpeg process.
    start_time = time.time()
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate(input=frame.tobytes())
    encoding_time = time.time() - start_time

    if process.returncode != 0:
        # FFmpeg's stderr is not guaranteed to be valid UTF-8 (e.g. localized
        # messages or file paths); decode leniently so a decoding failure can
        # never hide the actual FFmpeg error.
        logging.error("ffmpeg error: %s", err.decode(errors='replace'))
        raise RuntimeError("ffmpeg error")

    # Lazy %-style arguments avoid building the strings when INFO is disabled;
    # this function runs once per frame.
    logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, len(out))
    timestamp = time.time()
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
    logging.info("Timestamp: %s", timestamp)
    # Drop the last three microsecond digits to show milliseconds.
    logging.info("Formatted Timestamp: %s", formatted_timestamp[:-3])

    # Prefix the payload with the publish timestamp so the consumer can
    # measure end-to-end latency.
    message_body = struct.pack('d', timestamp) + out
    channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
    logging.info("Encoding time: %.4f seconds", encoding_time)

def capture_video(channel):
    """Stream every frame of VIDEO_FILE_PATH to RabbitMQ with a local preview.

    Each frame is converted from BGR to RGB, handed to ``send_frame`` and shown
    in an OpenCV window. The loop ends when the file runs out of frames or the
    user presses ``q`` in the preview window.

    Args:
        channel: Open channel that is forwarded to ``send_frame``.
    """
    if not os.path.exists(VIDEO_FILE_PATH):
        logging.error("Error: Video file does not exist.")
        return

    capture = cv2.VideoCapture(VIDEO_FILE_PATH)
    if not capture.isOpened():
        logging.error("Error: Could not open video file.")
        return

    quit_key = ord('q')
    try:
        while True:
            # Time only the read call itself.
            read_started = time.time()
            ok, bgr_frame = capture.read()
            read_time = time.time() - read_started
            if not ok:
                break

            # FFmpeg is fed rgb24 bytes, so convert and make the buffer contiguous.
            rgb_frame = np.ascontiguousarray(
                cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
            )
            send_frame(channel, rgb_frame)

            # The preview shows the original BGR frame.
            cv2.imshow('Video', bgr_frame)
            pressed = cv2.waitKey(1) & 0xFF
            if pressed == quit_key:
                break

            logging.info(f"Read time: {read_time:.4f} seconds")
    finally:
        capture.release()
        cv2.destroyAllWindows()
# --- Consumer / web application side ---
# Flask application with CORS enabled and a Socket.IO server that accepts
# connections from any origin.
app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")

# RabbitMQ settings for the consumer; the exchange, camera location and queue
# name use the same values as the publisher script.
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
    """Connect to RabbitMQ and declare and bind the video queue for consuming.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection and
        is responsible for closing it.

    Raises:
        Exception: Whatever pika raises while connecting or declaring; if the
            failure happens after the connection was established, the
            connection is closed before the exception propagates.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
        channel.queue_declare(queue=QUEUE_NAME)
        channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
    except Exception:
        # Do not leak the connection when setup fails part-way. Closing an
        # already-closed connection raises in pika, hence the is_open guard.
        if connection.is_open:
            connection.close()
        raise
    return connection, channel

def decode_frame(frame_data, frame_shape=(960, 1280, 3)):
    """Decode H.264 bytes into a single BGR image by piping them through FFmpeg.

    Args:
        frame_data: Raw H.264 bytes to feed to FFmpeg on stdin.
        frame_shape: Expected ``(height, width, channels)`` of the decoded
            frame. Defaults to ``(960, 1280, 3)``, the dimensions expected by
            the frontend.

    Returns:
        A ``numpy.uint8`` array of shape ``frame_shape`` in bgr24 pixel order,
        or ``None`` if FFmpeg fails or its output does not contain exactly one
        frame of the expected size.
    """
    # FFmpeg command to decode H.264 frame data
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'h264',
        '-i', 'pipe:0',
        '-pix_fmt', 'bgr24',
        '-vcodec', 'rawvideo',
        '-an', '-sn',
        '-f', 'rawvideo',
        'pipe:1'
    ]
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    start_time = time.time()  # Start timing the decoding process
    out, err = process.communicate(input=frame_data)
    decoding_time = time.time() - start_time  # Calculate decoding time

    if process.returncode != 0:
        # stderr may not be valid UTF-8; decode leniently so the real FFmpeg
        # error is always reported instead of a UnicodeDecodeError.
        print("ffmpeg error: ", err.decode(errors='replace'))
        return None

    # Reshaping a buffer of the wrong length raises ValueError, which would
    # terminate the consumer loop. Treat it like any other failed decode.
    expected_bytes = int(np.prod(frame_shape))
    if len(out) != expected_bytes:
        print(f"Unexpected decoded size: got {len(out)} bytes, expected {expected_bytes}")
        return None

    frame = np.frombuffer(out, np.uint8).reshape(frame_shape)
    print(f"Decoding time: {decoding_time:.4f} seconds")
    return frame

def format_timestamp(ts):
    """Render a POSIX timestamp as local ``HH:MM:SS.mmm`` (millisecond precision)."""
    rendered = datetime.fromtimestamp(ts).strftime('%H:%M:%S.%f')
    # %f always produces six microsecond digits; cut the trailing three so
    # only the milliseconds remain.
    return rendered[:len(rendered) - 3]

def rabbitmq_consumer():
    """Consume video messages from RabbitMQ and forward them to web clients.

    Each message body is expected to start with an 8-byte timestamp (struct
    format ``'d'``) followed by H.264 data. The frame is decoded, re-encoded
    as JPEG and emitted as a ``video_frame`` Socket.IO event together with the
    publish timestamp. Every message is acknowledged, whether or not it could
    be decoded. Messages too short to contain the timestamp are acknowledged
    and dropped.

    The RabbitMQ connection is closed when the loop exits for any reason.
    """
    connection, channel = initialize_rabbitmq()
    try:
        for method_frame, properties, body in channel.consume(QUEUE_NAME):
            message_receive_time = time.time()  # Time when the message is received

            # A body without the full timestamp header cannot be parsed.
            # Acknowledge it so it is not redelivered forever, then move on
            # rather than crashing the consumer thread.
            if len(body) < 8:
                print(f"Dropping malformed message: {len(body)} bytes is too short for the timestamp header")
                channel.basic_ack(method_frame.delivery_tag)
                continue

            # Extract the timestamp from the message body
            timestamp_bytes = body[:8]
            frame_data = body[8:]
            publish_timestamp = struct.unpack('d', timestamp_bytes)[0]

            print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
            print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

            frame = decode_frame(frame_data)

            if frame is not None:
                _, buffer = cv2.imencode('.jpg', frame)
                frame_data = buffer.tobytes()
                socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
                emit_time = time.time()  # Time after emitting the frame

                # Log the time taken to emit the frame and its size
                rtt = emit_time - publish_timestamp  # Elapsed time from publish to emit
                print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
                print(f"RTT: {rtt:.4f} seconds")
                print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
            channel.basic_ack(method_frame.delivery_tag)
    finally:
        # Closing an already-closed connection raises in pika and would mask
        # the error that ended the loop, so only close while it is still open.
        if connection.is_open:
            connection.close()

@app.route('/')
def index():
    """Serve the page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page

@socketio.on('connect')
def handle_connect():
    """Socket.IO 'connect' handler: print a notice to stdout."""
    print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
    """Socket.IO 'disconnect' handler: print a notice to stdout."""
    print('Client disconnected')

if __name__ == '__main__':
    # Run the RabbitMQ consumer in a daemon thread so that it does not keep
    # the process alive once the web server stops.
    consumer_thread = threading.Thread(target=rabbitmq_consumer, daemon=True)
    consumer_thread.start()

    # Serve the Flask/Socket.IO application on all interfaces.
    socketio.run(app, port=5000, host='0.0.0.0')

How can I optimize the publishing and subscribing rates to handle a higher number of messages per second?

Any help or suggestions would be greatly appreciated!

I attempted to use threading and multiprocessing to handle multiple frames concurrently, and I tried to optimize the frame-decoding function to make it faster, but with no success.


Solution

  • First, I don't know much about RabbitMQ, but I think it should be able to handle more than 10 messages per second.

    You have some design issues:

    1. You read the video file into RGB frames via cv2 and re-encode them to H.264. The file is already H.264-encoded, so this is pure overhead. Use PyAV to read the file packet by packet, so you don't need the re-encoding step when sending.

    2. You launch a whole ffmpeg process for each frame when decoding, just as in the encoding step. Use PyAV to feed the packets to the decoder as a continuous stream instead.

    Following this, you remove the single process execution per frame. If you want to stick with subprocesses, start the process once and work with its pipes.

    But PyAV is far more developer-friendly and gives you more useful features than just working with pipes.