regex delphi apache-kafka

How to subscribe to Kafka topics using wildcards with DelphiKafkaClient (or librdkafka.dll)?


I'm using DelphiKafkaClient (https://github.com/norgepaul/DelphiKafkaClient), which is a Delphi wrapper for the Apache Kafka client library (librdkafka.dll).

When creating a new consumer and specifying a topic name to subscribe to (in TKafkaFactory.NewConsumer), literal topic names work fine, but wildcard or regex patterns do not seem to work (I tried ^*, *, #, ^.* and others).

Here's the relevant code from DelphiKafkaClient (see my comments):

procedure TKafkaConsumerThread.DoSetup;
var
  i: Integer;
  TopicList: prd_kafka_topic_partition_list_t;
  err: rd_kafka_resp_err_t;
begin
  FKafkaHandle := TKafkaHelper.NewConsumer(FConfiguration);

  if rd_kafka_brokers_add(FKafkaHandle, PAnsiChar(AnsiString(FBrokers))) = 0 then
  begin
    raise EKafkaError.Create(StrBrokersCouldNotBe);
  end;

  rd_kafka_poll_set_consumer(FKafkaHandle);

  TopicList := rd_kafka_topic_partition_list_new(0);

  for i := Low(FTopics) to High(FTopics) do
  begin
    rd_kafka_topic_partition_list_add(
      TopicList,
      PAnsiChar(AnsiString(FTopics[i])), // 'test' works, '^*' does not
      FPartitions[i]);
  end;

  err := rd_kafka_assign(FKafkaHandle, TopicList); // no errors returned, all fine

  // I have tried using rd_kafka_subscribe alongside or instead - makes no difference
  err := rd_kafka_subscribe(FKafkaHandle, TopicList); // no errors returned, all fine
end;

The rd_kafka_subscribe documentation says:

Wildcard (regex) topics are supported:
any topic name in the \p topics list that is prefixed with \c "^" will
be regex-matched to the full list of topics in the cluster and matching
topics will be added to the subscription list.

However, passing any of these patterns to rd_kafka_subscribe does not seem to work either (see the code above).

Hence my question - how should a string be formatted and passed to DelphiKafkaClient or librdkafka.dll to work as a valid regex for matching the topic subscriptions?


Solution

  • DelphiKafkaClient (https://github.com/norgepaul/DelphiKafkaClient) ships with a very old librdkafka.dll (version 0.9.2). Replacing the DLL with a newer version made regex subscriptions work properly.
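
For reference, here is a minimal sketch of what a regex subscription could look like once a newer DLL is in place. It reuses the calls from the question's code (rd_kafka_topic_partition_list_new, rd_kafka_topic_partition_list_add, rd_kafka_subscribe). SubscribeToPattern is a hypothetical helper, and the names prd_kafka_t, RD_KAFKA_PARTITION_UA, RD_KAFKA_RESP_ERR_NO_ERROR and rd_kafka_topic_partition_list_destroy are assumed to be exposed by the wrapper's binding units under their C API names; check them against your copy of the wrapper. Note that the pattern is a regular expression prefixed with "^", not a glob, so "all topics" would be written '^.*' rather than '*'.

```pascal
// Sketch only. Assumes the wrapper's binding units are in the uses clause and that
// they expose prd_kafka_t, RD_KAFKA_PARTITION_UA, RD_KAFKA_RESP_ERR_NO_ERROR and
// rd_kafka_topic_partition_list_destroy under their C API names (not verified here).
// SubscribeToPattern is a hypothetical helper, not part of DelphiKafkaClient.
procedure SubscribeToPattern(KafkaHandle: prd_kafka_t; const Pattern: AnsiString);
var
  TopicList: prd_kafka_topic_partition_list_t;
  err: rd_kafka_resp_err_t;
begin
  TopicList := rd_kafka_topic_partition_list_new(1);
  try
    // rd_kafka_subscribe only looks at the topic string, so the partition
    // is left unassigned.
    rd_kafka_topic_partition_list_add(
      TopicList,
      PAnsiChar(Pattern),
      RD_KAFKA_PARTITION_UA);

    // Regex patterns are resolved by rd_kafka_subscribe; rd_kafka_assign
    // expects literal topic/partition pairs and is not called here.
    err := rd_kafka_subscribe(KafkaHandle, TopicList);
    if err <> RD_KAFKA_RESP_ERR_NO_ERROR then
      raise EKafkaError.Create(
        'rd_kafka_subscribe failed for pattern ' + string(Pattern));
  finally
    // librdkafka keeps its own copy of the list, so it can be freed here.
    rd_kafka_topic_partition_list_destroy(TopicList);
  end;
end;
```

Under those assumptions, SubscribeToPattern(FKafkaHandle, '^test.*') would subscribe to every topic whose name starts with "test", and SubscribeToPattern(FKafkaHandle, '^.*') to all topics in the cluster.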