Issue
I want to create a Kafka topic if it does not already exist. I know how to create a topic via the bash, but I don't know how to check whether it exists.
topic_exists = ??????
if not topic_exists:
subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--create',
'--zookeeper', '{}:2181'.format(KAFKAHOST),
'--topic', str(self.topic),
'--partitions', str(self.partitions),
'--replication-factor', str(self.replication_factor)])
Solution
You can use the --list (List all available topics)
option for kafka-topics.sh
and see if self.topic
exists in the topics
array, as shown below.
Depending on the number of topics you have this approach might be a bit heavy. If this is the case, you might be able to get away with using --describe (List details for the given topics)
which will likely return empty if the topic doesn't exist. I haven't thoroughly tested this, so I can't say for sure how solid this solution (--describe
) is, but it might be worth it for you to investigate a bit further.
wanted_topics = ['host_updates_queue', 'foo_bar']
topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--list',
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
for wanted in wanted_topics:
if wanted in topics:
print '\'{}\' topic exists!'.format(wanted)
else:
print '\'{}\' topic does NOT exist!'.format(wanted)
topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
'--describe',
'--topic', wanted,
'--zookeeper', '{}:2181'.format(KAFKAHOST)])
if not topic_desc:
print 'No description found for the topic \'{}\''.format(wanted)
OUTPUT:
root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'
There is also a Broker Configuration available so you don't have to take any of these steps:
auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.
I would take this approach if possible.
Note that you should set topic configs (server.properties
) on your broker for num.partitions
and default.replication.factor
to match your settings in your code snippet.
Answered By - chrsblck