librdkafka

rd_kafka_offsets_for_times() returns error -186 (Local: Invalid argument or configuration.)


I'm trying to get the offset (high watermark offset) for a partition using rd_kafka_offsets_for_times().

Here is a code snippet:

rd_kafka_topic_partition_t* pt0 = rd_kafka_topic_partition_new(topic, 0);
pt0->offset = 0;
pt0->metadata = 0;
pt0->metadata_size = 0;
pt0->opaque = 0;
pt0->err = 0;
pt0->_private = 0;

rd_kafka_topic_partition_list_t* partition_list = rd_kafka_topic_partition_list_new(1);

partition_list->elems = pt0;

rd_kafka_resp_err_t offsets_err = rd_kafka_offsets_for_times(rk, partition_list, 5000);

if (offsets_err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
    printf("ERROR: Failed to get offsets: %d: %s.\n", offsets_err,
        rd_kafka_err2str(offsets_err));
}
else
{
    printf("Successfully got offset.\n");
}

When I run it, I get this error:

ERROR: Failed to get offsets: -186: Local: Invalid argument or configuration.

How do I correct my code to be able to get the offset for the partition?


Solution

  • Need to use rd_kafka_topic_partition_list_add to add a topic partition to the topic partition list. For example:

    rd_kafka_topic_partition_list_t* partition_list = rd_kafka_topic_partition_list_new(1);
    
    rd_kafka_topic_partition_t* pt0 = rd_kafka_topic_partition_list_add(partition_list, topic, 0);
    pt0->offset = ~0; // set to max integer value
    
    rd_kafka_resp_err_t offsets_err = rd_kafka_offsets_for_times(rk, partition_list, 10000);
    if (offsets_err != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        printf("ERROR: Failed to get offsets: %d: %s.\n", offsets_err, rd_kafka_err2str(offsets_err));
    }
    else
    {
        printf("Successfully got high watermark offset %d.\n", pt0->offset);
    }