I am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.
However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming. I need help to diagnose and resolve these performance bottlenecks.
Here is my code:
import logging
import os
import struct
import subprocess
import time
from contextlib import contextmanager
from datetime import datetime

import cv2
import numpy as np
import pika

# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'
# Configure logging
logging.basicConfig(level=logging.INFO)
@contextmanager
def rabbitmq_channel(host):
"""Context manager to handle RabbitMQ channel setup and teardown."""
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
try:
yield channel
finally:
connection.close()
def initialize_rabbitmq(channel):
"""Initialize RabbitMQ exchange and queue, and bind them together."""
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)
def send_frame(channel, frame):
"""Encode the video frame using FFmpeg and send it to RabbitMQ."""
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
'-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
'-i', 'pipe:0',
'-f', 'h264',
'-vcodec', 'libx264',
'-pix_fmt', 'yuv420p',
'-preset', 'ultrafast',
'pipe:1'
]
start_time = time.time()
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate(input=frame.tobytes())
encoding_time = time.time() - start_time
if process.returncode != 0:
logging.error("ffmpeg error: %s", err.decode())
raise RuntimeError("ffmpeg error")
frame_size = len(out)
logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
timestamp = time.time()
formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
logging.info(f"Timestamp: {timestamp}")
logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
timestamp_bytes = struct.pack('d', timestamp)
message_body = timestamp_bytes + out
channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
logging.info(f"Encoding time: {encoding_time:.4f} seconds")
def capture_video(channel):
"""Read video from the file, encode frames, and send them to RabbitMQ."""
if not os.path.exists(VIDEO_FILE_PATH):
logging.error("Error: Video file does not exist.")
return
cap = cv2.VideoCapture(VIDEO_FILE_PATH)
if not cap.isOpened():
logging.error("Error: Could not open video file.")
return
try:
while True:
start_time = time.time()
ret, frame = cap.read()
read_time = time.time() - start_time
if not ret:
break
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_rgb = np.ascontiguousarray(frame_rgb) # Ensure the frame is contiguous
send_frame(channel, frame_rgb)
cv2.imshow('Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
logging.info(f"Read time: {read_time:.4f} seconds")
finally:
cap.release()
cv2.destroyAllWindows()
# Consumer side: Flask + Socket.IO web application
import struct
import subprocess
import threading
import time
from datetime import datetime

import cv2
import numpy as np
import pika
from flask import Flask, render_template
from flask_cors import CORS
from flask_socketio import SocketIO

app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
def initialize_rabbitmq():
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
return connection, channel
def decode_frame(frame_data):
# FFmpeg command to decode H.264 frame data
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'h264',
'-i', 'pipe:0',
'-pix_fmt', 'bgr24',
'-vcodec', 'rawvideo',
'-an', '-sn',
'-f', 'rawvideo',
'pipe:1'
]
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start_time = time.time() # Start timing the decoding process
out, err = process.communicate(input=frame_data)
decoding_time = time.time() - start_time # Calculate decoding time
if process.returncode != 0:
print("ffmpeg error: ", err.decode())
return None
frame_size = (960, 1280, 3) # frame dimensions expected by the frontend
frame = np.frombuffer(out, np.uint8).reshape(frame_size)
print(f"Decoding time: {decoding_time:.4f} seconds")
return frame
def format_timestamp(ts):
dt = datetime.fromtimestamp(ts)
return dt.strftime('%H:%M:%S.%f')[:-3]
def rabbitmq_consumer():
connection, channel = initialize_rabbitmq()
for method_frame, properties, body in channel.consume(QUEUE_NAME):
message_receive_time = time.time() # Time when the message is received
# Extract the timestamp from the message body
timestamp_bytes = body[:8]
frame_data = body[8:]
publish_timestamp = struct.unpack('d', timestamp_bytes)[0]
print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")
frame = decode_frame(frame_data)
decode_time = time.time() - message_receive_time # Calculate decode time
if frame is not None:
_, buffer = cv2.imencode('.jpg', frame)
frame_data = buffer.tobytes()
socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
emit_time = time.time() # Time after emitting the frame
# Log the time taken to emit the frame and its size
rtt = emit_time - publish_timestamp # Calculate RTT from publish to emit
print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
print(f"RTT: {rtt:.4f} seconds")
print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
channel.basic_ack(method_frame.delivery_tag)
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('connect')
def handle_connect():
print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
if __name__ == '__main__':
consumer_thread = threading.Thread(target=rabbitmq_consumer)
consumer_thread.daemon = True
consumer_thread.start()
socketio.run(app, host='0.0.0.0', port=5000)
How can I optimize the publishing and subscribing rates to handle a higher number of messages per second?
Any help or suggestions would be greatly appreciated!
I attempted to use threading and multiprocessing to handle multiple frames concurrently, and I tried to optimize the frame-decoding function to make it faster, but with no success.
First, I don't know that much about RabbitMQ, but it should easily handle more than 10 messages per second.
You have a couple of design issues:
You read the video file into RGB frames via cv2 and re-encode them to H.264, but the file is already H.264-encoded, so that is pure overhead. Use PyAV to read the file packet by packet, which removes the re-encoding step on the sending side.
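A rough sketch of what packet-wise publishing with PyAV could look like (untested; it assumes the av package is installed and reuses RABBITMQ_HOST, EXCHANGE, KEY, VIDEO_FILE_PATH, rabbitmq_channel and initialize_rabbitmq from your publisher; publish_packets is just an illustrative name):
import struct
import time

import av  # PyAV

def publish_packets():
    """Demux the already-encoded H.264 packets and publish them as-is."""
    with rabbitmq_channel(RABBITMQ_HOST) as channel:
        initialize_rabbitmq(channel)
        container = av.open(VIDEO_FILE_PATH)
        stream = container.streams.video[0]
        for packet in container.demux(stream):
            if packet.dts is None:  # flush packet at end of file, carries no data
                continue
            # Same message layout as before: 8-byte timestamp + H.264 bytes
            body = struct.pack('d', time.time()) + bytes(packet)
            channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=body)
            # Optionally sleep here (1 / frame rate) to pace the stream in real time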
You also launch a whole ffmpeg process for every single frame when decoding, just as in the encoding step. Use PyAV to feed the packets into a single decoder as a continuous stream instead.
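Something like this could replace your decode_frame on the consumer side, assuming the message payload is the raw Annex-B H.264 bytes produced above; the decoder object lives for the whole stream instead of one ffmpeg process per message:
import av  # PyAV

codec = av.CodecContext.create('h264', 'r')  # one decoder for the whole stream

def decode_frame(frame_data):
    """Feed raw H.264 bytes into the persistent decoder and return a BGR frame."""
    frames = []
    for packet in codec.parse(frame_data):   # split the byte stream into packets
        frames.extend(codec.decode(packet))  # the decoder keeps state between calls
    # The decoder may still be buffering, in which case there is no picture yet
    return frames[-1].to_ndarray(format='bgr24') if frames else None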
Following this, you get rid of the per-frame process launches. If you want to stay with subprocesses, start ffmpeg once and keep working with its pipes.
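If you do stay with ffmpeg, the rough idea for the decoding side is a single long-lived process that you keep feeding, something like the sketch below (the 960x1280 frame size is taken from your decode_frame, and ffmpeg being on the PATH is an assumption):
import subprocess
import threading
import numpy as np

HEIGHT, WIDTH = 960, 1280          # frame size your decode_frame already assumes
FRAME_BYTES = WIDTH * HEIGHT * 3   # bgr24 = 3 bytes per pixel

# One decoder process for the whole stream instead of one per message
decoder = subprocess.Popen(
    ['ffmpeg', '-f', 'h264', '-i', 'pipe:0',
     '-f', 'rawvideo', '-pix_fmt', 'bgr24', 'pipe:1'],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

def feed(frame_data):
    """Write the H.264 bytes from a RabbitMQ message into the decoder."""
    decoder.stdin.write(frame_data)
    decoder.stdin.flush()

def read_frames(on_frame):
    """Read fixed-size raw frames from the decoder and hand them to a callback."""
    while True:
        buf = decoder.stdout.read(FRAME_BYTES)
        if len(buf) < FRAME_BYTES:
            break
        on_frame(np.frombuffer(buf, np.uint8).reshape((HEIGHT, WIDTH, 3)))

# Read in a background thread so writing and reading never block each other;
# the callback is where you would JPEG-encode and emit the frame over Socket.IO.
threading.Thread(target=read_frames, args=(lambda f: None,), daemon=True).start()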
But PyAV is far more developer-friendly and gives you more useful things than just working with pipes.