Install kafka-python package
Python client for the Apache Kafka distributed stream processing system.
pip install kafka-python
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError

# Placeholder broker address: replace with a real "host:port" before running.
BOOTSTRAP_SERVERS = ["ip_address:port"]

# Admin client used to create and delete topics.
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

# Consumer used only to look up which topics already exist on the cluster.
consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)

# 'topic3' is listed twice on purpose-or-by-accident; create_topics() ignores
# repeated names so the duplicate is harmless.
topic_names = ['topic1', 'topic2', 'topic3', 'topic3']


def create_topics(topic_names, num_partitions=3, replication_factor=3):
    """Create every topic in ``topic_names`` that does not exist yet.

    Args:
        topic_names: Iterable of topic name strings. Repeated names are
            processed only once.
        num_partitions: Partition count for each newly created topic.
        replication_factor: Replication factor for each newly created topic.
            NOTE(review): the cluster presumably needs at least this many
            brokers -- confirm against your deployment.

    Errors raised while creating the topics are printed, not propagated.
    """
    # Query the cluster once and reuse the result for both the printout and
    # the membership checks below.
    existing_topic_list = consumer.topics()
    print(list(existing_topic_list))

    topic_list = []
    # dict.fromkeys() drops repeated names while keeping the original order,
    # so the same topic is never queued twice in a single request.
    for topic in dict.fromkeys(topic_names):
        if topic not in existing_topic_list:
            print('Topic : {} added '.format(topic))
            topic_list.append(
                NewTopic(
                    name=topic,
                    num_partitions=num_partitions,
                    replication_factor=replication_factor,
                )
            )
        else:
            print('Topic : {} already exist '.format(topic))

    try:
        if topic_list:
            admin_client.create_topics(new_topics=topic_list, validate_only=False)
            print("Topic Created Successfully")
        else:
            print("Topic Exist")
    except TopicAlreadyExistsError:
        # A topic may have been created between the lookup above and this
        # request; report it instead of failing.
        print("Topic Already Exist")
    except Exception as e:
        print(e)


def delete_topics(topic_names):
    """Delete the topics named in ``topic_names``.

    Args:
        topic_names: List of topic name strings to delete.

    Errors raised while deleting the topics are printed, not propagated.
    """
    try:
        admin_client.delete_topics(topics=topic_names)
        print("Topic Deleted Successfully")
    except UnknownTopicOrPartitionError:
        print("Topic Doesn't Exist")
    except Exception as e:
        print(e)


create_topics(topic_names)
Top comments (3)
Nice snippet, but I'm not sure if you missed it: you didn't initialize consumer, and you didn't assign existing_topic_list to receive the topics from the consumer.
Sorry for missing that code snippet.
The code snippet has been modified.