rust, rust-tokio, librdkafka

How to utilize kafka batching for producer in Rust


I wrote a little Rust app that sends messages to a Kafka topic. Unfortunately, it is much slower than expected: it was sending one message at a time with a latency of about 200ms, so I only get about 5 messages per second.

I figured batching could help and found this link, where it's stated that batching is handled out of the box, so I configured queue.buffering.max.ms to be 500, hoping to enable batching. But it just increased the time between two messages being sent to about 540ms.

I looked in the examples of rust-rdkafka but I don't see what I'm missing.

I updated the "simple_producer" example to illustrate my issue:

use std::time::Duration;

use log::info;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use tokio::time::Instant;

#[macro_use]
extern crate env_var;

/// Sends five test messages to `topic_name` and logs how long each delivery took.
///
/// Builds a `FutureProducer` for `brokers` using SASL_SSL/PLAIN with the given
/// credentials, `queue.buffering.max.ms` set to 500 and `message.timeout.ms` set
/// to 5000, creates one send future per message, then awaits those futures one
/// at a time, logging the elapsed time and result of each.
///
/// # Panics
///
/// Panics with "Producer creation error" if the producer cannot be created.
async fn produce(brokers: &str, topic_name: &str, kafka_user: &str, kafka_password: &str) {
    let producer: &FutureProducer = &ClientConfig::new()
        .set("bootstrap.servers", brokers.to_string())
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "PLAIN")
        .set("sasl.username", kafka_user.to_string())
        .set("sasl.password", kafka_password.to_string())
        .set("queue.buffering.max.ms", "500".to_string())
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // This loop is non blocking: all messages will be sent one after the other, without waiting
    // for the results.
    // NOTE(review): `async move` blocks are lazy. This statement only builds the five
    // futures; `producer.send` is not called for a message until its future is awaited
    // in the loop further down.
    let futures = (0..5)
        .map(|i| async move {
            // The send operation on the topic returns a future, which will be
            // completed once the result or failure from Kafka is received.
            let delivery_status = producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i)),
                    Duration::from_secs(0),
                )
                .await;

            // This will be executed when the result is received.
            info!("Delivery status for message {} received", i);
            delivery_status
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    // NOTE(review): each future is awaited to completion before the next one is started,
    // and the timer is restarted on every iteration, so each logged duration covers the
    // send and delivery of a single message.
    
    for future in futures {
        let timer = Instant::now();
        let result = future.await;
        info!("Future completed after {:?}. Result: {:?}", timer.elapsed(), result);
    }
}

/// Entry point: initialises logging, reads the Kafka connection settings from
/// the environment and hands them to `produce`.
#[tokio::main]
async fn main() {
    env_logger::init();

    // Connection settings; only the user name is declared with a default value.
    let servers = env_var!(required "KAFKA_SERVERS");
    let user = env_var!(optional "KAFKA_USER", default: "$ConnectionString");
    let password = env_var!(required "KAFKA_PASSWORD");
    let topic = env_var!(required "KAFKA_TOPIC");

    // `produce` expects (brokers, topic, user, password) in that order.
    produce(servers.as_str(), topic.as_str(), user.as_str(), password.as_str()).await;
}

Output:

[2022-09-26T13:53:50Z INFO  batch_test] Delivery status for message 0 received
[2022-09-26T13:53:50Z INFO  batch_test] Future completed at 922.2421ms. Result: Ok((0, 414835))
[2022-09-26T13:53:50Z INFO  batch_test] Delivery status for message 1 received
[2022-09-26T13:53:50Z INFO  batch_test] Future completed at 543.6226ms. Result: Ok((0, 414837))
[2022-09-26T13:53:51Z INFO  batch_test] Delivery status for message 2 received
[2022-09-26T13:53:51Z INFO  batch_test] Future completed at 549.2269ms. Result: Ok((0, 414838))
[2022-09-26T13:53:51Z INFO  batch_test] Delivery status for message 3 received
[2022-09-26T13:53:51Z INFO  batch_test] Future completed at 548.8964ms. Result: Ok((0, 414839))
[2022-09-26T13:53:52Z INFO  batch_test] Delivery status for message 4 received
[2022-09-26T13:53:52Z INFO  batch_test] Future completed at 538.4841ms. Result: Ok((0, 414840))

Solution

  • Note that the requests are not sent when you think:

    // This loop is non blocking: all messages will be sent one after the other, without waiting
    // for the results.
    let futures = (0..5)
        .map(|i| async move {
            // The send operation on the topic returns a future, which will be
            // completed once the result or failure from Kafka is received.
            let delivery_status = producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i)),
                    Duration::from_secs(0),
                )
                .await;
    
            // This will be executed when the result is received.
            info!("Delivery status for message {} received", i);
            delivery_status
        })
        .collect::<Vec<_>>();
    

    At this point, no request has been sent yet!

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        let timer = Instant::now();
        let result = future.await;
    

    This will queue one request, wait for queue.buffering.max.ms in case another request is queued in the meantime (but none are), then send the request and wait for the response.

        info!("Future completed after {:?}. Result: {:?}", timer.elapsed(), result);
    }
    

    You need to spawn a task for each request so that they will be queued immediately:

    // `tokio::spawn` requires a `'static` future, so each task gets its own producer
    // handle and an owned copy of the topic name instead of borrowing them from the
    // enclosing function.
    let futures = (0..5)
        .map(|i| {
            let producer = producer.clone();
            let topic_name = topic_name.to_owned();
            tokio::spawn(async move {
                // The send operation on the topic returns a future, which will be
                // completed once the result or failure from Kafka is received.
                let delivery_status = producer
                    .send(
                        FutureRecord::to(&topic_name)
                            .payload(&format!("Message {}", i))
                            .key(&format!("Key {}", i)),
                        Duration::from_secs(0),
                    )
                    .await;

                // This will be executed when the result is received.
                info!("Delivery status for message {} received", i);
                delivery_status
            })
        })
        .collect::<Vec<_>>();
    

    At which point, you probably no longer need to tinker with queue.buffering.max.ms.

    Note

    As pointed out by @sbergeron there is a race condition in the above code that may cause messages to be sent out of order. This can be fixed by using send_result to queue the messages synchronously and only spawn the part that waits for delivery confirmation:

    // `send_result` enqueues the record synchronously, so the messages enter the
    // producer queue in iteration order; only the wait for the delivery report is
    // moved into a spawned task.
    let futures = (0..5)
        .map(|i| {
            // Unlike `send`, `send_result` takes only the record (no queue timeout)
            // and returns an error right away if the record cannot be enqueued.
            let fut = producer
                .send_result(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i)),
                )
                .expect("failed to enqueue message");
            tokio::spawn(async move {
                // The returned future completes once the result or failure from
                // Kafka is received.
                let delivery_status = fut.await;

                // This will be executed when the result is received.
                info!("Delivery status for message {} received", i);
                delivery_status
            })
        })
        .collect::<Vec<_>>();
    }