python apache-kafka confluent-kafka-python

How to delete and then create topics correctly in a script for Kafka?


I am working on a script to refresh my topics on an AWS-managed Kafka cluster. I need to wipe the existing data every time the script runs, which I do by deleting the topics and creating them again. I expect every run to report a successful deletion followed by a successful creation, but instead either the deletion or the creation fails on alternate runs, which confuses me.

The following is my script:

# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
    kafka_cfg = '.....' # omitted  
    admin_client = AdminClient(kafka_cfg)

    deletion_ret = admin_client.delete_topics(['my-test-topic1'])
    for topic, delete_fut in deletion_ret.items():
        try:
            status = delete_fut.result()
            print(f'{topic} deletion is successful. status={status}')
        except KafkaException as e:
            print(f'could not delete topic: {topic}, error: {str(e)}')
            if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
                print('exiting...')
                sys.exit(1)
            else:
                print('ignoring UNKNOWN_TOPIC_OR_PART error')

    # I have two brokers for the Kafka instance I was given
    creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
    for topic, create_fut in creation_ret.items():
        try:
            status = create_fut.result()
            print(f'{topic} creation is successful. status={status}')
        except KafkaException as e:
            print(f'could not create topic: {topic}, error: {str(e)}')

Below is the log it generated. It doesn't matter how long I wait between runs. It seems to me that when a successful deletion is followed by a creation, the deletion takes time to take effect, so the creation that follows fails. By the time I run the script again, the previous deletion has completed, so the current deletion fails and the creation succeeds.

I'd really appreciate it if someone could help me understand and improve this script.

$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$

Solution

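  • Topic deletion is an asynchronous operation on the server side. The future returned by delete_topics() only captures the response to the request (the topic being marked for deletion), not the cluster actually finishing the deletion of all replicas. A create_topics() call issued right afterwards can therefore still find the topic and fail with TOPIC_ALREADY_EXISTS while the deletion completes in the background, which is why the next run has nothing to delete and its creation succeeds.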

    Related: Kafka - delete a topic completely
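
One way to act on this is to poll the cluster metadata after the deletion and only create the topic once it no longer shows up. The following is a minimal sketch, not a definitive fix: the helper name wait_until_topics_gone and its timeout_s / poll_interval_s parameters are made up for illustration, and it assumes the object passed in behaves like the AdminClient from the question, whose list_topics() returns metadata with a .topics dictionary keyed by topic name.

```python
import time


def wait_until_topics_gone(admin_client, topics, timeout_s=60.0, poll_interval_s=1.0):
    """Poll cluster metadata until none of `topics` is listed, or time out.

    Hypothetical helper: returns True if every topic disappeared from the
    metadata within `timeout_s` seconds, False otherwise.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        # list_topics() returns cluster metadata; .topics maps topic name -> metadata
        existing = admin_client.list_topics(timeout=10).topics
        if not any(name in existing for name in topics):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
```

In the script from the question, this would be called between the deletion loop and create_topics(), for example `wait_until_topics_gone(admin_client, ['my-test-topic1'])`, exiting if it returns False. Metadata answered by a single broker may lag behind the actual state of the cluster, so it is probably still worth retrying the creation a few times if it fails with TOPIC_ALREADY_EXISTS.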