I'm trying to get the offset (high watermark offset) for a partition using rd_kafka_offsets_for_times().
Here is a code snippet:
rd_kafka_topic_partition_t* pt0 = rd_kafka_topic_partition_new(topic, 0);
pt0->offset = 0;
pt0->metadata = 0;
pt0->metadata_size = 0;
pt0->opaque = 0;
pt0->err = 0;
pt0->_private = 0;
rd_kafka_topic_partition_list_t* partition_list = rd_kafka_topic_partition_list_new(1);
partition_list->elems = pt0;
rd_kafka_resp_err_t offsets_err = rd_kafka_offsets_for_times(rk, partition_list, 5000);
if (offsets_err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
printf("ERROR: Failed to get offsets: %d: %s.\n", offsets_err,
rd_kafka_err2str(offsets_err));
}
else
{
printf("Successfully got offset.\n");
}
When I run it, I get this error:
ERROR: Failed to get offsets: -186: Local: Invalid argument or configuration.
How do I correct my code to be able to get the offset for the partition?
Need to use rd_kafka_topic_partition_list_add to add a topic partition to the topic partition list. For example:
rd_kafka_topic_partition_list_t* partition_list = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_t* pt0 = rd_kafka_topic_partition_list_add(partition_list, topic, 0);
pt0->offset = ~0; // set to max integer value
rd_kafka_resp_err_t offsets_err = rd_kafka_offsets_for_times(rk, partition_list, 10000);
if (offsets_err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
printf("ERROR: Failed to get offsets: %d: %s.\n", offsets_err, rd_kafka_err2str(offsets_err));
}
else
{
printf("Successfully got high watermark offset %d.\n", pt0->offset);
}