Kafka Command-Line Interface (CLI) Tools

Apache Kafka® is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.

Kafka provides a suite of command-line interface (CLI) tools that can be accessed from the /bin directory after downloading Kafka and extracting the files. These tools offer a range of capabilities, including starting and stopping Kafka, managing topics, and handling partitions. To learn how to use each tool, simply run it with no argument or use the --help argument for detailed instructions.

Use Kafka CLI Tools with Confluent Cloud or Confluent Platform

To use the Kafka tools with Confluent Cloud, see How to Use Kafka Tools With Confluent Cloud.

You can use all of the Kafka CLI tools with Confluent Platform. They are installed under the $CONFLUENT_HOME/bin folder when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extensions, so you do not need to use the extensions when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored under the $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.

The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.

Manage Kafka and configure metadata

This section contains tools to start Kafka running in KRaft mode and to manage brokers.

kafka-server-start.sh

Use the kafka-server-start tool to start a Kafka server. You must pass the path to the properties file you want to use. To start Kafka in KRaft mode, first generate a cluster ID and format the storage directories with it, as described under kafka-storage.sh.
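
As a minimal sketch of the invocation described above, the following shell function starts a server in the background and overrides one property. The KAFKA_HOME variable, its default path, and the function name start_kafka_daemon are assumptions made for illustration; only the -daemon flag, the properties-file argument, and --override come from the tool's usage text.

```shell
# Assumption: KAFKA_HOME points at the extracted Kafka directory (hypothetical default).
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"

# start_kafka_daemon PROPERTIES_FILE PROPERTY=VALUE
# Starts the server in the background with one property overridden.
start_kafka_daemon() {
  if [ "$#" -ne 2 ]; then
    echo "usage: start_kafka_daemon PROPERTIES_FILE PROPERTY=VALUE" >&2
    return 2
  fi
  "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$1" --override "$2"
}

# Example call (not run here):
#   start_kafka_daemon config/server.properties log.retention.hours=48
```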

Usage details
USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override property=value]*

Option                Description
------                -----------
--override <String>   Optional property that should override values set in
                        server.properties file
--version             Print version information and exit.

kafka-server-stop.sh

Use the kafka-server-stop tool to stop the running Kafka server. You do not need to pass any arguments, but starting with Kafka 3.7 you can optionally specify either a process-role value of broker or controller, or a node-id value indicating the node you want to stop.

For example, to stop all brokers, you would use the following command:

./bin/kafka-server-stop.sh --process-role=broker 

To stop node 1234, you would use the following command.

./bin/kafka-server-stop.sh --node-id=1234 
Usage details
USAGE: ./kafka-server-stop.sh {[--process-role=value] | [--node-id=value]} 

kafka-storage.sh

Use the kafka-storage tool to generate a Cluster UUID and format storage with the generated UUID when running Kafka in KRaft mode. You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.

For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
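
The two steps above can be wrapped in a function that stops if either step fails and prints the generated ID so that it can be reused when formatting other nodes. This is a sketch under stated assumptions: KAFKA_HOME, its default path, and the function name format_kraft_storage are hypothetical, and only the random-uuid and format subcommands with the -t and -c options are taken from the usage text.

```shell
# Assumption: KAFKA_HOME points at the extracted Kafka directory (hypothetical default).
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"

# format_kraft_storage CONFIG_FILE
# Generates a cluster ID, formats the log directories named in CONFIG_FILE
# with that ID, and prints the ID on standard output.
format_kraft_storage() {
  config_file="$1"
  cluster_id="$("$KAFKA_HOME/bin/kafka-storage.sh" random-uuid)" || return 1
  [ -n "$cluster_id" ] || return 1
  # Send the format command's own output to stderr so stdout carries only the ID.
  "$KAFKA_HOME/bin/kafka-storage.sh" format -t "$cluster_id" -c "$config_file" >&2 || return 1
  printf '%s\n' "$cluster_id"
}

# Example call (not run here):
#   KAFKA_CLUSTER_ID="$(format_kraft_storage config/kraft/server.properties)"
```
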
Usage details
USAGE: kafka-storage.sh [-h] {info,format,version-mapping,feature-dependencies,random-uuid} ...

The Kafka storage tool.

positional arguments:
  {info,format,version-mapping,feature-dependencies,random-uuid}
    info                  Get information about the Kafka log directories on this node.
    format                Format the Kafka log directories on this node.
    version-mapping       Look up the corresponding features for a given metadata version. Using the command with no
                          --release-version argument will return the mapping for the latest stable metadata version.
    feature-dependencies  Look up dependencies for a given feature version. If the feature is not known or the version
                          not yet defined, an error is thrown. Multiple features can be specified.
    random-uuid           Print a random UUID.

optional arguments:
  --config CONFIG, -c CONFIG
                          The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                          The cluster ID to use.
  --add-scram ADD_SCRAM, -S ADD_SCRAM
                          A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
                          'SCRAM-SHA-256=[name=alice,password=alice-secret]'
                          'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'
  --ignore-formatted, -g
                          When this option is passed, the format command will skip over already formatted
                          directories rather than failing.
  --release-version RELEASE_VERSION, -r RELEASE_VERSION
                          The release version to use for the initial feature settings. The minimum is 3.3-IV3; the
                          default is 4.0-IV3
  --feature FEATURE, -f FEATURE
                          The setting to use for a specific feature, in feature=level format. For example: kraft.version=1.
  --standalone, -s        Used to initialize a controller as a single-node dynamic quorum.
  --no-initial-controllers, -N
                          Used to initialize a server without a dynamic quorum topology.
  --initial-controllers INITIAL_CONTROLLERS, -I INITIAL_CONTROLLERS
                          Used to initialize a server with a specific dynamic quorum topology. The argument is a
                          comma-separated list of id@hostname:port:directory. The same values must be used to
                          format all nodes. For example:
                          0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA
  -h, --help              show this help message and exit

kafka-cluster.sh

Use the kafka-cluster tool to get the ID of a cluster, unregister a broker, or list endpoints. The following example shows how to retrieve the cluster ID, which requires a bootstrap-server argument.

bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 

The output for this command might look like the following.

Cluster ID: WZEKwK-b123oT3ZOSU0dgw 
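
When a script needs only the ID, the label can be stripped from that output. The following sketch assumes the output keeps the "Cluster ID: <id>" shape shown above; the function name extract_cluster_id is hypothetical.

```shell
# extract_cluster_id
# Reads kafka-cluster.sh cluster-id output on stdin and prints only the ID.
# Lines that do not start with "Cluster ID:" are ignored.
extract_cluster_id() {
  sed -n 's/^Cluster ID:[[:space:]]*\([^[:space:]]*\).*$/\1/p'
}

# Example call (not run here):
#   bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 | extract_cluster_id
```
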
Usage details
USAGE: kafka-cluster.sh [-h] {cluster-id,unregister,list-endpoints} ...

The Kafka cluster tool.

positional arguments:
  {cluster-id,unregister,list-endpoints}
    cluster-id      Get information about the ID of a cluster.
    unregister      Unregister a broker.
    list-endpoints  List endpoints

optional arguments:
  -h, --help        show this help message and exit

kafka-features.sh

Use the kafka-features tool to manage feature flags to disable or enable functionality at runtime in Kafka. Pass the describe argument to describe the current active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.
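
As a sketch of the describe case, the function below chooses between the two mutually exclusive connection options listed in the usage text and then runs describe. KAFKA_HOME, its default path, and the function name describe_features are assumptions for illustration.

```shell
# Assumption: KAFKA_HOME points at the extracted Kafka directory (hypothetical default).
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"

# describe_features server|controller HOST:PORT
# Describes the active feature flags, connecting either to a broker
# (--bootstrap-server) or to a KRaft controller (--bootstrap-controller).
describe_features() {
  case "$1" in
    server)     opt="--bootstrap-server" ;;
    controller) opt="--bootstrap-controller" ;;
    *) echo "first argument must be 'server' or 'controller'" >&2; return 2 ;;
  esac
  "$KAFKA_HOME/bin/kafka-features.sh" "$opt" "$2" describe
}

# Example call (not run here):
#   describe_features server localhost:9092
```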

Usage details
usage: kafka-features [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
                      --bootstrap-controller BOOTSTRAP_CONTROLLER)
                      {describe,upgrade,downgrade,disable,version-mapping,feature-dependencies} ...

This tool manages feature flags in Kafka.

positional arguments:
  {describe,upgrade,downgrade,disable,version-mapping,feature-dependencies}
    describe              Describes the current active feature flags.
    upgrade               Upgrade one or more feature flags.
    downgrade             Upgrade one or more feature flags.
    disable               Disable one or more feature flags. This is the same as downgrading the version to zero.
    version-mapping       Look up the corresponding features for a given metadata version. Using the command with no
                          --release-version argument will return the mapping for the latest stable metadata version
    feature-dependencies  Look up dependencies for a given feature version. If the feature is not known or the version
                          not yet defined, an error is thrown. Multiple features can be specified.

optional arguments:
  -h, --help              show this help message and exit
  --bootstrap-server BOOTSTRAP_SERVER
                          A comma-separated list of host:port pairs to use for establishing the connection to the
                          Kafka cluster.
  --bootstrap-controller BOOTSTRAP_CONTROLLER
                          A comma-separated list of host:port pairs to use for establishing the connection to the
                          KRaft quorum.
  --command-config COMMAND_CONFIG
                          Property file containing configs to be passed to Admin Client.

kafka-broker-api-versions.sh

The kafka-broker-api-versions tool retrieves and displays broker information. For example, the following command outputs the version of Kafka that is running on the broker:

bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version 

This command might have the following output:

3.3.1 (Commit:e23c59d00e687ff5) 
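
A script can use that output to decide whether version-dependent options are available, such as the kafka-server-stop options that this page says require Kafka 3.7. The sketch below assumes the output keeps the "<version> (Commit:<hash>)" shape shown above; both function names are hypothetical.

```shell
# parse_kafka_version
# Reads a line such as "3.3.1 (Commit:e23c59d00e687ff5)" on stdin
# and prints the version number, here "3.3.1".
parse_kafka_version() {
  awk '{ print $1; exit }'
}

# version_at_least VERSION MAJOR MINOR
# Succeeds when VERSION (major.minor[.patch]) is MAJOR.MINOR or newer.
version_at_least() {
  echo "$1" | awk -F. -v maj="$2" -v min="$3" \
    '{ exit !(($1 + 0 > maj + 0) || ($1 + 0 == maj + 0 && $2 + 0 >= min + 0)) }'
}

# Example call (not run here):
#   v="$(bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version | parse_kafka_version)"
#   version_at_least "$v" 3 7 && echo "node-id and process-role options are available"
```
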
Usage details
This tool helps to retrieve broker version information.

Option                                   Description
------                                   -----------
--bootstrap-server <String: server(s)    REQUIRED: The server to connect to.
  to use for bootstrapping>
--command-config <String: command        A property file containing configs to
  config property file>                    be passed to Admin Client.
--help                                   Print usage information.
--version                                Display Kafka version.

kafka-metadata-quorum.sh

Use the kafka-metadata-quorum tool to query the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode. Pass the describe command to describe the current state of the metadata quorum.

The following code example displays a summary of the metadata quorum:

bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status 

The output for this command might look like the following.

ClusterId:              fMCL8kv1SWm87L_Md-I2hg
LeaderId:               3002
LeaderEpoch:            2
HighWatermark:          10
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [3000,3001,3002]
CurrentObservers:       [0,1,2]
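
For monitoring or debugging scripts, individual fields can be read from that summary. The following sketch assumes each field is printed on its own line as "Name: value", as in the sample output; the function names quorum_field and quorum_followers_caught_up are hypothetical.

```shell
# quorum_field NAME
# Reads describe --status output on stdin and prints the value of field NAME.
quorum_field() {
  awk -v key="$1:" '$1 == key { print $2; exit }'
}

# quorum_followers_caught_up
# Reads describe --status output on stdin; succeeds when MaxFollowerLag is 0.
quorum_followers_caught_up() {
  lag="$(quorum_field MaxFollowerLag)"
  [ "$lag" = "0" ]
}

# Example call (not run here):
#   bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status | quorum_field LeaderId
```
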
Usage details
usage: kafka-metadata-quorum [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
                             --bootstrap-controller BOOTSTRAP_CONTROLLER)
                             {describe,add-controller,remove-controller} ...

This tool describes kraft metadata quorum status.

positional arguments:
  {describe,add-controller,remove-controller}
    describe              Describe the metadata quorum info
    add-controller        Add a controller to the KRaft controller cluster
    remove-controller     Remove a controller from the KRaft controller cluster

optional arguments:
  -h, --help              show this help message and exit
  --bootstrap-server BOOTSTRAP_SERVER
                          A comma-separated list of host:port pairs to use for establishing the connection to the
                          Kafka cluster.
  --bootstrap-controller BOOTSTRAP_CONTROLLER
                          A comma-separated list of host:port pairs to use for establishing the connection to the
                          Kafka controllers.
  --command-config COMMAND_CONFIG
                          Property file containing configs to be passed to Admin Client.

kafka-metadata-shell.sh

The kafka-metadata-shell tool enables you to interactively examine the metadata stored in a KRaft cluster.

To open the metadata shell, provide the path to your cluster’s metadata log directory using the --directory flag:

kafka-metadata-shell.sh --directory /tmp/kraft-combined-logs/__cluster_metadata-0/

To ensure kafka-metadata-shell.sh functions correctly, the --directory flag must point to your Kafka cluster’s metadata log directory. While examples often show /tmp/kraft-combined-logs/__cluster_metadata-0/, this path varies based on your Kafka configuration.

If your metadata files are not in their default or expected location, you have two options:

  • Specify the full, correct path to your metadata log directory.

  • Copy the metadata log directory to a temporary location (for example, /tmp), and then specify that temporary path.
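
The second option can be sketched as a small helper that copies the directory into a fresh temporary location and prints the path to pass to --directory. The function name copy_metadata_dir is hypothetical, and the sketch assumes the mktemp command is available on your system.

```shell
# copy_metadata_dir SOURCE_DIR
# Copies SOURCE_DIR into a new temporary directory and prints the copy's path.
copy_metadata_dir() {
  src="${1%/}"
  [ -d "$src" ] || { echo "not a directory: $1" >&2; return 1; }
  dest="$(mktemp -d)" || return 1
  cp -R "$src" "$dest/" || return 1
  printf '%s/%s\n' "$dest" "$(basename "$src")"
}

# Example call (not run here; the source path is a placeholder):
#   kafka-metadata-shell.sh --directory "$(copy_metadata_dir /path/to/metadata-log-dir)"
```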

After the shell loads, you can explore the contents of the metadata log, and exit. The following code shows an example of this.

Loading...
[ Kafka Metadata Shell ]
>> ls
image local
>> ls image
acls cells clientQuotas cluster clusterLinks configs delegationToken encryptor features producerIds provenance replicaExclusions scram tenants topics
>> ls image/topics/
byID byLinkId byName byTenant
>> ls image/topics/byName
test perf_test_124 othersourcetopic
>> cat /image/topics/byName/othersourcetopic/0/
{
  "replicas": [1],
  "observers": null,
  "directories": ["U9TE_0zN-oReJ5XwsShc-w"],
  "isr": [1],
  "removingReplicas": null,
  "addingReplicas": null,
  "removingObservers": null,
  "addingObservers": null,
  "elr": null,
  "lastKnownElr": null,
  "leader": -1,
  "leaderRecoveryState": "RECOVERED",
  "leaderEpoch": 57,
  "partitionEpoch": 9,
  "linkedLeaderEpoch": 54,
  "linkState": "ACTIVE"
}
>> exit

For more information, see the Kafka Wiki.

Usage details
usage: metadata-shell-tool [-h] [--snapshot SNAPSHOT] [command [command ...]]

The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --snapshot SNAPSHOT, -s SNAPSHOT
                         The snapshot file to read.

kafka-configs.sh

Use the kafka-configs tool to change and describe configuration settings for topics, clients, users, brokers, IP addresses, and KRaft controllers. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.

To change a property, set --entity-type to the desired entity (topics, brokers, users, and so on), and use the --alter option. The following example shows how you might set the delete.retention.ms configuration property for a topic with kafka-configs.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-name test-topic --alter --add-config delete.retention.ms=172800000
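
The value 172800000 in that command is two days expressed in milliseconds (2 × 24 × 60 × 60 × 1000). As a small sketch, a helper can compute such values instead of hard-coding them; the function name days_to_ms is hypothetical.

```shell
# days_to_ms DAYS
# Prints the number of milliseconds in DAYS whole days.
days_to_ms() {
  echo "$(( $1 * 24 * 60 * 60 * 1000 ))"
}

# Example use inside a kafka-configs call (not run here):
#   --add-config "delete.retention.ms=$(days_to_ms 2)"
```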

When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:

bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config 'max.connections.per.ip.overrides=[host1:50,host2:9]' --entity-type brokers --entity-default
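
Because square brackets are also shell glob characters, it is safer to quote the bracketed value. The following sketch builds such a value from separate arguments so that the result can be passed as one quoted string; the function name bracketed_config is hypothetical.

```shell
# bracketed_config KEY VALUE...
# Prints KEY=[VALUE1,VALUE2,...], the grouped form that --add-config accepts
# for values that contain commas.
bracketed_config() {
  key="$1"
  shift
  old_ifs="$IFS"
  IFS=','
  joined="$*"      # "$*" joins the arguments with the first character of IFS
  IFS="$old_ifs"
  printf '%s=[%s]\n' "$key" "$joined"
}

# Example use inside a kafka-configs call (not run here):
#   --add-config "$(bracketed_config max.connections.per.ip.overrides host1:50 host2:9)"
```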

The following example shows how you might check the cluster ID by specifying the --bootstrap-controller option.

bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092

See Kafka Topic Operations for more examples of how to work with topics.

Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics

--add-config <String>
    Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations:
    For entity-type 'topics':
        cleanup.policy
        compression.gzip.level
        compression.lz4.level
        compression.type
        compression.zstd.level
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        local.retention.bytes
        local.retention.ms
        max.compaction.lag.ms
        max.message.bytes
        message.timestamp.after.max.ms
        message.timestamp.before.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        remote.log.copy.disable
        remote.log.delete.on.disable
        remote.storage.enable
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
    For entity-type 'brokers':
        background.threads
        compression.gzip.level
        compression.lz4.level
        compression.type
        compression.zstd.level
        follower.replication.throttled.rate
        leader.replication.throttled.rate
        listener.security.protocol.map
        listeners
        log.cleaner.backoff.ms
        log.cleaner.dedupe.buffer.size
        log.cleaner.delete.retention.ms
        log.cleaner.io.buffer.load.factor
        log.cleaner.io.buffer.size
        log.cleaner.io.max.bytes.per.second
        log.cleaner.max.compaction.lag.ms
        log.cleaner.min.cleanable.ratio
        log.cleaner.min.compaction.lag.ms
        log.cleaner.threads
        log.cleanup.policy
        log.flush.interval.messages
        log.flush.interval.ms
        log.index.interval.bytes
        log.index.size.max.bytes
        log.local.retention.bytes
        log.local.retention.ms
        log.message.timestamp.after.max.ms
        log.message.timestamp.before.max.ms
        log.message.timestamp.type
        log.preallocate
        log.retention.bytes
        log.retention.ms
        log.roll.jitter.ms
        log.roll.ms
        log.segment.bytes
        log.segment.delete.delay.ms
        max.connection.creation.rate
        max.connections
        max.connections.per.ip
        max.connections.per.ip.overrides
        message.max.bytes
        metric.reporters
        min.insync.replicas
        num.io.threads
        num.network.threads
        num.recovery.threads.per.data.dir
        num.replica.fetchers
        principal.builder.class
        producer.id.expiration.ms
        remote.fetch.max.wait.ms
        remote.list.offsets.request.timeout.ms
        remote.log.index.file.cache.total.size.bytes
        remote.log.manager.copier.thread.pool.size
        remote.log.manager.copy.max.bytes.per.second
        remote.log.manager.expiration.thread.pool.size
        remote.log.manager.fetch.max.bytes.per.second
        remote.log.reader.threads
        replica.alter.log.dirs.io.max.bytes.per.second
        sasl.enabled.mechanisms
        sasl.jaas.config
        sasl.kerberos.kinit.cmd
        sasl.kerberos.min.time.before.relogin
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.service.name
        sasl.kerberos.ticket.renew.jitter
        sasl.kerberos.ticket.renew.window.factor
        sasl.login.refresh.buffer.seconds
        sasl.login.refresh.min.period.seconds
        sasl.login.refresh.window.factor
        sasl.login.refresh.window.jitter
        sasl.mechanism.inter.broker.protocol
        ssl.cipher.suites
        ssl.client.auth
        ssl.enabled.protocols
        ssl.endpoint.identification.algorithm
        ssl.engine.factory.class
        ssl.key.password
        ssl.keymanager.algorithm
        ssl.keystore.certificate.chain
        ssl.keystore.key
        ssl.keystore.location
        ssl.keystore.password
        ssl.keystore.type
        ssl.protocol
        ssl.provider
        ssl.secure.random.implementation
        ssl.trustmanager.algorithm
        ssl.truststore.certificates
        ssl.truststore.location
        ssl.truststore.password
        ssl.truststore.type
        transaction.partition.verification.enable
        unclean.leader.election.enable
    For entity-type 'users':
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        controller_mutation_rate
        producer_byte_rate
        request_percentage
    For entity-type 'clients':
        consumer_byte_rate
        controller_mutation_rate
        producer_byte_rate
        request_percentage
    For entity-type 'ips':
        connection_creation_rate
    For entity-type 'client-metrics':
        interval.ms
        match
        metrics
    For entity-type 'groups':
        consumer.heartbeat.interval.ms
        consumer.session.timeout.ms
        share.auto.offset.reset
        share.heartbeat.interval.ms
        share.record.lock.duration.ms
        share.session.timeout.ms
    Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user.
--add-config-file <String>
    Path to a properties file with configs to add. See add-config for a list of valid configurations.
--all
    List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers)
--alter
    Alter the configuration for the entity.
--bootstrap-controller <String: controller to connect to>
    The Kafka controllers to connect to.
--bootstrap-server <String: server to connect to>
    The Kafka servers to connect to.
--broker <String>
    The broker's ID.
--broker-defaults
    The config defaults for all brokers.
--broker-logger <String>
    The broker's ID for its logger config.
--client <String>
    The client's ID.
--client-defaults
    The config defaults for all clients.
--client-metrics <String>
    The client metrics config resource name.
--command-config <String: command config property file>
    Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
--delete-config <String>
    config keys to remove 'k1,k2'
--describe
    List configs for the given entity.
--entity-default
    Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line)
--entity-name <String>
    Name of entity (topic name/client id/user principal name/broker id/ip/client metrics)
--entity-type <String>
    Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics)
--force
    Suppress console prompts
--group <String>
    The group ID.
--help
    Print usage information.
--ip <String>
    The IP address.
--ip-defaults
    The config defaults for all IPs.
--topic <String>
    The topic's name.
--user <String>
    The user's principal name.
--user-defaults
    The config defaults for all users.
--version
    Display Kafka version.

Manage topics, partitions, and replication

kafka-topics.sh

Use the kafka-topics tool to create or delete a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster. For more information, see Kafka Topic Operations.

To change a topic, see kafka-configs.sh, or how to modify a topic.

The following example creates a topic named test-topic with three partitions:

bin/kafka-topics.sh --bootstrap-server host1:9092 --create --topic test-topic --partitions 3
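
For scripts that may run more than once, the --if-not-exists option listed in the usage details makes topic creation safe to repeat. The following is a minimal sketch, not a definitive wrapper: KAFKA_HOME, its default path, and the function name create_topic_if_missing are assumptions for illustration.

```shell
# Assumption: KAFKA_HOME points at the extracted Kafka directory (hypothetical default).
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"

# create_topic_if_missing BOOTSTRAP_SERVER TOPIC PARTITIONS
# Creates TOPIC with PARTITIONS partitions unless it already exists.
create_topic_if_missing() {
  "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server "$1" \
    --create --if-not-exists --topic "$2" --partitions "$3"
}

# Example call (not run here):
#   create_topic_if_missing host1:9092 test-topic 3
```
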
Usage details
This tool helps to create, delete, describe, or change a topic.

--alter
    Alter the number of partitions and replica assignment. (To alter topic configurations, the kafka-configs tool can be used.)
--at-min-isr-partitions
    if set when describing topics, only show partitions whose isr count is equal to the configured minimum.
--bootstrap-server <String: server to connect to>
    REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file>
    Property file containing configs to be passed to the Admin Client.
--config <String: name=value>
    A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
        cleanup.policy
        compression.gzip.level
        compression.lz4.level
        compression.type
        compression.zstd.level
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        local.retention.bytes
        local.retention.ms
        max.compaction.lag.ms
        max.message.bytes
        message.timestamp.after.max.ms
        message.timestamp.before.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        remote.log.copy.disable
        remote.log.delete.on.disable
        remote.storage.enable
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
    See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create. (To alter topic configurations, the kafka-configs tool can be used.)
--create
    Create a new topic.
--delete
    Delete a topic
--delete-config <String: name>
    This option is no longer supported and has been deprecated since 4.0.
--describe
    List details for the given topics.
--exclude-internal
    Exclude internal topics when running list or describe command. The internal topics will be listed by default.
--help
    Print usage information.
--if-exists
    if set when altering or deleting or describing topics, the action will only execute if the topic exists.
--if-not-exists
    If set when creating topics, the action will only execute if the topic does not already exist.
--list
    List all available topics.
--partition-size-limit-per-response <Integer: maximum number of partitions per response>
    The maximum partition size to be included in one DescribeTopicPartitions response.
--partitions <Integer: # of partitions>
    The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>
    A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
    The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
--topic <String: topic>
    The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
--topic-id <String: topic-id>
    The topic-id to describe. This is used only with --bootstrap-server option for describing topics.
--topics-with-overrides
    If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions
    If set when describing topics, only show partitions whose leader is not available.
--under-min-isr-partitions
    If set when describing topics, only show partitions whose isr count is less than the configured minimum.
--under-replicated-partitions
    If set when describing topics, only show under replicated partitions
--version
    Display Kafka version.

kafka-configs.sh

The kafka-configs tool also changes and describes topic configuration. Its description, examples, and usage details appear under kafka-configs.sh in the Manage Kafka and configure metadata section.

kafka-get-offsets.sh

Use the kafka-get-offsets tool to retrieve topic-partition offsets.
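
As a rough sketch of typical invocations, the following commands query the earliest offsets for one topic and the latest offsets for a partition range. The broker address host1:9092, the topic names foo and bar, the KAFKA_BIN location, and the run helper are placeholder assumptions. The flags are taken from the usage details for this tool.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# Earliest offsets for every partition of the hypothetical topic "foo".
run "$KAFKA_BIN/kafka-get-offsets.sh" --bootstrap-server "$BOOTSTRAP" \
  --topic foo --time earliest

# Latest offsets (the default) for partitions 0, 1, and 2 of "foo" and partition 1 of "bar".
# Ranges are lower inclusive and upper exclusive.
run "$KAFKA_BIN/kafka-get-offsets.sh" --bootstrap-server "$BOOTSTRAP" \
  --topic-partitions 'foo:0-3,bar:1'
```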

Usage details
An interactive shell for getting topic-partition offsets.
Option  Description
------  -----------
--bootstrap-server <String: HOST1:PORT1,...,HOST3:PORT3>  REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: HOST1:PORT1,...,HOST3:PORT3>  DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.
--command-config <String: config file>  Property file containing configs to be passed to Admin Client.
--exclude-internal-topics  By default, internal topics are included. If specified, internal topics are excluded.
--help  Print usage information.
--partitions <String: partition ids>  Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.
--time <String: <timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp>  timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.] (default: latest)
--topic <String: topic>  The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.
--topic-partitions <String: topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3>  Comma separated list of topic-partition patterns to get the offsets for, with the format of: ([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?. The first group is an optional regex for the topic name, if omitted, it matches any topic name. The section after ':' describes a partition pattern, which can be: a number, a range in the format of NUMBER-NUMBER (lower inclusive, upper exclusive), an inclusive lower bound in the format of NUMBER-, an exclusive upper bound in the format of -NUMBER or may be omitted to accept all partitions.
--version  Display Kafka version.

kafka-leader-election.sh

Use the kafka-leader-election tool to attempt to elect a new leader for a set of topic partitions.

Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
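
A minimal sketch of a preferred leader election for selected partitions follows. The JSON layout is the example format from the usage details. The file name election.json, the broker address, the KAFKA_BIN location, and the run helper are assumptions made for illustration.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# List the partitions that should get a new leader (placeholder topics).
cat > election.json <<'EOF'
{"partitions":
  [{"topic": "foo", "partition": 1},
   {"topic": "foobar", "partition": 2}]
}
EOF

# Attempt a preferred leader election for those partitions only.
run "$KAFKA_BIN/kafka-leader-election.sh" --bootstrap-server "$BOOTSTRAP" \
  --election-type preferred --path-to-json-file election.json
```

To cover every eligible partition instead, the usage details describe the --all-topic-partitions flag, which cannot be combined with --path-to-json-file or --topic.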

Usage details
This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas.
Option  Description
------  -----------
--admin.config <String: config file>  Configuration properties files to pass to the admin client
--all-topic-partitions  Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.
--bootstrap-server <String: host:port>  A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.
--election-type <[PREFERRED,UNCLEAN]: election type>  Type of election to attempt. Possible values are "preferred" for preferred leader election or "unclean" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.
--help  Print usage information.
--partition <Integer: partition id>  Partition id for which to perform an election. REQUIRED if --topic is specified.
--path-to-json-file <String: Path to JSON file>  The JSON file with the list of partition for which leader elections should be performed. This is an example format. {"partitions": [{"topic": "foo", "partition": 1}, {"topic": "foobar", "partition": 2}] } Not allowed if --all-topic-partitions or --topic flags are specified.
--topic <String: topic name>  Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.
--version  Display Kafka version.

kafka-transactions.sh

Use the kafka-transactions tool to list and describe transactions, and to detect and abort hanging transactions. For more information, see Detect and Abort Hanging Transactions.
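
The following sketch lists transactions and then searches one broker for hanging transactions. The broker address, the broker ID 0, the KAFKA_BIN location, and the run helper are placeholders. The --broker-id option of find-hanging does not appear in the top-level usage output, so treat it as an assumption and confirm it with the subcommand's --help before relying on it.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# List the transactions known to the cluster.
run "$KAFKA_BIN/kafka-transactions.sh" --bootstrap-server "$BOOTSTRAP" list

# Look for hanging transactions on broker 0 (assumed subcommand option; check --help).
run "$KAFKA_BIN/kafka-transactions.sh" --bootstrap-server "$BOOTSTRAP" \
  find-hanging --broker-id 0
```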

Usage details
usage: kafka-transactions.sh [-h] [-v] [--command-config FILE] --bootstrap-server host:port COMMAND ...

This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and recover from hanging transactions.

optional arguments:
  -h, --help  show this help message and exit
  -v, --version  show the version of this Kafka distribution and exit
  --command-config FILE  property file containing configs to be passed to admin client
  --bootstrap-server host:port  hostname and port for the broker to connect to, in the form `host:port` (multiple comma-separated entries can be given)

commands:
  list  list transactions
  describe  describe the state of an active transactional-id
  describe-producers  describe the states of active producers for a topic partition
  abort  abort a hanging transaction (requires administrative privileges)
  find-hanging  find hanging transactions

kafka-reassign-partitions.sh

Use the kafka-reassign-partitions tool to move topic partitions between replicas. You pass a JSON-formatted file to specify the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
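
One possible workflow, sketched under assumptions, is to generate a candidate plan, execute a plan with a throttle, and then verify it. The JSON layouts follow the formats in the usage details. The topic names, broker IDs, throttle value, file names, and the run helper are illustrative, and the hand-written reassignment.json stands in for the plan that --generate would print.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# 1. Name the topics to move and ask the tool for a candidate plan on brokers 0, 1, and 2.
cat > topics-to-move.json <<'EOF'
{"topics": [{"topic": "foo"}, {"topic": "foo1"}], "version": 1}
EOF
run "$KAFKA_BIN/kafka-reassign-partitions.sh" --bootstrap-server "$BOOTSTRAP" \
  --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

# 2. Save the plan to apply. This sample is written by hand for illustration.
cat > reassignment.json <<'EOF'
{"partitions": [{"topic": "foo", "partition": 1, "replicas": [1, 2, 3]}], "version": 1}
EOF

# 3. Start the reassignment, limiting inter-broker movement to about 10 MB/s.
run "$KAFKA_BIN/kafka-reassign-partitions.sh" --bootstrap-server "$BOOTSTRAP" \
  --reassignment-json-file reassignment.json --execute --throttle 10000000

# 4. Check progress. Per the usage details, this also removes the throttle once the move completes.
run "$KAFKA_BIN/kafka-reassign-partitions.sh" --bootstrap-server "$BOOTSTRAP" \
  --reassignment-json-file reassignment.json --verify
```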

Usage details
This tool helps to move topic partitions between replicas.
Option  Description
------  -----------
--additional  Execute this reassignment in addition to any other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment.
--bootstrap-controller [String: bootstrap controller to connect to]  The controller to use for reassignment. By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.
--bootstrap-server <String: Server(s) to use for bootstrapping>  REQUIRED: the server(s) to use for bootstrapping.
--broker-list <String: brokerlist>  The list of brokers to which the partitions need to be reassigned in the form "0,1,2". This is required if --topics-to-move-json-file is used to generate reassignment configuration
--cancel  Cancel an active reassignment.
--command-config <String: Admin client property file>  Property file containing configs to be passed to Admin Client.
--disable-rack-aware  Disable rack aware replica assignment
--execute  Kick off the reassignment as specified by the --reassignment-json-file option.
--generate  Generate a candidate partition reassignment configuration. Note that this only generates a candidate assignment, it does not execute it.
--help  Print usage information.
--list  List all active partition reassignments.
--preserve-throttles  Do not modify broker or topic throttles.
--reassignment-json-file <String: manual assignment json file path>  The JSON file with the partition reassignment configuration. The format to use is - {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3,4], "observers":[3,4], "log_dirs": ["dir1","dir2","dir3","dir4"] }], "version":1 } Note that "log_dirs" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list can be either "any" or the absolution path of the log directory on the broker. If absolute log directory path is specified, the replica will be moved to the specified log directory on the broker. Note that "observers" is optional. When it is specified it must be a suffix of the replicas list.
--replica-alter-log-dirs-throttle <Long: replicaAlterLogDirsThrottle>  The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1)
--throttle <Long: throttle>  The movement of partitions between brokers will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1)
--timeout <Long: timeout>  The maximum time in ms to wait for log directory replica assignment to begin. (default: 10000)
--topics-to-move-json-file <String: topics to reassign json file path>  Generate a reassignment configuration to move the partitions of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - {"topics": [{"topic": "foo"},{"topic": "foo1"}], "version":1 }
--verify  Verify if the reassignment completed as specified by the --reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed
--version  Display Kafka version.

kafka-delete-records.sh

Use the kafka-delete-records tool to delete partition records. Use this if a topic receives bad data. Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified. Example:

bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json 
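
The command above expects deleteme.json to exist. As a minimal sketch, the file could be created as follows, using the format from the usage details. The topic foo, partition 1, and offset 1 are placeholder values, and the KAFKA_BIN location and run helper are assumptions.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# One entry per partition: records before the given offset are deleted.
cat > deleteme.json <<'EOF'
{"partitions":
  [{"topic": "foo", "partition": 1, "offset": 1}],
 "version": 1
}
EOF

run "$KAFKA_BIN/kafka-delete-records.sh" --bootstrap-server "$BOOTSTRAP" \
  --offset-json-file deleteme.json
```
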
Usage details
This tool helps to delete records of the given partitions down to the specified offset.
Option  Description
------  -----------
--bootstrap-server <String: server(s) to use for bootstrapping>  REQUIRED: The server to connect to.
--command-config <String: command config property file path>  A property file containing configs to be passed to Admin Client.
--help  Print usage information.
--offset-json-file <String: Offset json file path>  REQUIRED: The JSON file with offset per partition. The format to use is: {"partitions": [{"topic": "foo", "partition": 1, "offset": 1}], "version":1 }
--version  Display Kafka version.

kafka-log-dirs.sh

Use the kafka-log-dirs tool to get a list of replicas per log directory on a broker.
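
For illustration, the following sketch describes the log directories of two brokers for a single topic. The broker IDs 0 and 1, the topic name topic1, the broker address, the KAFKA_BIN location, and the run helper are placeholder assumptions. The flags come from the usage details.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# Query brokers 0 and 1 only, and limit the report to one topic.
# Omitting --broker-list or --topic-list queries all brokers or all topics.
run "$KAFKA_BIN/kafka-log-dirs.sh" --bootstrap-server "$BOOTSTRAP" \
  --describe --broker-list "0,1" --topic-list "topic1"
```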

Usage details
This tool helps to query log directory usage on the specified brokers.
Option  Description
------  -----------
--bootstrap-server <String: The server(s) to use for bootstrapping>  REQUIRED: the server(s) to use for bootstrapping
--broker-list <String: Broker list>  The list of brokers to be queried in the form "0,1,2". All brokers in the cluster will be queried if no broker list is specified
--command-config <String: Admin client property file>  Property file containing configs to be passed to Admin Client.
--describe  Describe the specified log directories on the specified brokers.
--help  Print usage information.
--topic-list <String: Topic list>  The list of topics to be queried in the form "topic1,topic2,topic3". All topics will be queried if no topic list is specified (default: )
--version  Display Kafka version.

kafka-replica-verification.sh

Use the kafka-replica-verification tool to verify that all replicas of a topic contain the same data. Requires a broker-list parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.

Usage details
Validate that all replicas for a set of topics have the same data.
Option  Description
------  -----------
--broker-list <String: hostname:port,...,hostname:port>  REQUIRED: The list of hostname and port of the server to connect to.
--fetch-size <Integer: bytes>  The fetch size of each request. (default: 1048576)
--help  Print usage information.
--max-wait-ms <Integer: ms>  The max amount of time each fetch request waits. (default: 1000)
--report-interval-ms <Long: ms>  The reporting interval. (default: 30000)
--time <Long: timestamp/-1(latest)/-2(earliest)>  Timestamp for getting the initial offsets. (default: -1)
--topic-white-list <String: Java regex (String)>  DEPRECATED use --topics-include instead; ignored if --topics-include specified. List of topics to verify replica consistency. Defaults to '.*' (all topics) (default: .*)
--topics-include <String: Java regex (String)>  List of topics to verify replica consistency. Defaults to '.*' (all topics) (default: .*)
--version  Print version information and exit.

kafka-mirror-maker.sh

DEPRECATED: For an alternative, see connect-mirror-maker.sh. The kafka-mirror-maker tool creates a replica of an existing Kafka cluster. The following example uses the replacement tool:

bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary

To learn more, see Kafka mirroring.

Usage details
This tool helps to continuously copy data between two Kafka clusters.
Option  Description
------  -----------
--abort.on.send.failure <String: Stop the entire mirror maker when a send failure occurs>  Configure the mirror maker to exit on a failed send. (default: true)
--consumer.config <String: config file>  Embedded consumer config for consuming from the source cluster.
--consumer.rebalance.listener <String: A custom rebalance listener of type ConsumerRebalanceListener>  The consumer rebalance listener to use for mirror maker consumer.
--help  Print usage information.
--include <String: Java regex (String)>  List of included topics to mirror.
--message.handler <String: A custom message handler of type MirrorMakerMessageHandler>  Message handler which will process every record in-between consumer and producer.
--message.handler.args <String: Arguments passed to message handler constructor.>  Arguments used by custom message handler for mirror maker.
--new.consumer  DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in a future version).
--num.streams <Integer: Number of threads>  Number of consumption streams. (default: 1)
--offset.commit.interval.ms <Integer: offset commit interval in millisecond>  Offset commit interval in ms. (default: 60000)
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String: Arguments passed to custom rebalance listener constructor as a string.>  Arguments used by custom rebalance listener for mirror maker consumer.
--version  Display Kafka version.
--whitelist <String: Java regex (String)>  DEPRECATED, use --include instead; ignored if --include specified. List of included topics to mirror.

connect-mirror-maker.sh

Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
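
As a minimal sketch, an mm2.properties file for one-way replication between two clusters might look like the following. The cluster aliases primary and secondary, the broker addresses, the topic pattern, the KAFKA_BIN location, and the run helper are assumptions chosen for illustration. Adjust them to your environment.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# Define two cluster aliases and enable replication of all topics from primary to secondary.
cat > mm2.properties <<'EOF'
clusters = primary, secondary
primary.bootstrap.servers = host1:9092
secondary.bootstrap.servers = host2:9092
primary->secondary.enabled = true
primary->secondary.topics = .*
EOF

# Start a MirrorMaker 2 node that targets the secondary cluster.
run "$KAFKA_BIN/connect-mirror-maker.sh" mm2.properties --clusters secondary
```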

Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties  MM2 configuration file.

optional arguments:
  -h, --help  show this help message and exit
  --clusters CLUSTER [CLUSTER ...]  Target cluster to use for this node.

Client, producer, and consumer tools

kafka-client-metrics.sh

Use the kafka-client-metrics tool to manipulate and describe client metrics configurations for clusters where client metrics are enabled. This tool provides a simpler alternative to using kafka-configs.sh to configure client metrics.

For example, to list all of the client metrics configuration resources, use the following command:

kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe 

To describe a specific configuration:

kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe --name MYMETRICS 

You can use this tool to create a client metric configuration resource and generate a unique name. In this example, --generate-name is used to create a type-4 UUID to use as the client metrics configuration resource name:

kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --alter --generate-name \
  --metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \
  --interval 60000
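
The remaining operations from the usage details can be sketched in the same way. The resource name MYMETRICS, the selector value, the broker address, the KAFKA_BIN location, and the run helper are placeholder assumptions.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# List the names of the client metrics resources.
run "$KAFKA_BIN/kafka-client-metrics.sh" --bootstrap-server "$BOOTSTRAP" --list

# Restrict a named resource to clients whose ID matches a placeholder value.
run "$KAFKA_BIN/kafka-client-metrics.sh" --bootstrap-server "$BOOTSTRAP" \
  --alter --name MYMETRICS --match client_id=my-client

# Delete the configuration for the named resource.
run "$KAFKA_BIN/kafka-client-metrics.sh" --bootstrap-server "$BOOTSTRAP" \
  --delete --name MYMETRICS
```
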
Usage details
Option  Description
------  -----------
--alter  Alter the configuration for the client metrics resource.
--bootstrap-server <String: server to connect to>  REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file>  Property file containing configs to be passed to Admin Client.
--delete  Delete the configuration for the client metrics resource.
--describe  List configurations for the client metrics resource.
--generate-name  Generate a UUID to use as the name.
--help  Print usage information.
--interval <Integer: push interval>  The metrics push interval in milliseconds.
--list  List the client metrics resources.
--match <String: k1=v1,k2=v2>  Matching selector 'k1=v1,k2=v2'. The following is a list of valid selector names:
    client_id
    client_instance_id
    client_software_name
    client_software_version
    client_source_address
    client_source_port
--metrics <String: m1,m2>  Telemetry metric name prefixes 'm1,m2'.
--name <String: name>  Name of client metrics configuration resource.
--version  Display Kafka version.

kafka-verifiable-consumer.sh

The kafka-verifiable-consumer tool consumes messages from a topic and emits consumer events, such as group rebalances, received messages, and offsets committed, as JSON objects to STDOUT. This tool is intended for internal testing.

Usage details
usage: verifiable-consumer [-h] --topic TOPIC --group-id GROUP_ID
         [--group-instance-id GROUP_INSTANCE_ID]
         [--max-messages MAX-MESSAGES] [--session-timeout TIMEOUT_MS]
         [--verbose] [--enable-autocommit] [--send-offset-for-times-data]
         [--reset-policy RESETPOLICY]
         [--assignment-strategy ASSIGNMENTSTRATEGY]
         [--consumer.config CONFIG_FILE]
         (--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] |
         --broker-list HOST1:PORT1[,HOST2:PORT2[...]])

This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.

optional arguments:
  -h, --help  show this help message and exit
  --topic TOPIC  Consumes messages from this topic.
  --group-protocol GROUP_PROTOCOL  Group protocol (must be one of CLASSIC, CONSUMER) (default: classic)
  --group-remote-assignor GROUP_REMOTE_ASSIGNOR  Group remote assignor; only used if the group protocol is CONSUMER
  --group-id GROUP_ID  The groupId shared among members of the consumer group
  --group-instance-id GROUP_INSTANCE_ID  A unique identifier of the consumer instance
  --max-messages MAX-MESSAGES  Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally (default: -1)
  --session-timeout TIMEOUT_MS  Set the consumer's session timeout (default: 30000)
  --verbose  Enable to log individual consumed records (default: false)
  --enable-autocommit  Enable offset auto-commit on consumer (default: false)
  --reset-policy RESETPOLICY  Set reset policy (must be either 'earliest', 'latest', or 'none' (default: earliest)
  --assignment-strategy ASSIGNMENTSTRATEGY  Set assignment strategy (e.g. org.apache.kafka.clients.consumer.RoundRobinAssignor) (default: org.apache.kafka.clients.consumer.RangeAssignor)
  --consumer.config CONFIG_FILE  Consumer config properties file (config options shared with command line parameters will be overridden).

Connection Group:
  Group of arguments for connection to brokers

  --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]  REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...
  --broker-list HOST1:PORT1[,HOST2:PORT2[...]]  DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...

kafka-configs.sh

Use the kafka-configs tool to change and describe the configuration settings of a topic, client, user, broker, IP address, or KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.

To change a property, set the entity-type to the desired entity (topic, broker, user, etc.), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.

/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-name testTopic --alter --add-config delete.retention.ms=172800000 

When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:

/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default 

The following example shows how you might check the cluster ID, by specifying the --bootstrap-controller option.

/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092 

See Kafka Topic Operations for more examples of how to work with topics.
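
The describe and delete operations can be sketched in the same style. The topic name testTopic, the broker ID 0, the broker address, the KAFKA_BIN location, and the run helper are placeholder assumptions. The flags are taken from the usage details.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# Show the configuration overrides set on one topic.
run "$KAFKA_BIN/kafka-configs.sh" --bootstrap-server "$BOOTSTRAP" \
  --entity-type topics --entity-name testTopic --describe

# Remove an override so the topic falls back to the default value.
run "$KAFKA_BIN/kafka-configs.sh" --bootstrap-server "$BOOTSTRAP" \
  --entity-type topics --entity-name testTopic --alter \
  --delete-config delete.retention.ms

# List all configs for broker 0, including static configuration.
run "$KAFKA_BIN/kafka-configs.sh" --bootstrap-server "$BOOTSTRAP" \
  --entity-type brokers --entity-name 0 --describe --all
```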

Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics
Option  Description
------  -----------
--add-config <String>  Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations:
  For entity-type 'topics':
    cleanup.policy
    compression.gzip.level
    compression.lz4.level
    compression.type
    compression.zstd.level
    delete.retention.ms
    file.delete.delay.ms
    flush.messages
    flush.ms
    follower.replication.throttled.replicas
    index.interval.bytes
    leader.replication.throttled.replicas
    local.retention.bytes
    local.retention.ms
    max.compaction.lag.ms
    max.message.bytes
    message.timestamp.after.max.ms
    message.timestamp.before.max.ms
    message.timestamp.type
    min.cleanable.dirty.ratio
    min.compaction.lag.ms
    min.insync.replicas
    preallocate
    remote.log.copy.disable
    remote.log.delete.on.disable
    remote.storage.enable
    retention.bytes
    retention.ms
    segment.bytes
    segment.index.bytes
    segment.jitter.ms
    segment.ms
    unclean.leader.election.enable
  For entity-type 'brokers':
    background.threads
    compression.gzip.level
    compression.lz4.level
    compression.type
    compression.zstd.level
    follower.replication.throttled.rate
    leader.replication.throttled.rate
    listener.security.protocol.map
    listeners
    log.cleaner.backoff.ms
    log.cleaner.dedupe.buffer.size
    log.cleaner.delete.retention.ms
    log.cleaner.io.buffer.load.factor
    log.cleaner.io.buffer.size
    log.cleaner.io.max.bytes.per.second
    log.cleaner.max.compaction.lag.ms
    log.cleaner.min.cleanable.ratio
    log.cleaner.min.compaction.lag.ms
    log.cleaner.threads
    log.cleanup.policy
    log.flush.interval.messages
    log.flush.interval.ms
    log.index.interval.bytes
    log.index.size.max.bytes
    log.local.retention.bytes
    log.local.retention.ms
    log.message.timestamp.after.max.ms
    log.message.timestamp.before.max.ms
    log.message.timestamp.type
    log.preallocate
    log.retention.bytes
    log.retention.ms
    log.roll.jitter.ms
    log.roll.ms
    log.segment.bytes
    log.segment.delete.delay.ms
    max.connection.creation.rate
    max.connections
    max.connections.per.ip
    max.connections.per.ip.overrides
    message.max.bytes
    metric.reporters
    min.insync.replicas
    num.io.threads
    num.network.threads
    num.recovery.threads.per.data.dir
    num.replica.fetchers
    principal.builder.class
    producer.id.expiration.ms
    remote.fetch.max.wait.ms
    remote.list.offsets.request.timeout.ms
    remote.log.index.file.cache.total.size.bytes
    remote.log.manager.copier.thread.pool.size
    remote.log.manager.copy.max.bytes.per.second
    remote.log.manager.expiration.thread.pool.size
    remote.log.manager.fetch.max.bytes.per.second
    remote.log.reader.threads
    replica.alter.log.dirs.io.max.bytes.per.second
    sasl.enabled.mechanisms
    sasl.jaas.config
    sasl.kerberos.kinit.cmd
    sasl.kerberos.min.time.before.relogin
    sasl.kerberos.principal.to.local.rules
    sasl.kerberos.service.name
    sasl.kerberos.ticket.renew.jitter
    sasl.kerberos.ticket.renew.window.factor
    sasl.login.refresh.buffer.seconds
    sasl.login.refresh.min.period.seconds
    sasl.login.refresh.window.factor
    sasl.login.refresh.window.jitter
    sasl.mechanism.inter.broker.protocol
    ssl.cipher.suites
    ssl.client.auth
    ssl.enabled.protocols
    ssl.endpoint.identification.algorithm
    ssl.engine.factory.class
    ssl.key.password
    ssl.keymanager.algorithm
    ssl.keystore.certificate.chain
    ssl.keystore.key
    ssl.keystore.location
    ssl.keystore.password
    ssl.keystore.type
    ssl.protocol
    ssl.provider
    ssl.secure.random.implementation
    ssl.trustmanager.algorithm
    ssl.truststore.certificates
    ssl.truststore.location
    ssl.truststore.password
    ssl.truststore.type
    transaction.partition.verification.enable
    unclean.leader.election.enable
  For entity-type 'users':
    SCRAM-SHA-256
    SCRAM-SHA-512
    consumer_byte_rate
    controller_mutation_rate
    producer_byte_rate
    request_percentage
  For entity-type 'clients':
    consumer_byte_rate
    controller_mutation_rate
    producer_byte_rate
    request_percentage
  For entity-type 'ips':
    connection_creation_rate
  For entity-type 'client-metrics':
    interval.ms
    match
    metrics
  For entity-type 'groups':
    consumer.heartbeat.interval.ms
    consumer.session.timeout.ms
    share.auto.offset.reset
    share.heartbeat.interval.ms
    share.record.lock.duration.ms
    share.session.timeout.ms
  Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user.
--add-config-file <String>  Path to a properties file with configs to add. See add-config for a list of valid configurations.
--all  List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers)
--alter  Alter the configuration for the entity.
--bootstrap-controller <String: controller to connect to>  The Kafka controllers to connect to.
--bootstrap-server <String: server to connect to>  The Kafka servers to connect to.
--broker <String>  The broker's ID.
--broker-defaults  The config defaults for all brokers.
--broker-logger <String>  The broker's ID for its logger config.
--client <String>  The client's ID.
--client-defaults  The config defaults for all clients.
--client-metrics <String>  The client metrics config resource name.
--command-config <String: command config property file>  Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
--delete-config <String>  config keys to remove 'k1,k2'
--describe  List configs for the given entity.
--entity-default  Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line)
--entity-name <String>  Name of entity (topic name/client id/user principal name/broker id/ip/client metrics)
--entity-type <String>  Type of entity (topics/clients/users/brokers/broker-loggers/ips/client-metrics)
--force  Suppress console prompts
--group <String>  The group ID.
--help  Print usage information.
--ip <String>  The IP address.
--ip-defaults  The config defaults for all IPs.
--topic <String>  The topic's name.
--user <String>  The user's principal name.
--user-defaults  The config defaults for all users.
--version  Display Kafka version.

kafka-verifiable-producer.sh

The kafka-verifiable-producer tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send request. This tool shows which messages have been acked and which have not. This tool is intended for internal testing.

Usage details
usage: verifiable-producer [-h] --topic TOPIC [--max-messages MAX-MESSAGES]
         [--throughput THROUGHPUT] [--acks ACKS]
         [--producer.config CONFIG_FILE] [--message-create-time CREATETIME]
         [--value-prefix VALUE-PREFIX] [--repeating-keys REPEATING-KEYS]
         (--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] |
         --broker-list HOST1:PORT1[,HOST2:PORT2[...]])

This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each "send" request, making externally visible which messages have been acked and which have not.

optional arguments:
  -h, --help  show this help message and exit
  --topic TOPIC  Produce messages to this topic.
  --max-messages MAX-MESSAGES  Produce this many messages. If -1, produce messages until the process is killed externally. (default: -1)
  --throughput THROUGHPUT  If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. (default: -1)
  --acks ACKS  Acks required on each produced message. See Kafka docs on acks for details. (default: -1)
  --producer.config CONFIG_FILE  Producer config properties file.
  --message-create-time CREATETIME  Send messages with creation time starting at the arguments value, in milliseconds since epoch (default: -1)
  --value-prefix VALUE-PREFIX  If specified, each produced value will have this prefix with a dot separator
  --repeating-keys REPEATING-KEYS  If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again

Connection Group:
  Group of arguments for connection to brokers

  --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]  REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...
  --broker-list HOST1:PORT1[,HOST2:PORT2[...]]  DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...

kafka-verifiable-share-consumer.sh

This tool creates a share group and consumes messages from a specific topic. It outputs share consumer events, such as share consumer startup, received messages, and offsets acknowledged, as JSON objects to STDOUT.

Usage details
usage: verifiable-share-consumer [-h] --topic TOPIC --group-id GROUP_ID
         [--max-messages MAX-MESSAGES] [--verbose]
         [--acknowledgement-mode ACKNOWLEDGEMENTMODE]
         [--offset-reset-strategy OFFSETRESETSTRATEGY]
         [--command-config CONFIG_FILE]
         --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]

Optional arguments:
  -h, --help  Shows this help message and exits.
  --topic TOPIC  Consumes messages from this topic.
  --group-id GROUP_ID  The group ID shared among members of the share group.
  --max-messages MAX-MESSAGES  Consumes this many messages. If set to -1, which is default, the share consumers consume until the process is killed externally. (Default: -1)
  --verbose  Enables logging of individual consumed records. (Default: false)
  --acknowledgement-mode ACKNOWLEDGEMENTMODE  Acknowledgement mode for the share consumers. The mode can be auto, sync, or async. (Default: auto)
  --offset-reset-strategy OFFSETRESETSTRATEGY  Sets the share group reset strategy. Must be earliest or latest. (Default: latest)
  --command-config CONFIG_FILE  Specifies a configuration properties file. Configuration options shared with command line parameters are overridden by this file.

Connection Group:
  Group of arguments for connection to brokers

  --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]  The servers to connect to. A comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...

kafka-console-consumer.sh

Use the kafka-console-consumer tool to consume records from a topic. Requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.

Confluent Tip

If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.

Example:

bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true" 
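
As a further sketch, the following command reads a bounded sample from the start of a topic and prints extra metadata for each record. The message count, timeout, broker address, KAFKA_BIN location, and run helper are assumptions. The flags and formatter properties come from the usage details.

```shell
KAFKA_BIN="${KAFKA_BIN:-bin}"          # assumed location of the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-host1:9092}"   # placeholder broker address

# Hypothetical helper: run a Kafka script if it is installed, otherwise print the command.
run() { if [ -x "$1" ]; then "$@"; else echo "skipped: $*"; fi; }

# Read at most 10 records from the beginning of testTopic, then exit.
# Also exit if no record arrives within 10 seconds.
run "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" \
  --topic testTopic --from-beginning --max-messages 10 --timeout-ms 10000 \
  --property print.timestamp=true \
  --property print.partition=true \
  --property print.offset=true \
  --property print.key=true
```
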
Usage details
This tool helps to read data from Kafka topics and outputs it to standard output.
Option  Description
------  -----------
--bootstrap-server <String: server to connect to>  REQUIRED: The servers to connect to.
--consumer-property <String: consumer_prop>  A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events  Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>  The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--formatter-config <String: config file>  Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.
--from-beginning  If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>  The consumer group id of the consumer.
--help  Print usage information.
--include <String: Java regex (String)>  Regular expression specifying list of topics to include for consumption.
--isolation-level <String>  Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>  The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>  The offset to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>  The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>  The properties to initialize the message formatter. Default properties include:
    print.timestamp=true|false
    print.key=true|false
    print.offset=true|false
    print.partition=true|false
    print.headers=true|false
    print.value=true|false
    key.separator=<key.separator>
    line.separator=<line.separator>
    headers.separator=<line.separator>
    null.literal=<null.literal>
    key.deserializer=<key.deserializer>
    value.deserializer=<value.deserializer>
    header.deserializer=<header.deserializer>
  Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error  If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>  If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>  The topic to consume on.
--value-deserializer <String: deserializer for values>
--version  Display Kafka version.

kafka-console-producer.sh

Use the kafka-console-producer tool to produce records to a topic. The tool requires a --bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the servers to connect to. Example:

kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --producer.config config.properties --topic testTopic --property "parse.key=true" --property "key.separator=:" 

Confluent Tip

If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.

Usage details
This tool helps to read data from standard input and publish it to Kafka.

Option    Description
------    -----------
--batch-size <Integer: size>    Number of messages to send in a single batch if they are not being sent synchronously. Please note that this option will be replaced if max-partition-memory-bytes is also set (default: 16384)
--bootstrap-server <String: server to connect to>    REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: broker-list>    DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]    The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--help    Print usage information.
--line-reader <String: reader_class>    The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>    The max time that the producer will block for during a send request. (default: 60000)
--max-memory-bytes <Long: total memory in bytes>    The total memory used by the producer to buffer records waiting to be sent to the server. This is the option to control `buffer.memory` in producer configs. (default: 33554432)
--max-partition-memory-bytes <Integer: memory in bytes per partition>    The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control `batch.size` in producer configs. (default: 16384)
--message-send-max-retries <Integer>    Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. This is the option to control `retries` in producer configs. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. This is the option to control `metadata.max.age.ms` in producer configs. (default: 300000)
--producer-property <String: producer_prop>    A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>    Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>    A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
    Default properties include:
        parse.key=false
        parse.headers=false
        ignore.error=false
        key.separator=\t
        headers.delimiter=\t
        headers.separator=,
        headers.key.separator=:
        null.marker=    When set, any fields (key, value and headers) equal to this will be replaced by null
    Default parsing pattern when:
        parse.headers=true and parse.key=true: "h1:v1,h2:v2...\tkey\tvalue"
        parse.key=true: "key\tvalue"
        parse.headers=true: "h1:v1,h2:v2...\tvalue"
--reader-config <String: config file>    Config properties file for the message reader. Note that [property] takes precedence over this config.
--request-required-acks <String: request required acks>    The required `acks` of the producer requests (default: -1)
--request-timeout-ms <Integer: request timeout ms>    The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)
--retry-backoff-ms <Long>    Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control `retry.backoff.ms` in producer configs. (default: 100)
--socket-buffer-size <Integer: size>    The size of the tcp RECV size. This is the option to control `send.buffer.bytes` in producer configs. (default: 102400)
--sync    If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.
--timeout <Long: timeout_ms>    If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control `linger.ms` in producer configs. (default: 1000)
--topic <String: topic>    REQUIRED: The topic id to produce messages to.
--version    Display Kafka version.
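
As a minimal sketch of how the parse.key and key.separator properties work together, the following script pipes three key:value lines into the producer. The broker address, topic name, record contents, and the KAFKA_RUN switch are assumptions made for illustration. By default the script only prints the records it would send.

```shell
BOOTSTRAP="localhost:9092"   # assumed broker address
TOPIC="testTopic"            # assumed topic name

# Emit three sample records, one per line, in key:value form.
sample_records() {
  for i in 1 2 3; do
    printf 'user-%s:login-%s\n' "$i" "$i"
  done
}

if [ "${KAFKA_RUN:-0}" = "1" ]; then
  # Each input line becomes one record; the text before the ':' separator is the key.
  sample_records | bin/kafka-console-producer.sh \
    --bootstrap-server "$BOOTSTRAP" --topic "$TOPIC" \
    --property "parse.key=true" --property "key.separator=:"
else
  # Dry run: show the records that would be sent.
  sample_records
fi
```

Set KAFKA_RUN=1 to send the records to a running broker. Because the producer reads standard input, any command that writes one record per line can replace sample_records.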

kafka-console-share-consumer.sh

Use the kafka-console-share-consumer tool to consume messages from a Kafka topic using a share group. It allows multiple consumer instances to jointly consume messages from a topic in a queue-like fashion, where each message is processed by only one consumer within the share group. This differs from traditional Kafka consumer groups, which assign exclusive partitions to consumers.

Example:

kafka-console-share-consumer.sh --bootstrap-server localhost:9092 --topic my-share-topic --group my-shared-group 
Usage details
This tool helps to read data from Kafka topics using share groups and outputs it to standard output.

Option    Description
------    -----------
--bootstrap-server <String: server to connect to>    REQUIRED: The servers to connect to.
--consumer-config <String: config file>    Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--consumer-property <String: consumer_prop>    A mechanism to pass user-defined properties in the form key=value to the consumer.
--enable-systest-events    Log lifecycle events of the share consumer in addition to logging consumed messages. This is specific to system tests.
--formatter <String: class>    The name of a class to use for formatting Kafka messages for display. (default: org.apache.kafka.tools.consumer.DefaultMessageFormatter)
--formatter-config <String: config file>    Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.
--group <String: share group id>    The share group id of the consumer.
--help    Print usage information.
--key-deserializer <String: deserializer for key>    The name of the class to use for deserializing keys.
--max-messages <Integer: num_messages>    The maximum number of messages to consume before exiting. If not set, consumption is continual.
--property <String: prop>    The properties to initialize the message formatter.
    Default properties include:
        print.timestamp=true|false
        print.key=true|false
        print.offset=true|false
        print.delivery=true|false
        print.epoch=true|false
        print.partition=true|false
        print.headers=true|false
        print.value=true|false
        key.separator=<key.separator>
        line.separator=<line.separator>
        headers.separator=<line.separator>
        null.literal=<null.literal>
        key.deserializer=<key.deserializer>
        value.deserializer=<value.deserializer>
        header.deserializer=<header.deserializer>
    You can also pass in customized properties for the formatter; more specifically, you can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure the deserializers.
--reject    If specified, messages are rejected as they are consumed.
--reject-message-on-error    If there is an error when processing a message, reject it instead of halting.
--release    If specified, messages are released as they are consumed.
--timeout-ms <Integer: timeout_ms>    If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>    The topic to consume from.
--value-deserializer <String: deserializer for values>    The name of the class to use for deserializing values.
--version    Display Kafka version.

kafka-producer-perf-test.sh

The kafka-producer-perf-test tool enables you to produce a large quantity of data to test producer performance for the Kafka cluster.

Example:

bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092 
Usage details
usage: producer-perf-test [-h] --topic TOPIC --num-records NUM-RECORDS
       [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT
       [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
       [--producer.config CONFIG-FILE] [--print-metrics]
       [--transactional-id TRANSACTIONAL-ID]
       [--transaction-duration-ms TRANSACTION-DURATION]
       (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE | --payload-monotonic)

This tool is used to verify the producer performance. To enable transactions, you can specify a transactional id or set a transaction duration using --transaction-duration-ms. There are three ways to specify the transactional id: set transactional.id=<id> via --producer-props, set transactional.id=<id> in the config file via --producer.config, or use --transactional-id <id>.

optional arguments:
  -h, --help    show this help message and exit
  --topic TOPIC    produce messages to this topic
  --num-records NUM-RECORDS    number of messages to produce
  --payload-delimiter PAYLOAD-DELIMITER    provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided. (default: \n)
  --throughput THROUGHPUT    throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]    kafka producer related configuration properties like bootstrap.servers, client.id etc. These configs take precedence over those passed via --producer.config.
  --producer.config CONFIG-FILE    producer config properties file.
  --print-metrics    print out metrics at the end of the test. (default: false)
  --transactional-id TRANSACTIONAL-ID    The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION    The max age of each transaction. The commitTransaction will be called after this time has elapsed. The value should be greater than 0. If the transactional id is specified via --producer-props, --producer.config, or --transactional-id but --transaction-duration-ms is not specified, the default value will be 3000.

  Exactly one of --record-size, --payload-file, or --payload-monotonic must be specified.

  --record-size RECORD-SIZE    message size in bytes. Note that you must provide exactly one of --record-size, --payload-file, or --payload-monotonic.
  --payload-file PAYLOAD-FILE    file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending messages. Note that you must provide exactly one of --record-size, --payload-file, or --payload-monotonic.
  --payload-monotonic    payload is monotonically increasing integer. Note that you must provide exactly one of --record-size, --payload-file, or --payload-monotonic. (default: false)

kafka-groups.sh

Use the kafka-groups tool to list groups of all types.

Usage details
Option    Description
------    -----------
--bootstrap-server <String: server to connect to>    REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file>    Property file containing configs to be passed to the admin client.
--consumer    Filter the groups to show all kinds of consumer groups, including classic and simple consumer groups. This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty.
--group-type <String: type>    Filter the groups based on group type. Valid types are: 'classic', 'consumer' and 'share'.
--help    Print usage information.
--list    List the groups.
--protocol <String: protocol>    Filter the groups based on protocol type.
--share    Filter the groups to show share groups.
--version    Display Kafka version.
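
The filters listed above can be combined with --list, as in the following sketch. The broker address, the list_groups helper, and the KAFKA_RUN switch are assumptions for illustration. By default each command is printed rather than executed.

```shell
BOOTSTRAP="localhost:9092"   # assumed broker address

# Print the command instead of running it unless KAFKA_RUN=1 is set.
run() {
  if [ "${KAFKA_RUN:-0}" = "1" ]; then "$@"; else echo "$*"; fi
}

# Hypothetical helper: list groups, appending any extra arguments as filters.
list_groups() {
  run bin/kafka-groups.sh --bootstrap-server "$BOOTSTRAP" --list "$@"
}

list_groups                        # every group, whatever its type
list_groups --share                # share groups only
list_groups --group-type classic   # groups of a single type
```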

kafka-consumer-groups.sh

Use the kafka-consumer-groups tool to get a list of the active groups in the cluster.

For example, to show the position of all consumers in a group named user-group, you might use the following command.

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user-group

This would result in output like the following (CONSUMER-ID entries truncated for readability).

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID     HOST        CLIENT-ID
user   0          2               4               2    consumer-1-...  /127.0.0.1  consumer-1
user   1          2               3               1    consumer-1-...  /127.0.0.1  consumer-1
user   2          2               3               1    consumer-2-...  /127.0.0.1  consumer-2

For more examples, see View Consumer Group Info in Kafka.

Usage details
Option    Description
------    -----------
--all-groups    Apply to all consumer groups.
--all-topics    Consider all topics assigned to a group in the `reset-offsets` process.
--bootstrap-server <String: server to connect to>    REQUIRED: The servers to connect to. Required for all options except for --validate-regex.
--by-duration <String: duration>    Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--command-config <String: command config property file>    Property file containing configs to be passed to Admin Client and Consumer.
--delete    Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 --group g2
--delete-offsets    Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.
--describe    Describe consumer group and list offset lag (number of messages not yet processed) related to given group.
--dry-run    Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets.
--execute    Execute operation. Supported operations: reset-offsets.
--export    Export operation execution to a CSV file. Supported operations: reset-offsets.
--from-file <String: path to CSV file>    Reset offsets to values defined in CSV file.
--group <String: consumer group>    The consumer group we wish to act on.
--help    Print usage information.
--list    List all consumer groups.
--members    Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only.
    Example: --bootstrap-server localhost:9092 --describe --group group1 --members
--offsets    Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with '--describe' and '--bootstrap-server' options only.
    Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets
--reset-offsets    Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive.
    Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. Additionally, the --export option is used to export the results to a CSV format.
    You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset.
    To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
--state [String]    When specified with '--describe', includes the state of the group.
    Example: --bootstrap-server localhost:9092 --describe --group group1 --state
    When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states.
    Example: --bootstrap-server localhost:9092 --list --state stable,empty
    This option may be used with '--describe', '--list' and '--bootstrap-server' options only.
--timeout <Long: timeout (ms)>    The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)
--to-current    Reset offsets to current offset.
--to-datetime <String: datetime>    Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest    Reset offsets to earliest offset.
--to-latest    Reset offsets to latest offset.
--to-offset <Long: offset>    Reset offsets to a specific offset.
--topic <String: topic>    The topic whose consumer group information should be deleted, or the topic that should be included in the reset offset process. In the `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included in the process. Reset-offsets also supports multiple topic inputs.
--type [String]    When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types.
    Example: --bootstrap-server localhost:9092 --list --type classic,consumer
    This option may be used with the '--list' option only.
--validate-regex <String: regex>    Validate that the syntax of the provided regular expression is valid according to the RE2 format.
--verbose    Provide additional information, if any, when describing the group. This option may be used with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only.
    Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose
--version    Display Kafka version.
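
The --reset-offsets option is typically used in two steps: preview with --dry-run, then apply with --execute. The following is a sketch of that workflow, assuming a group named user-group and a topic named user. The reset_to_earliest helper and the KAFKA_RUN switch are hypothetical, and by default each command is printed rather than executed.

```shell
BOOTSTRAP="localhost:9092"   # assumed broker address
GROUP="user-group"           # assumed consumer group (must be inactive)
TOPIC="user"                 # assumed topic

# Print the command instead of running it unless KAFKA_RUN=1 is set.
run() {
  if [ "${KAFKA_RUN:-0}" = "1" ]; then "$@"; else echo "$*"; fi
}

# Hypothetical helper: reset the group's offsets on one topic to the earliest offset.
reset_to_earliest() {
  run bin/kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" \
    --reset-offsets --group "$GROUP" --topic "$TOPIC" --to-earliest "$@"
}

# Step 1: preview the planned offsets without changing anything.
reset_to_earliest --dry-run

# Step 2: once the plan looks right, apply it.
# reset_to_earliest --execute
```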

kafka-consumer-perf-test.sh

This tool tests the consumer performance for the Kafka cluster.

Usage details
Option    Description
------    -----------
--bootstrap-server <String: server to connect to>    REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to.
--broker-list <String: broker-list>    DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--consumer.config <String: config file>    Consumer config properties file.
--date-format <String: date format>    The date format to use for formatting the time field. See java.text.SimpleDateFormat for options. (default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>    The amount of data to fetch in a single request. (default: 1048576)
--from-latest    If the consumer does not already have an established offset to consume from, start with the latest message present in the log rather than the earliest message.
--group <String: gid>    The group id to consume on. (default: perf-consumer-50334)
--help    Print usage information.
--hide-header    If set, skips printing the header for the stats
--messages <Long: count>    REQUIRED: The number of messages to send or consume
--num-fetch-threads <Integer: count>    DEPRECATED AND IGNORED: Number of fetcher threads. (default: 1)
--print-metrics    Print out the metrics.
--reporting-interval <Integer: interval_ms>    Interval in milliseconds at which to print progress info. (default: 5000)
--show-detailed-stats    If set, stats are reported for each reporting interval as configured by reporting-interval
--socket-buffer-size <Integer: size>    The size of the tcp RECV size. (default: 2097152)
--threads <Integer: count>    DEPRECATED AND IGNORED: Number of processing threads. (default: 10)
--timeout [Long: milliseconds]    The maximum allowed time in milliseconds between returned records. (default: 10000)
--topic <String: topic>    REQUIRED: The topic to consume from.
--version    Display Kafka version.
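
As a sketch, a basic run passes the three required options listed above. The broker address, topic name, message count, perf_test helper, and KAFKA_RUN switch are assumptions for illustration. By default each command is printed rather than executed.

```shell
BOOTSTRAP="localhost:9092"   # assumed broker address
TOPIC="topic-a"              # assumed topic that already contains data
MESSAGES=200000              # assumed number of messages to consume

# Print the command instead of running it unless KAFKA_RUN=1 is set.
run() {
  if [ "${KAFKA_RUN:-0}" = "1" ]; then "$@"; else echo "$*"; fi
}

# Hypothetical helper: run the consumer performance test with the required options.
perf_test() {
  run bin/kafka-consumer-perf-test.sh --bootstrap-server "$BOOTSTRAP" \
    --topic "$TOPIC" --messages "$MESSAGES" "$@"
}

perf_test                                                  # summary only
perf_test --show-detailed-stats --reporting-interval 1000  # stats every second
```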

kafka-share-consumer-perf-test.sh

Use this tool to verify share consumer performance.

Usage details
Option    Description
------    -----------
--bootstrap-server <String: server to connect to>    (REQUIRED) The servers to connect to.
--consumer.config <String: config file>    Share consumer configuration properties file.
--date-format <String: date format>    The date format to use for formatting the time field. See java.text.SimpleDateFormat for options. (Default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>    The amount of data to fetch in a single request. (Default: 1048576)
--group <String: gid>    The group ID to consume on. (Default: perf-share-consumer)
--help    Prints usage information.
--hide-header    If set, skips printing the header for the stats.
--messages <Long: count>    (REQUIRED) The number of messages to send or consume.
--print-metrics    Prints the metrics.
--reporting-interval <Long: interval_ms>    Interval in milliseconds at which to print progress information. (Default: 5000)
--show-consumer-stats    If set, reports stats for each share consumer depending on the number of threads.
--show-detailed-stats    If set, reports stats for each reporting interval as configured by reporting-interval.
--socket-buffer-size <Integer: size>    The size of the TCP RECV size. (Default: 2097152)
--threads <Integer: count>    The number of share consumers to use for sharing the load. (Default: 1)
--timeout [Long: milliseconds]    The maximum allowed time in milliseconds between returned records. (Default: 10000)
--topic <String: topic>    (REQUIRED) The topic to consume from.
--version    Displays Kafka version.

kafka-share-groups.sh

This tool helps to list, describe, reset and delete share groups.

Usage details
Option    Description
------    -----------
--all-topics    Consider all topics assigned to a share group in the 'reset-offsets' process.
--bootstrap-server <String: server to connect to>    REQUIRED: The servers to connect to.
--command-config <String: command config property file>    Property file containing configs to be passed to the admin client.
--delete    Delete share group.
--delete-offsets    Delete offsets of share group. Supports one share group at the time, and multiple topics.
--describe    Describe share group, members and offset information.
--dry-run    Only show results without executing changes on share groups. Supported operations: reset-offsets.
--execute    Execute operation. Supported operations: reset-offsets.
--group <String: share group>    The share group we wish to act on.
--help    Print usage information.
--list    List all share groups.
--members    Describe members of the group. This option may be used with the '--describe' option only.
--offsets    Describe the group and list all topic partitions in the group along with their offset information. This is the default sub-action and may be used with the '--describe' option only.
--reset-offsets    Reset offsets of share group. Supports one share group at the time, and instances must be inactive.
    Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to reset the offsets.
    You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest.
    To define the scope use --all-topics or --topic.
--state [String]    When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. Valid values are Empty, Stable and Dead.
--timeout <Long: timeout (ms)>    The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes. (default: 5000)
--to-datetime <String: datetime>    Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest    Reset offsets to earliest offset.
--to-latest    Reset offsets to latest offset.
--topic <String: topic>    The topic whose share group information should be deleted, or the topic that should be included in the reset offset process. When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included.
--version    Display Kafka version.
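
The following sketch shows the list and describe actions from the table above. The broker address, the group name my-shared-group, the share_groups helper, and the KAFKA_RUN switch are assumptions for illustration. By default each command is printed rather than executed.

```shell
BOOTSTRAP="localhost:9092"   # assumed broker address
GROUP="my-shared-group"      # assumed share group name

# Print the command instead of running it unless KAFKA_RUN=1 is set.
run() {
  if [ "${KAFKA_RUN:-0}" = "1" ]; then "$@"; else echo "$*"; fi
}

# Hypothetical helper: call kafka-share-groups.sh with the shared connection option.
share_groups() {
  run bin/kafka-share-groups.sh --bootstrap-server "$BOOTSTRAP" "$@"
}

share_groups --list                                 # all share groups
share_groups --describe --group "$GROUP"            # offsets (default sub-action)
share_groups --describe --group "$GROUP" --members  # group members
```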

Manage Kafka Connect

connect-distributed.sh

Use the connect-distributed tool to run Connect workers in distributed mode, meaning across multiple machines. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.

Usage details
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties 

connect-standalone.sh

Use the connect-standalone tool to run Kafka Connect in standalone mode, meaning all work is performed in a single process. This is good for getting started but lacks fault tolerance. For more information, see Kafka Connect.

Usage details
USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties 

connect-mirror-maker.sh

Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.

Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties    MM2 configuration file.

optional arguments:
  -h, --help    show this help message and exit
  --clusters CLUSTER [CLUSTER ...]    Target cluster to use for this node.

Manage Kafka Streams

kafka-streams-application-reset.sh

For Kafka Streams applications, the kafka-streams-application-reset tool resets the application and forces it to reprocess its data from the beginning. This is useful for debugging and testing.

For example, the following command would reset the my-streams-app application:

kafka-streams-application-reset.sh --application-id my-streams-app --input-topics my-input-topic --intermediate-topics rekeyed-topic

For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.

Usage details
This tool helps to quickly reset an application in order to reprocess its data from scratch.

* This tool resets offsets of input topics to the earliest available offset (by default), or to a specific defined position.
* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "<application.id>-"). The tool finds these internal topics automatically. If the topics flagged automatically for deletion by the dry-run are unsuitable, you can specify a subset with the "--internal-topics" option.
* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).
* This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results). You need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by "state.dir" configuration (${java.io.tmpdir}/kafka-streams/<application.id> by default).
* When a long session timeout has been configured, active members could take longer to expire on the broker, which blocks the reset job from completing. Using the "--force" option removes those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.

*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!

*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this first with "--dry-run" to preview your changes before making them.

Option (* = required)    Description
---------------------    -----------
* --application-id <String: id>    The Kafka Streams application ID (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2 (default: localhost:9092)
--by-duration <String>    Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>    Property file containing configs to be passed to admin clients and embedded consumer.
--dry-run    Display the actions that would be performed without executing the reset commands.
--force    Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.
--from-file <String>    Reset offsets to values defined in CSV file.
--help    Print usage information.
--input-topics <String: list>    Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. Reset to other offset position by appending other reset offset option, ex: --input-topics foo --shift-by 5
--intermediate-topics <String: list>    Comma-separated list of intermediate user topics (topics that are input and output topics, e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.
--internal-topics <String: list>    Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics).
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset by 'n', where 'n' can be positive or negative
--to-datetime <String>    Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest    Reset offsets to earliest offset.
--to-latest    Reset offsets to latest offset.
--to-offset <Long>    Reset offsets to a specific offset.
--version    Print version information and exit.
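
Because the reset is irreversible, a preview with --dry-run is a sensible first step. The sketch below assumes the application ID my-streams-app and the input topic my-input-topic; the reset_app helper and the KAFKA_RUN switch are hypothetical. By default each command is printed rather than executed.

```shell
BOOTSTRAP="localhost:9092"      # assumed broker address (also the tool's default)
APP_ID="my-streams-app"         # assumed application.id
INPUT_TOPICS="my-input-topic"   # assumed comma-separated input topics

# Print the command instead of running it unless KAFKA_RUN=1 is set.
run() {
  if [ "${KAFKA_RUN:-0}" = "1" ]; then "$@"; else echo "$*"; fi
}

# Hypothetical helper: reset the application's input topic offsets.
reset_app() {
  run bin/kafka-streams-application-reset.sh --application-id "$APP_ID" \
    --bootstrap-servers "$BOOTSTRAP" --input-topics "$INPUT_TOPICS" "$@"
}

# Step 1: show what would be reset and which internal topics would be deleted.
reset_app --dry-run

# Step 2: after stopping all application instances, run the actual reset.
# reset_app
```

After the actual reset, remember to clean up local state on each application instance before restarting it.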

kafka-streams-groups.sh

Use this tool to list or describe streams groups.

Usage details
Option                            Description
------                            -----------
--all-groups                      Applies to all streams groups.
--all-input-topics                Considers all input topics assigned to a group in the
                                  reset-offsets and delete-offsets process. Only input
                                  topics are supported.
--bootstrap-server <String: server to connect to>
                                  (REQUIRED) The servers to connect to.
--command-config <String: command config property file>
                                  Property file that contains configurations to pass to
                                  the admin client.
--delete                          Deletes topic partition offsets and ownership
                                  information for the entire streams group. For example,
                                  --group g1 --group g2.
--delete-offsets                  Deletes offsets of a streams group. Supports one
                                  streams group at a time and multiple topics.
--describe                        Describes the streams group and lists offset lag
                                  related to the specified group.
--group <String: streams group>   The streams group to act on.
--help                            Prints usage information.
--input-topic <String: topic>     The topic whose streams group information to delete,
                                  or the topic to include in the reset offset process.
                                  For reset-offsets, you can specify partitions using
                                  this format: topic1:0,1,2, where 0,1,2 are the
                                  partitions to include in the process. Reset-offsets
                                  also supports multiple topic inputs. All types of
                                  topics are supported.
--list                            Lists all streams groups.
--members                         Describes members of the group. Use this option with
                                  the --describe option only.
--offsets                         Describes the group and lists all topic partitions in
                                  the group along with their offset information. This is
                                  the default sub-action. Use with the --describe option
                                  only.
--state [String]                  When specified with --list, displays the state of all
                                  groups. Can also be used to list groups with specific
                                  states. The valid values are Empty, NotReady, Stable,
                                  Assigning, Reconciling, and Dead.
--timeout <Long: timeout (ms)>    The timeout for some use cases. For example, when
                                  describing the group, specifies the maximum amount of
                                  time in milliseconds to wait before the group
                                  stabilizes. (Default: 5000)
--verbose                         Use with --describe --state to show group epoch and
                                  target assignment epoch.
                                  Use with --describe --members to show, for each
                                  member, the member's epoch, target assignment epoch,
                                  current assignment, target assignment, and whether the
                                  member is still using the classic rebalance protocol.
                                  Use with --describe --offsets and --describe to show
                                  leader epochs for each partition.
--version                         Displays Kafka version.
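As a minimal sketch of the options above, the following script builds two commands: one that lists streams groups with their state, and one that describes the members of a single group. The script name bin/kafka-streams-groups.sh, the broker address, and the group name my-streams-app are assumptions for illustration. The commands are printed for review rather than executed.

```shell
# Assumed values for illustration; adjust for your cluster.
BOOTSTRAP="localhost:9092"
GROUP="my-streams-app"                # hypothetical streams group name
TOOL="bin/kafka-streams-groups.sh"    # assumed script name and location

# List all streams groups and show the state of each.
LIST_CMD="$TOOL --bootstrap-server $BOOTSTRAP --list --state"

# Describe the members of one group, with epochs and assignments.
DESCRIBE_CMD="$TOOL --bootstrap-server $BOOTSTRAP --describe --group $GROUP --members --verbose"

# Print the commands for review; run the printed lines against a live cluster.
echo "$LIST_CMD"
echo "$DESCRIBE_CMD"
```

Because --members and --offsets are valid only with --describe, the second command pairs them explicitly.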

Manage security

kafka-acls.sh

Use the kafka-acls tool to add, remove, and list ACLs. For example, to give two principals, Jose and Jane, read and write permissions on the user topic from specific IP addresses, you could use a command like the following:

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user 
Usage details
This tool helps to manage acls on kafka.

Option                            Description
------                            -----------
--add                             Indicates you are trying to add ACLs.
--allow-host <String: allow-host>
                                  Host from which principals listed in --allow-principal
                                  will have access. If you have specified
                                  --allow-principal then the default for this option
                                  will be set to '*' which allows access from all hosts.
--allow-principal <String: allow-principal>
                                  principal is in principalType:name format. Note that
                                  principalType must be supported by the Authorizer
                                  being used. For example, User:'*' is the wild card
                                  indicating all users.
--bootstrap-controller <String: controller to connect to>
                                  A list of host/port pairs to use for establishing the
                                  connection to the Kafka cluster. This list should be
                                  in the form host1:port1,host2:port2,... This config is
                                  required for acl management using admin client API.
--bootstrap-server <String: server to connect to>
                                  A list of host/port pairs to use for establishing the
                                  connection to the Kafka cluster. This list should be
                                  in the form host1:port1,host2:port2,... This config is
                                  required for acl management using admin client API.
--cluster                         Add/Remove cluster ACLs.
--command-config [String: command-config]
                                  A property file containing configs to be passed to
                                  Admin Client.
--consumer                        Convenience option to add/remove ACLs for consumer
                                  role. This will generate ACLs that allows READ,
                                  DESCRIBE on topic and READ on group.
--delegation-token <String: delegation-token>
                                  Delegation token to which ACLs should be added or
                                  removed. A value of '*' indicates ACL should apply to
                                  all tokens.
--deny-host <String: deny-host>   Host from which principals listed in --deny-principal
                                  will be denied access. If you have specified
                                  --deny-principal then the default for this option will
                                  be set to '*' which denies access from all hosts.
--deny-principal <String: deny-principal>
                                  principal is in principalType:name format. By default
                                  anyone not added through --allow-principal is denied
                                  access. You only need to use this option as negation
                                  to already allowed set. Note that principalType must
                                  be supported by the Authorizer being used. For example
                                  if you wanted to allow access to all users in the
                                  system but not test-user you can define an ACL that
                                  allows access to User:'*' and specify
                                  --deny-principal=User:test@EXAMPLE.COM. AND PLEASE
                                  REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.
--force                           Assume Yes to all queries and do not prompt.
--group <String: group>           Consumer Group to which the ACLs should be added or
                                  removed. A value of '*' indicates the ACLs should
                                  apply to all groups.
--help                            Print usage information.
--idempotent                      Enable idempotence for the producer. This should be
                                  used in combination with the --producer option. Note
                                  that idempotence is enabled automatically if the
                                  producer is authorized to a particular
                                  transactional-id.
--list                            List ACLs for the specified resource, use
                                  --topic <topic> or --group <group> or --cluster to
                                  specify a resource.
--operation <String>              Operation that is being allowed or denied. Valid
                                  operation names are:
                                    Describe
                                    DescribeConfigs
                                    Alter
                                    IdempotentWrite
                                    Read
                                    Delete
                                    Create
                                    ClusterAction
                                    All
                                    CreateTokens
                                    DescribeTokens
                                    Write
                                    AlterConfigs
                                  (default: All)
--principal [String: principal]   List ACLs for the specified principal. principal is in
                                  principalType:name format. Note that principalType
                                  must be supported by the Authorizer being used.
                                  Multiple --principal option can be passed.
--producer                        Convenience option to add/remove ACLs for producer
                                  role. This will generate ACLs that allows WRITE,
                                  DESCRIBE and CREATE on topic.
--remove                          Indicates you are trying to remove ACLs.
--resource-pattern-type <ANY|MATCH|LITERAL|PREFIXED>
                                  The type of the resource pattern or pattern filter.
                                  When adding acls, this should be a specific pattern
                                  type, e.g. 'literal' or 'prefixed'. When listing or
                                  removing acls, a specific pattern type can be used to
                                  list or remove acls from specific resource patterns,
                                  or use the filter values of 'any' or 'match', where
                                  'any' will match any pattern type, but will match the
                                  resource name exactly, where as 'match' will perform
                                  pattern matching to list or remove all acls that
                                  affect the supplied resource(s). WARNING: 'match',
                                  when used in combination with the '--remove' switch,
                                  should be used with care. (default: LITERAL)
--topic <String: topic>           topic to which ACLs should be added or removed. A
                                  value of '*' indicates ACL should apply to all topics.
--transactional-id <String: transactional-id>
                                  The transactionalId to which ACLs should be added or
                                  removed. A value of '*' indicates the ACLs should
                                  apply to all transactionalIds.
--user-principal <String: user-principal>
                                  Specifies a user principal as a resource in relation
                                  with the operation. For instance one could grant
                                  CreateTokens or DescribeTokens permission on a given
                                  user principal.
--version                         Display Kafka version.
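The --list and --producer options described above can be sketched as follows. This is an illustrative example only: the broker address, the topic name user, and the principal User:Jose are assumed values, and the commands are printed for review rather than executed.

```shell
# Assumed values for illustration; adjust for your cluster.
BOOTSTRAP="localhost:9092"
TOPIC="user"
PRINCIPAL="User:Jose"

# List the ACLs currently attached to the topic.
LIST_CMD="bin/kafka-acls.sh --bootstrap-server $BOOTSTRAP --list --topic $TOPIC"

# Grant the producer role on the topic with the --producer convenience option,
# which generates WRITE, DESCRIBE, and CREATE ACLs on the topic.
PRODUCER_CMD="bin/kafka-acls.sh --bootstrap-server $BOOTSTRAP --add --allow-principal $PRINCIPAL --producer --topic $TOPIC"

# Print the commands for review; run the printed lines against a live cluster.
echo "$LIST_CMD"
echo "$PRODUCER_CMD"
```

Listing the ACLs before and after a change is a simple way to confirm which rules the convenience option actually created.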

kafka-delegation-tokens.sh

Use the kafka-delegation-tokens tool to create, renew, expire and describe delegation tokens. Delegation tokens are shared secrets between Kafka brokers and clients, and are a lightweight authentication mechanism meant to complement existing SASL/SSL methods. For more information, see Authentication using Delegation Tokens in the Confluent Documentation.

Usage details
This tool helps to create, renew, expire, or describe delegation tokens.

Option                            Description
------                            -----------
--bootstrap-server <String>       REQUIRED: server(s) to use for bootstrapping.
--command-config <String>         REQUIRED: A property file containing configs to be
                                  passed to Admin Client. Token management operations
                                  are allowed in secure mode only. This config file is
                                  used to pass security related configs.
--create                          Create a new delegation token. Use --renewer-principal
                                  option to pass renewers principals.
--describe                        Describe delegation tokens for the given principals.
                                  Use --owner-principal to pass owner/renewer
                                  principals. If --owner-principal option is not
                                  supplied, all the user owned tokens and tokens where
                                  the user has Describe permission will be returned.
--expire                          Expire delegation token. Use --expiry-time-period
                                  option to expire the token.
--expiry-time-period [Long]       Expiry time period in milliseconds. If the value is
                                  -1, then the token will get invalidated immediately.
--help                            Print usage information.
--hmac [String]                   HMAC of the delegation token.
--max-life-time-period [Long]     Max life period for the token in milliseconds. If the
                                  value is -1, then token max life time will default to
                                  a server side config value
                                  (delegation.token.max.lifetime.ms).
--owner-principal [String]        owner is a kafka principal. It should be in
                                  principalType:name format.
--renew                           Renew delegation token. Use --renew-time-period option
                                  to set renew time period.
--renew-time-period [Long]        Renew time period in milliseconds. If the value is -1,
                                  then the renew time period will default to a server
                                  side config value (delegation.token.expiry.time.ms).
--renewer-principal [String]      renewer is a kafka principal. It should be in
                                  principalType:name format.
--version                         Display Kafka version.
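As a rough sketch of the options above, the following script builds one command that creates a token and one that describes the tokens owned by a principal. The file name client.properties, the broker address, and the principals User:Jane and User:Jose are hypothetical values chosen for illustration. The commands are printed for review rather than executed.

```shell
# Assumed values for illustration; adjust for your cluster.
BOOTSTRAP="localhost:9092"
CLIENT_CONFIG="client.properties"   # hypothetical file holding security configs
RENEWER="User:Jane"
OWNER="User:Jose"

# Create a token; -1 defers the max lifetime to the server-side default.
CREATE_CMD="bin/kafka-delegation-tokens.sh --bootstrap-server $BOOTSTRAP --command-config $CLIENT_CONFIG --create --max-life-time-period -1 --renewer-principal $RENEWER"

# Describe the tokens owned by a given principal.
DESCRIBE_CMD="bin/kafka-delegation-tokens.sh --bootstrap-server $BOOTSTRAP --command-config $CLIENT_CONFIG --describe --owner-principal $OWNER"

# Print the commands for review; run the printed lines against a secured cluster.
echo "$CREATE_CMD"
echo "$DESCRIBE_CMD"
```

Both commands pass --command-config because the usage text marks it as required: token operations are allowed in secure mode only.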

Test and troubleshoot

This section contains tools you can use for testing and troubleshooting your applications.

kafka-e2e-latency.sh

The kafka-e2e-latency tool is a performance testing tool that measures end-to-end latency in Kafka. It sends messages to a Kafka topic, consumes them with a Kafka consumer, and calculates the time between when each message was produced and when it was consumed. Use the results to gauge the end-to-end latency of your Kafka cluster and to identify bottlenecks that affect latency.

To run the tool, you provide details such as the message size, number of messages and acks setting for the producer. For more about end-to-end latency, see Configure Kafka to Minimize Latency.

The following arguments are required:
  • broker_list: The location of the bootstrap broker for both the producer and the consumer.

  • topic: The topic name used by both the producer and the consumer to send and receive messages.

  • num_messages: The number of messages to send.

  • producer_acks: The producer setting for acks.

  • message_size_bytes: The size of each message in bytes.

For example:

kafka-e2e-latency.sh localhost:9092 test 10000 1 20 
Usage details
USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file 
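The positional order in the usage line can be sketched with named variables, including the optional properties file as the last argument. The file name client.properties is a hypothetical value for illustration, and the other values repeat the example above. The command is printed for review rather than executed.

```shell
# Values follow the positional order in the usage line.
BROKER_LIST="localhost:9092"
TOPIC="test"
NUM_MESSAGES=10000
PRODUCER_ACKS=1
MESSAGE_SIZE_BYTES=20
PROPERTIES_FILE="client.properties"   # hypothetical optional properties file

# Arguments are positional, so the order must match the usage line exactly.
CMD="kafka-e2e-latency.sh $BROKER_LIST $TOPIC $NUM_MESSAGES $PRODUCER_ACKS $MESSAGE_SIZE_BYTES $PROPERTIES_FILE"

# Print the command for review; run the printed line against a live cluster.
echo "$CMD"
```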

kafka-dump-log.sh

The kafka-dump-log tool can be used in KRaft mode to parse a metadata log file and output its contents to the console. It requires a comma-separated list of log files, which it scans to decode the metadata records.

The following example shows using the cluster-metadata-decoder argument to decode the metadata records in a log segment.

bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log 
Usage details
This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.

Option                            Description
------                            -----------
--cluster-metadata-decoder        if set, log data will be parsed as cluster metadata
                                  records.
--deep-iteration                  if set, uses deep instead of shallow iteration.
                                  Automatically set if print-data-log is enabled.
--files <String: file1, file2, ...>
                                  REQUIRED: The comma separated list of data and index
                                  log files to be dumped.
--help                            Print usage information.
--index-sanity-check              if set, just checks the index sanity without printing
                                  its content. This is the same check that is executed
                                  on broker startup to determine if an index needs
                                  rebuilding or not.
--key-decoder-class [String]      if set, used to deserialize the keys. This class
                                  should implement kafka.serializer.Decoder trait.
                                  Custom jar should be available in kafka/libs
                                  directory. (default: kafka.serializer.StringDecoder)
--max-bytes <Integer: size>       Limit the amount of total batches read in bytes
                                  avoiding reading the whole .log file(s).
                                  (default: 2147483647)
--max-message-size <Integer: size>
                                  Size of largest message. (default: 5242880)
--offsets-decoder                 if set, log data will be parsed as offset data from
                                  the __consumer_offsets topic.
--print-data-log                  if set, printing the messages content when dumping
                                  data logs. Automatically set if any decoder option is
                                  specified.
--remote-log-metadata-decoder     If set, log data will be parsed as
                                  TopicBasedRemoteLogMetadataManager (RLMM) metadata
                                  records. Instead, the value-decoder-class option can
                                  be used if a custom RLMM implementation is configured.
--share-group-state-decoder       If set, log data will be parsed as share group state
                                  data from the __share_group_state topic.
--skip-record-metadata            whether to skip printing metadata for each record.
--transaction-log-decoder         if set, log data will be parsed as transaction
                                  metadata from the __transaction_state topic.
--value-decoder-class [String]    if set, used to deserialize the messages. This class
                                  should implement kafka.serializer.Decoder trait.
                                  Custom jar should be available in kafka/libs
                                  directory. (default: kafka.serializer.StringDecoder)
--verify-index-only               if set, just verify the index log without printing its
                                  content.
--version                         Display Kafka version.
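Two other options from the table can be sketched the same way: decoding a segment of the __consumer_offsets topic and running the index sanity check. The file paths below are hypothetical and depend on your log.dirs setting and topic names. The commands are printed for review rather than executed.

```shell
# Hypothetical file paths; the real locations depend on your log directory.
OFFSETS_SEGMENT="/tmp/kafka-logs/__consumer_offsets-0/00000000000000000000.log"
INDEX_FILE="/tmp/kafka-logs/my-topic-0/00000000000000000000.index"

# Decode a segment of the __consumer_offsets topic as offset data.
DECODE_CMD="bin/kafka-dump-log.sh --offsets-decoder --files $OFFSETS_SEGMENT"

# Check index sanity without printing the index content.
CHECK_CMD="bin/kafka-dump-log.sh --index-sanity-check --files $INDEX_FILE"

# Print the commands for review; run the printed lines on the broker host.
echo "$DECODE_CMD"
echo "$CHECK_CMD"
```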

kafka-jmx.sh

The kafka-jmx tool enables you to read JMX metrics from a given endpoint. This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.

Usage details
Dump JMX values to standard output.

Option                            Description
------                            -----------
--attributes <String: name>       The list of attributes to include in the query. This
                                  is a comma-separated list. If no attributes are
                                  specified all objects will be queried.
--date-format <String: format>    The date format to use for formatting the time field.
                                  See java.text.SimpleDateFormat for options.
--help                            Print usage information.
--jmx-auth-prop <String: jmx-auth-prop>
                                  A mechanism to pass property in the form
                                  'username=password' when enabling remote JMX with
                                  password authentication.
--jmx-ssl-enable <Boolean: ssl-enable>
                                  Flag to enable remote JMX with SSL. (default: false)
--jmx-url <String: service-url>   The url to connect to poll JMX data. See Oracle
                                  javadoc for JMXServiceURL for details.
                                  (default: service:jmx:rmi:///jndi/rmi://:9999/jmxrmi)
--object-name <String: name>      A JMX object name to use as a query. This can contain
                                  wild cards, and this option can be given multiple
                                  times to specify more than one query. If no objects
                                  are specified all objects will be queried.
--one-time [Boolean: one-time]    Flag to indicate run once only. (default: false)
--report-format <String: report-format>
                                  output format name: either 'original', 'properties',
                                  'csv', 'tsv' (default: original)
--reporting-interval <Integer: ms>
                                  Interval in MS with which to poll jmx stats; default
                                  value is 2 seconds. Value of -1 equivalent to setting
                                  one-time to true (default: 2000)
--version                         Display Kafka version.
--wait                            Wait for requested JMX objects to become available
                                  before starting output. Only supported when the list
                                  of objects is non-empty and contains no object name
                                  patterns.
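As a minimal sketch of these options, the following script builds a command that reads one broker metric a single time and reports it as CSV. It assumes the broker exposes remote JMX on localhost port 9999, which is not enabled by default, and it uses the MessagesInPerSec broker topic metric as an example object name. The command is printed for review rather than executed.

```shell
# Assumed JMX endpoint; the broker must have remote JMX enabled on this port.
JMX_URL="service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"

# Example object name to query; substitute the metric you want to read.
OBJECT_NAME="kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"

# Poll once and report in CSV format.
CMD="bin/kafka-jmx.sh --jmx-url $JMX_URL --object-name $OBJECT_NAME --one-time true --report-format csv"

# Print the command for review; run the printed line against a live broker.
echo "$CMD"
```

Omitting --one-time makes the tool poll repeatedly at the --reporting-interval, which defaults to 2000 ms.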

trogdor.sh

Trogdor is a test framework for Kafka. Trogdor can run benchmarks and other workloads. Trogdor can also inject faults in order to stress test the system. For more information, see Trogdor and TROGDOR.

Usage details
The Trogdor fault injector.

Usage:
  ./trogdor.sh [action] [options]

Actions:
  agent: Run the trogdor agent.
  coordinator: Run the trogdor coordinator.
  client: Run the client which communicates with the trogdor coordinator.
  agent-client: Run the client which communicates with the trogdor agent.
  help: This help message.

kafka-run-class.sh

The kafka-run-class tool is a thin wrapper around the Kafka Java class. Other tools call it, and you should not run or modify it directly.

Usage details
USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]