I am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.
However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming. I need help to diagnose and resolve these performance bottlenecks.
Here is my code:
# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'
# Configure logging
logging.basicConfig(level=logging.INFO)
@contextmanager
def rabbitmq_channel(host):
"""Context manager to handle RabbitMQ channel setup and teardown."""
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
try:
yield channel
finally:
connection.close()
def initialize_rabbitmq(channel):
"""Initialize RabbitMQ exchange and queue, and bind them together."""
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)
def send_frame(channel, frame):
"""Encode the video frame using FFmpeg and send it to RabbitMQ."""
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
'-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
'-i', 'pipe:0',
'-f', 'h264',
'-vcodec', 'libx264',
'-pix_fmt', 'yuv420p',
'-preset', 'ultrafast',
'pipe:1'
]
start_time = time.time()
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate(input=frame.tobytes())
encoding_time = time.time() - start_time
if process.returncode != 0:
logging.error("ffmpeg error: %s", err.decode())
raise RuntimeError("ffmpeg error")
frame_size = len(out)
logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
timestamp = time.time()
formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
logging.info(f"Timestamp: {timestamp}")
logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
timestamp_bytes = struct.pack('d', timestamp)
message_body = timestamp_bytes + out
channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
logging.info(f"Encoding time: {encoding_time:.4f} seconds")
def capture_video(channel):
"""Read video from the file, encode frames, and send them to RabbitMQ."""
if not os.path.exists(VIDEO_FILE_PATH):
logging.error("Error: Video file does not exist.")
return
cap = cv2.VideoCapture(VIDEO_FILE_PATH)
if not cap.isOpened():
logging.error("Error: Could not open video file.")
return
try:
while True:
start_time = time.time()
ret, frame = cap.read()
read_time = time.time() - start_time
if not ret:
break
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_rgb = np.ascontiguousarray(frame_rgb) # Ensure the frame is contiguous
send_frame(channel, frame_rgb)
cv2.imshow('Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
logging.info(f"Read time: {read_time:.4f} seconds")
finally:
cap.release()
cv2.destroyAllWindows()
app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
def initialize_rabbitmq():
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
channel.queue_declare(queue=QUEUE_NAME)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
return connection, channel
def decode_frame(frame_data):
# FFmpeg command to decode H.264 frame data
ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
cmd = [
ffmpeg_path,
'-f', 'h264',
'-i', 'pipe:0',
'-pix_fmt', 'bgr24',
'-vcodec', 'rawvideo',
'-an', '-sn',
'-f', 'rawvideo',
'pipe:1'
]
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start_time = time.time() # Start timing the decoding process
out, err = process.communicate(input=frame_data)
decoding_time = time.time() - start_time # Calculate decoding time
if process.returncode != 0:
print("ffmpeg error: ", err.decode())
return None
frame_size = (960, 1280, 3) # frame dimensions expected by the frontend
frame = np.frombuffer(out, np.uint8).reshape(frame_size)
print(f"Decoding time: {decoding_time:.4f} seconds")
return frame
def format_timestamp(ts):
dt = datetime.fromtimestamp(ts)
return dt.strftime('%H:%M:%S.%f')[:-3]
def rabbitmq_consumer():
connection, channel = initialize_rabbitmq()
for method_frame, properties, body in channel.consume(QUEUE_NAME):
message_receive_time = time.time() # Time when the message is received
# Extract the timestamp from the message body
timestamp_bytes = body[:8]
frame_data = body[8:]
publish_timestamp = struct.unpack('d', timestamp_bytes)[0]
print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")
frame = decode_frame(frame_data)
decode_time = time.time() - message_receive_time # Calculate decode time
if frame is not None:
_, buffer = cv2.imencode('.jpg', frame)
frame_data = buffer.tobytes()
socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
emit_time = time.time() # Time after emitting the frame
# Log the time taken to emit the frame and its size
rtt = emit_time - publish_timestamp # Calculate RTT from publish to emit
print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
print(f"RTT: {rtt:.4f} seconds")
print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
channel.basic_ack(method_frame.delivery_tag)
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('connect')
def handle_connect():
print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
if __name__ == '__main__':
consumer_thread = threading.Thread(target=rabbitmq_consumer)
consumer_thread.daemon = True
consumer_thread.start()
socketio.run(app, host='0.0.0.0', port=5000)
How can I optimize the publishing and subscribing rates to handle a higher number of messages per second?
Any help or suggestions would be greatly appreciated!
I attempted to use threading and multiprocessing to handle multiple frames concurrently and I tried to optimize the frame decoding function to make it faster but with no success.
I figured out that the message publishing messages low rate was because the ffmpeg encoding was taking to long.