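Kafka is a distributed stream-processing platform that organizes and manages data streams through topics. The key concepts and operations of Kafka topic management are as follows: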
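Creating a topic. Use the kafka-topics.sh script to create a topic. For example:

```shell
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
```

The same operation through the Java AdminClient API:

```java
// Imports: java.util.*, java.util.concurrent.ExecutionException, org.apache.kafka.clients.admin.*
Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // Topic name, partition count, replication factor.
    NewTopic newTopic = new NewTopic("my-topic", 3, (short) 1);
    CreateTopicsResult createTopicsResult =
            adminClient.createTopics(Collections.singletonList(newTopic));
    // Block until the brokers confirm the creation.
    createTopicsResult.all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```

Partitions. Set the number of partitions with the --partitions option, or specify it when creating the topic through the API (the second argument of NewTopic).

Replication factor. Set the replication factor with the --replication-factor option, or specify it when creating the topic through the API (the third argument of NewTopic).

Modifying a topic. Use the kafka-topics.sh script to modify a topic. The partition count can only be increased, and --alter does not accept --replication-factor; changing the replication factor of an existing topic requires a partition reassignment with kafka-reassign-partitions.sh. For example:

```shell
bin/kafka-topics.sh --alter --topic my-topic --bootstrap-server localhost:9092 --partitions 5
```

In the Java API, AdminClient has no alterTopics method; the partition count is increased with createPartitions:

```java
// Imports: java.util.*, java.util.concurrent.ExecutionException, org.apache.kafka.clients.admin.*
Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    // Raise the total partition count of my-topic to 5.
    CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(
            Collections.singletonMap("my-topic", NewPartitions.increaseTo(5)));
    createPartitionsResult.all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```

Deleting a topic. Use the kafka-topics.sh script to delete a topic. For example:

```shell
bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
```

In the Java API, deleteTopics takes a collection of topic names:

```java
// Imports: java.util.*, java.util.concurrent.ExecutionException, org.apache.kafka.clients.admin.*
Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    DeleteTopicsResult deleteTopicsResult =
            adminClient.deleteTopics(Collections.singletonList("my-topic"));
    deleteTopicsResult.all().get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```

Viewing topic information. Use the kafka-topics.sh script to describe a topic. For example:

```shell
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
```

The same information through the Java API:

```java
// Imports: java.util.*, java.util.concurrent.ExecutionException, org.apache.kafka.clients.admin.*
Properties adminClientProps = new Properties();
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(adminClientProps)) {
    DescribeTopicsResult describeTopicsResult =
            adminClient.describeTopics(Collections.singletonList("my-topic"));
    // allTopicNames() requires kafka-clients 3.1 or later; older clients expose all() instead.
    describeTopicsResult.allTopicNames().get().forEach((topicName, topicDescription) -> {
        System.out.println("Topic: " + topicName);
        System.out.println("Partitions: " + topicDescription.partitions().size());
        // TopicDescription has no replication-factor accessor, so count the
        // replicas assigned to the first partition.
        System.out.println("Replication Factor: "
                + topicDescription.partitions().get(0).replicas().size());
    });
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```

With the operations above, you can manage Kafka topics effectively: create, modify, delete, and inspect them.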
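Several of these operations are rejected by the broker when their arguments break Kafka's rules: topic names may contain only ASCII letters, digits, `.`, `_` and `-` (at most 249 characters, and not `.` or `..`), a partition count can only grow, and a replication factor cannot exceed the number of available brokers. A minimal sketch of checking these locally before calling the CLI or AdminClient might look like the following. The class `TopicPreflight` and its method names are hypothetical helpers written for this illustration, not part of the Kafka API, and the broker count is assumed to be known to the caller.

```java
import java.util.regex.Pattern;

/** Hypothetical pre-flight checks; not part of the Kafka client library. */
final class TopicPreflight {

    // Kafka accepts ASCII letters, digits, '.', '_' and '-' in topic names.
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");
    // Kafka caps topic names at 249 characters.
    private static final int MAX_NAME_LENGTH = 249;

    private TopicPreflight() {}

    /** Returns true if the name satisfies Kafka's topic naming rules. */
    static boolean isValidTopicName(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_NAME_LENGTH) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL_NAME.matcher(name).matches();
    }

    /** Partition counts can only be increased, never reduced or left equal. */
    static boolean canChangePartitions(int current, int requested) {
        return current > 0 && requested > current;
    }

    /** Each replica lives on a different broker, so the factor is bounded by the broker count. */
    static boolean canPlaceReplicas(int replicationFactor, int availableBrokers) {
        return replicationFactor >= 1 && replicationFactor <= availableBrokers;
    }

    public static void main(String[] args) {
        System.out.println(isValidTopicName("my-topic"));   // true
        System.out.println(canChangePartitions(3, 5));      // true
        System.out.println(canChangePartitions(5, 3));      // false
        System.out.println(canPlaceReplicas(2, 1));         // false
    }
}
```

Checks like these only catch obvious mistakes early; the broker remains the authority, so the `ExecutionException` handling in the AdminClient calls is still needed.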