Kafka Command-Line Interface (CLI) Tools
Apache Kafka® is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.
Kafka provides a suite of command-line interface (CLI) tools that can be accessed from the /bin directory after downloading Kafka and extracting the files. These tools offer a range of capabilities, including starting and stopping Kafka, managing topics, and handling partitions. To learn how to use each tool, simply run it with no argument or use the --help argument for detailed instructions.
Use Kafka CLI Tools with Confluent Cloud or Confluent Platform
Quickly manage and interact with your Kafka cluster using built-in Kafka CLI tools. Sign up for Confluent Cloud to start with $400 in free credits, or download Confluent Platform for local development.
To use the Kafka tools with Confluent Cloud, see How to Use Kafka Tools With Confluent Cloud.
You can use all of the Kafka CLI tools with Confluent Platform. They are installed under $CONFLUENT_HOME/bin folder when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extensions, so you do not need to use the extensions when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored under $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.
The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.
Search by tool name
Enter a string to search and filter tools by name.
Manage Kafka and configure metadata
This section contains tools to start Kafka running in KRaft mode and to manage brokers.
kafka-server-start.sh
Use the kafka-server-start tool to start a Kafka server. You must pass the path to the properties file you want to use. To start Kafka in KRaft mode, first generate a cluster ID and store it in the properties file.
Usage details
USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override property=value]* Option Description ------ ----------- --override <String> Optional property that should override values set in server.properties file --version Print version information and exit. kafka-server-stop.sh
Use the kafka-server-stop tool to stop the running Kafka server. When you run this tool, you do not need to have any arguments, but starting with Kafka 3.7 you can optionally specify either a process-role value of broker or controller or a node-id value indicating the node you want to stop.
For example, to stop all brokers, you would use the following command:
./bin/kafka-server-stop.sh --process-role=broker To stop node 1234, you would use the following command.
./bin/kafka-server-stop.sh --node-id=1234 Usage details
USAGE: ./kafka-server-stop.sh {[--process-role=value] | [--node-id=value]} kafka-storage.sh
Use the kafka-storage tool to generate a Cluster UUID and format storage with the generated UUID when running Kafka in KRaft mode. You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.
For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID -c config/kraft/server.properties Usage details
USAGE: kafka-storage.sh [-h] {info,format,version-mapping,feature-dependencies,random-uuid} ... The Kafka storage tool. positional arguments: {info,format,version-mapping,feature-dependencies,random-uuid} info Get information about the Kafka log directories on this node. format Format the Kafka log directories on this node. version-mapping Look up the corresponding features for a given metadata version. Using the command with no ``--release-version`` argument will return the mapping for the latest stable metadata version. feature-dependencies Look up dependencies for a given feature version. If the feature is not known or the version not yet defined, an error is thrown. Multiple features can be specified. random-uuid Print a random UUID. optional arguments: --config CONFIG, -c CONFIG The Kafka configuration file to use. --cluster-id CLUSTER_ID, -t CLUSTER_ID The cluster ID to use. --add-scram ADD_SCRAM, -S ADD_SCRAM A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g. 'SCRAM-SHA-256=[name=alice,password=alice-secret]' 'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]' --ignore-formatted, -g When this option is passed, the format command will skip over already formatted directories rather than failing. --release-version The release version to use for the initial feature settings. The minimum is 3.3-IV3; the RELEASE_VERSION, -r default is 4.0-IV3 RELEASE_VERSION --feature FEATURE, The setting to use for a specific feature, in feature=level format. For example: ``kraft.version=1``. -f FEATURE --standalone, -s Used to initialize a controller as a single-node dynamic quorum. --no-initial-controllers, -N Used to initialize a server without a dynamic quorum topology. --initial-controllers Used to initialize a server with a specific dynamic quorum topology. The argument is a INITIAL_CONTROLLERS, comma-separated list of id@hostname:port:directory. The same values must be used to -I INITIAL_CONTROLLERS format all nodes. For example: 0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A, 2@example.com:8084:07R5amHmR32VDA6jHkGbTA -h, --help show this help message and exit kafka-cluster.sh
Use the kafka-cluster tool to get the ID of a cluster or unregister a cluster. The following example shows how to retrieve the cluster ID, which requires a bootstrap-server argument.
bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 The output for this command might look like the following.
Cluster ID: WZEKwK-b123oT3ZOSU0dgw Usage details
USAGE: kafka-cluster.sh [-h] {cluster-id,unregister,list-endpoints} ... The Kafka cluster tool. positional arguments: {cluster-id,unregister,list-endpoints} cluster-id Get information about the ID of a cluster. unregister Unregister a broker. list-endpoints List endpoints optional arguments: -h, --help show this help message and exit kafka-features.sh
Use the kafka-features tool to manage feature flags to disable or enable functionality at runtime in Kafka. Pass the describe argument to describe the current active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.
Usage details
usage: kafka-features [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER | --bootstrap-controller BOOTSTRAP_CONTROLLER) {describe,upgrade,downgrade,disable,version-mapping,feature-dependencies} ... This tool manages feature flags in Kafka. positional arguments: {describe,upgrade,downgrade,disable,version-mapping,feature-dependencies} describe Describes the current active feature flags. upgrade Upgrade one or more feature flags. downgrade Upgrade one or more feature flags. disable Disable one or more feature flags. This is the same as downgrading the version to zero. version-mapping Look up the corresponding features for a given metadata version. Using the command with no --release-version argument will return the mapping for the latest stable metadata version feature-dependencies Look up dependencies for a given feature version. If the feature is not known or the version not yet defined, an error is thrown. Multiple features can be specified. optional arguments: -h, --help show this help message and exit --bootstrap-server BOOTSTRAP_SERVER A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster. --bootstrap-controller BOOTSTRAP_CONTROLLER A comma-separated list of host:port pairs to use for establishing the connection to the KRaft quorum. --command-config COMMAND_CONFIG Property file containing configs to be passed to Admin Client. kafka-broker-api-versions.sh
The kafka-broker-api-versions tool retrieves and displays broker information. For example, the following command outputs the version of Kafka that is running on the broker:
bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version This command might have the following output:
3.3.1 (Commit:e23c59d00e687ff5) Usage details
This tool helps to retrieve broker version information. Option Description ------ ----------- --bootstrap-server <String: server(s) REQUIRED: The server to connect to. to use for bootstrapping> --command-config <String: command A property file containing configs to config property file> be passed to Admin Client. --help Print usage information. --version Display Kafka version. kafka-metadata-quorum.sh
Use the kafka-metadata-quorum tool to query the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode. Pass the describe command to describe the current state of the metadata quorum.
The following code example displays a summary of the metadata quorum:
bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status The output for this command might look like the following.
ClusterId: fMCL8kv1SWm87L_Md-I2hg LeaderId: 3002 LeaderEpoch: 2 HighWatermark: 10 MaxFollowerLag: 0 MaxFollowerLagTimeMs: -1 CurrentVoters: [3000,3001,3002] CurrentObservers: [0,1,2] Usage details
usage: kafka-metadata-quorum [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER | --bootstrap-controller BOOTSTRAP_CONTROLLER) {describe,add-controller,remove-controller} ... This tool describes kraft metadata quorum status. positional arguments: {describe,add-controller,remove-controller} describe Describe the metadata quorum info add-controller Add a controller to the KRaft controller cluster remove-controller Remove a controller from the KRaft controller cluster optional arguments: -h, --help show this help message and exit --bootstrap-server BOOTSTRAP_SERVER A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster. --bootstrap-controller BOOTSTRAP_CONTROLLER A comma-separated list of host:port pairs to use for establishing the connection to the Kafka controllers. --command-config COMMAND_CONFIG Property file containing configs to be passed to Admin Client. kafka-metadata-shell.sh
The kafka-metadata-shell tool enables you to interactively examine the metadata stored in a KRaft cluster.
To open the metadata shell, provide the path to your cluster’s metadata log directory using the --directory flag:
kafka-metadata-shell.sh --directory tmp/kraft-combined-logs/_cluster-metadata-0/ To ensure kafka-metadata-shell.sh functions correctly, the --directory flag must point to your Kafka cluster’s metadata log directory. While examples often show /tmp/kraft-combined-logs/_cluster-metadata-0/, this path varies based on your Kafka configuration.
If your metadata files are not in their default or expected location, you have two options:
Specify the full, correct path to your metadata log directory.
Copy the metadata log directory to a temporary location (for example, /tmp), and then specify that temporary path.
After the shell loads, you can explore the contents of the metadata log, and exit. The following code shows an example of this.
Loading... [ Kafka Metadata Shell ] >> ls image local >>ls image acls cells clientQuotas cluster clusterLinks configs delegationToken encryptor features producerIds provenance replicaExclusions scram tenants topics >> ls image/topics/ byID byLinkId byName byTenant >>ls image/topics/byName test perf_test_124 othersourcetopic >> cat /image/topics/byName/othersourcetopic/0/ { "replicas": [1], "observers": null, "directories": ["U9TE_0zN-oReJ5XwsShc-w"], "isr": [1], "removingReplicas": null, "addingReplicas": null, "removingObservers": null, "addingObservers": null, "elr": null, "lastKnownElr": null, "leader": -1, "leaderRecoveryState": "RECOVERED", "leaderEpoch": 57, "partitionEpoch": 9, "linkedLeaderEpoch": 54, "linkState": "ACTIVE" } >> exit For more information, see the Kafka Wiki.
Usage details
usage: metadata-shell-tool [-h] [--snapshot SNAPSHOT] [command [command ...]] The Apache Kafka metadata tool positional arguments: command The command to run. optional arguments: -h, --help show this help message and exit --snapshot SNAPSHOT, -s SNAPSHOT The snapshot file to read. kafka-configs.sh
Use the kafka-configs tool to change and describe topic, client, user, broker, IP configuration setting or KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, specify the entity-type to the desired entity (topic, broker, user, etc), and use the alter option. The following example shows how you might add the delete.retention configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000 When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default The following example shows how you might check the cluster ID, by specifying the --bootstrap-controller option.
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092 See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics Option Description ------ ----------- --add-config <String> Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: For entity-type 'topics': cleanup.policy compression.gzip.level compression.lz4.level compression.type compression.zstd.level delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.timestamp.after.max.ms message.timestamp.before.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.log.copy.disable remote.log.delete.on.disable remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable For entity-type 'brokers': background.threads compression.gzip.level compression.lz4.level compression.type compression.zstd.level follower.replication.throttled.rate leader.replication.throttled.rate listener.security.protocol.map listeners log.cleaner.backoff.ms log.cleaner.dedupe.buffer.size log.cleaner.delete.retention.ms log.cleaner.io.buffer.load.factor log.cleaner.io.buffer.size log.cleaner.io.max.bytes.per.second log.cleaner.max.compaction.lag.ms log.cleaner.min.cleanable.ratio log.cleaner.min.compaction.lag.ms log.cleaner.threads log.cleanup.policy log.flush.interval.messages log.flush.interval.ms log.index.interval.bytes log.index.size.max.bytes log.local.retention.bytes log.local.retention.ms log.message.timestamp.after.max.ms log.message.timestamp.before.max.ms log.message.timestamp.type log.preallocate log.retention.bytes log.retention.ms log.roll.jitter.ms log.roll.ms log.segment.bytes log.segment.delete.delay.ms max.connection.creation.rate max.connections max.connections.per.ip max.connections.per.ip.overrides message.max.bytes metric.reporters min.insync.replicas num.io.threads num.network.threads num.recovery.threads.per.data.dir num.replica.fetchers principal.builder.class producer.id.expiration.ms remote.fetch.max.wait.ms remote.list.offsets.request.timeout.ms remote.log.index.file.cache.total. size.bytes remote.log.manager.copier.thread.pool. size remote.log.manager.copy.max.bytes.per. second remote.log.manager.expiration.thread. pool.size remote.log.manager.fetch.max.bytes. per.second remote.log.reader.threads replica.alter.log.dirs.io.max.bytes. per.second sasl.enabled.mechanisms sasl.jaas.config sasl.kerberos.kinit.cmd sasl.kerberos.min.time.before.relogin sasl.kerberos.principal.to.local.rules sasl.kerberos.service.name sasl.kerberos.ticket.renew.jitter sasl.kerberos.ticket.renew.window. factor sasl.login.refresh.buffer.seconds sasl.login.refresh.min.period.seconds sasl.login.refresh.window.factor sasl.login.refresh.window.jitter sasl.mechanism.inter.broker.protocol ssl.cipher.suites ssl.client.auth ssl.enabled.protocols ssl.endpoint.identification.algorithm ssl.engine.factory.class ssl.key.password ssl.keymanager.algorithm ssl.keystore.certificate.chain ssl.keystore.key ssl.keystore.location ssl.keystore.password ssl.keystore.type ssl.protocol ssl.provider ssl.secure.random.implementation ssl.trustmanager.algorithm ssl.truststore.certificates ssl.truststore.location ssl.truststore.password ssl.truststore.type transaction.partition.verification. enable unclean.leader.election.enable For entity-type 'users': SCRAM-SHA-256 SCRAM-SHA-512 consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'clients': consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'ips': connection_creation_rate For entity-type 'client-metrics': interval.ms match metrics For entity-type 'groups': consumer.heartbeat.interval.ms consumer.session.timeout.ms share.auto.offset.reset share.heartbeat.interval.ms share.record.lock.duration.ms share.session.timeout.ms Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user. --add-config-file <String> Path to a properties file with configs to add. See add-config for a list of valid configurations. --all List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers) --alter Alter the configuration for the entity. --bootstrap-controller <String: The Kafka controllers to connect to. controller to connect to> --bootstrap-server <String: server to The Kafka servers to connect to. connect to> --broker <String> The broker's ID. --broker-defaults The config defaults for all brokers. --broker-logger <String> The broker's ID for its logger config. --client <String> The client's ID. --client-defaults The config defaults for all clients. --client-metrics <String> The client metrics config resource name. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. --delete-config <String> config keys to remove 'k1,k2' --describe List configs for the given entity. --entity-default Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line) --entity-name <String> Name of entity (topic name/client id/user principal name/broker id/ip/client metrics) --entity-type <String> Type of entity (topics/clients/users/brokers/broker- loggers/ips/client-metrics) --force Suppress console prompts --group <String> The group ID. --help Print usage information. --ip <String> The IP address. --ip-defaults The config defaults for all IPs. --topic <String> The topic's name. --user <String> The user's principal name. --user-defaults The config defaults for all users. --version Display Kafka version. Manage topics, partitions, and replication
kafka-topics.sh
Use the kafka-topics tool to create or delete a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster. For more information, see Kafka Topic Operations.
To change a topic, see kafka-configs.sh, or how to modify a topic.
Example:
bin/kafka-topics.sh --bootstrap-server host1:9092 --topic test-topic --partitions 3 Usage details
This tool helps to create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions and replica assignment. (To alter topic configurations, the kafka-configs tool can be used.) --at-min-isr-partitions if set when describing topics, only show partitions whose isr count is equal to the configured minimum. --bootstrap-server <String: server to REQUIRED: The Kafka server to connect connect to> to. --command-config <String: command Property file containing configs to be config property file> passed to the Admin Client. --config <String: name=value> A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy compression.gzip.level compression.lz4.level compression.type compression.zstd.level delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.timestamp.after.max.ms message.timestamp.before.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.log.copy.disable remote.log.delete.on.disable remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. It is supported only in combination with -- create. (To alter topic configurations, the kafka-configs tool can be used.) --create Create a new topic. --delete Delete a topic --delete-config <String: name> This option is no longer supported and has been deprecated since 4.0. --describe List details for the given topics. --exclude-internal Exclude internal topics when running list or describe command. The internal topics will be listed by default. --help Print usage information. --if-exists if set when altering or deleting or describing topics, the action will only execute if the topic exists. --if-not-exists If set when creating topics, the action will only execute if the topic does not already exist. --list List all available topics. --partition-size-limit-per-response The maximum partition size to be <Integer: maximum number of included in one partitions per response> DescribeTopicPartitions response. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default. --replica-assignment <String: A list of manual partition-to-broker broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. If not supplied, defaults to the cluster default. --topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e. g. "test\.topic". --topic-id <String: topic-id> The topic-id to describe. This is used only with --bootstrap-server option for describing topics. --topics-with-overrides If set when describing topics, only show topics that have overridden configs. --unavailable-partitions If set when describing topics, only show partitions whose leader is not available. --under-min-isr-partitions If set when describing topics, only show partitions whose isr count is less than the configured minimum. --under-replicated-partitions If set when describing topics, only show under replicated partitions --version Display Kafka version. kafka-configs.sh
Use the kafka-configs tool to change and describe topic, client, user, broker, IP configuration setting or KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, specify the entity-type to the desired entity (topic, broker, user, etc), and use the alter option. The following example shows how you might add the delete.retention configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000 When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default The following example shows how you might check the cluster ID, by specifying the --bootstrap-controller option.
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092 See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics Option Description ------ ----------- --add-config <String> Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: For entity-type 'topics': cleanup.policy compression.gzip.level compression.lz4.level compression.type compression.zstd.level delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.timestamp.after.max.ms message.timestamp.before.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.log.copy.disable remote.log.delete.on.disable remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable For entity-type 'brokers': background.threads compression.gzip.level compression.lz4.level compression.type compression.zstd.level follower.replication.throttled.rate leader.replication.throttled.rate listener.security.protocol.map listeners log.cleaner.backoff.ms log.cleaner.dedupe.buffer.size log.cleaner.delete.retention.ms log.cleaner.io.buffer.load.factor log.cleaner.io.buffer.size log.cleaner.io.max.bytes.per.second log.cleaner.max.compaction.lag.ms log.cleaner.min.cleanable.ratio log.cleaner.min.compaction.lag.ms log.cleaner.threads log.cleanup.policy log.flush.interval.messages log.flush.interval.ms log.index.interval.bytes log.index.size.max.bytes log.local.retention.bytes log.local.retention.ms log.message.timestamp.after.max.ms log.message.timestamp.before.max.ms log.message.timestamp.type log.preallocate log.retention.bytes log.retention.ms log.roll.jitter.ms log.roll.ms log.segment.bytes log.segment.delete.delay.ms max.connection.creation.rate max.connections max.connections.per.ip max.connections.per.ip.overrides message.max.bytes metric.reporters min.insync.replicas num.io.threads num.network.threads num.recovery.threads.per.data.dir num.replica.fetchers principal.builder.class producer.id.expiration.ms remote.fetch.max.wait.ms remote.list.offsets.request.timeout.ms remote.log.index.file.cache.total. size.bytes remote.log.manager.copier.thread.pool. size remote.log.manager.copy.max.bytes.per. second remote.log.manager.expiration.thread. pool.size remote.log.manager.fetch.max.bytes. per.second remote.log.reader.threads replica.alter.log.dirs.io.max.bytes. per.second sasl.enabled.mechanisms sasl.jaas.config sasl.kerberos.kinit.cmd sasl.kerberos.min.time.before.relogin sasl.kerberos.principal.to.local.rules sasl.kerberos.service.name sasl.kerberos.ticket.renew.jitter sasl.kerberos.ticket.renew.window. factor sasl.login.refresh.buffer.seconds sasl.login.refresh.min.period.seconds sasl.login.refresh.window.factor sasl.login.refresh.window.jitter sasl.mechanism.inter.broker.protocol ssl.cipher.suites ssl.client.auth ssl.enabled.protocols ssl.endpoint.identification.algorithm ssl.engine.factory.class ssl.key.password ssl.keymanager.algorithm ssl.keystore.certificate.chain ssl.keystore.key ssl.keystore.location ssl.keystore.password ssl.keystore.type ssl.protocol ssl.provider ssl.secure.random.implementation ssl.trustmanager.algorithm ssl.truststore.certificates ssl.truststore.location ssl.truststore.password ssl.truststore.type transaction.partition.verification. enable unclean.leader.election.enable For entity-type 'users': SCRAM-SHA-256 SCRAM-SHA-512 consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'clients': consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'ips': connection_creation_rate For entity-type 'client-metrics': interval.ms match metrics For entity-type 'groups': consumer.heartbeat.interval.ms consumer.session.timeout.ms share.auto.offset.reset share.heartbeat.interval.ms share.record.lock.duration.ms share.session.timeout.ms Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user. --add-config-file <String> Path to a properties file with configs to add. See add-config for a list of valid configurations. --all List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers) --alter Alter the configuration for the entity. --bootstrap-controller <String: The Kafka controllers to connect to. controller to connect to> --bootstrap-server <String: server to The Kafka servers to connect to. connect to> --broker <String> The broker's ID. --broker-defaults The config defaults for all brokers. --broker-logger <String> The broker's ID for its logger config. --client <String> The client's ID. --client-defaults The config defaults for all clients. --client-metrics <String> The client metrics config resource name. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. --delete-config <String> config keys to remove 'k1,k2' --describe List configs for the given entity. --entity-default Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line) --entity-name <String> Name of entity (topic name/client id/user principal name/broker id/ip/client metrics) --entity-type <String> Type of entity (topics/clients/users/brokers/broker- loggers/ips/client-metrics) --force Suppress console prompts --group <String> The group ID. --help Print usage information. --ip <String> The IP address. --ip-defaults The config defaults for all IPs. --topic <String> The topic's name. --user <String> The user's principal name. --user-defaults The config defaults for all users. --version Display Kafka version. kafka-get-offsets.sh
Use the kafka-get-offsets tool to retrieve topic-partition offsets.
Usage details
An interactive shell for getting topic-partition offsets. Option Description ------ ----------- --bootstrap-server <String: HOST1: REQUIRED. The server(s) to connect to PORT1,...,HOST3:PORT3> in the form HOST1:PORT1,HOST2:PORT2. --broker-list <String: HOST1:PORT1,..., DEPRECATED, use --bootstrap-server HOST3:PORT3> instead; ignored if --bootstrap- server is specified. The server(s) to connect to in the form HOST1: PORT1,HOST2:PORT2. --command-config <String: config file> Property file containing configs to be passed to Admin Client. --exclude-internal-topics By default, internal topics are included. If specified, internal topics are excluded. --help Print usage information. --partitions <String: partition ids> Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present. --time <String: <timestamp> / -1 or timestamp of the offsets before that. latest / -2 or earliest / -3 or max- [Note: No offset is returned, if the timestamp> timestamp greater than recently committed record timestamp is given.] (default: latest) --topic <String: topic> The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present. --topic-partitions <String: topic1:1, Comma separated list of topic- topic2:0-3,topic3,topic4:5-,topic5:-3 partition patterns to get the > offsets for, with the format of: ([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*) -([0-9]*))))?. The first group is an optional regex for the topic name, if omitted, it matches any topic name. The section after ':' describes a partition pattern, which can be: a number, a range in the format of NUMBER-NUMBER (lower inclusive, upper exclusive), an inclusive lower bound in the format of NUMBER-, an exclusive upper bound in the format of -NUMBER or may be omitted to accept all partitions. --version Display Kafka version. kafka-leader-election.sh
Use the kafka-leader-election tool to attempt to elect a new leader for a set of topic partitions.
Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
Usage details
This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas. Option Description ------ ----------- --admin.config <String: config file> Configuration properties files to pass to the admin client --all-topic-partitions Perform election on all of the eligible topic partitions based on the type of election (see the -- election-type flag). Not allowed if --topic or --path-to-json-file is specified. --bootstrap-server <String: host:port> A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED. --election-type <[PREFERRED,UNCLEAN]: Type of election to attempt. Possible election type> values are "preferred" for preferred leader election or "unclean" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED. --help Print usage information. --partition <Integer: partition id> Partition id for which to perform an election. REQUIRED if --topic is specified. --path-to-json-file <String: Path to The JSON file with the list of JSON file> partition for which leader elections should be performed. This is an example format. {"partitions": [{"topic": "foo", "partition": 1}, {"topic": "foobar", "partition": 2}] } Not allowed if --all-topic-partitions or --topic flags are specified. --topic <String: topic name> Name of topic for which to perform an election. Not allowed if --path-to- json-file or --all-topic-partitions is specified. --version Display Kafka version. kafka-transactions.sh
Use the kafka-transactions tool to list and describe transactions. Use to detect and abort hanging transactions. For more information, see Detect and Abort Hanging Transactions
Usage details
usage: kafka-transactions.sh [-h] [-v] [--command-config FILE] --bootstrap-server host:port COMMAND ... This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and recover from hanging transactions. optional arguments: -h, --help show this help message and exit -v, --version show the version of this Kafka distribution and exit --command-config FILE property file containing configs to be passed to admin client --bootstrap-server host:port hostname and port for the broker to connect to, in the form `host:port` (multiple comma-separated entries can be given) commands: list list transactions describe describe the state of an active transactional-id describe-producers describe the states of active producers for a topic partition abort abort a hanging transaction (requires administrative privileges) find-hanging find hanging transactions kafka-reassign-partitions.sh
Use the kafka-reassign-partitions to move topic partitions between replicas You pass a JSON-formatted file to specify the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
Usage details
This tool helps to move topic partitions between replicas. Option Description ------ ----------- --additional Execute this reassignment in addition to any other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment. --bootstrap-controller The controller to use for reassignment. [String: bootstrap controller By default, the tool will get the quorum controller. This to connect to] option supports the actions --cancel and --list. --bootstrap-server <String: Server(s) REQUIRED: the server(s) to use for to use for bootstrapping> bootstrapping. --broker-list <String: brokerlist> The list of brokers to which the partitions need to be reassigned in the form "0,1,2". This is required if --topics-to-move-json-file is used to generate reassignment configuration --cancel Cancel an active reassignment. --command-config <String: Admin client Property file containing configs to be property file> passed to Admin Client. --disable-rack-aware Disable rack aware replica assignment --execute Kick off the reassignment as specified by the --reassignment-json-file option. --generate Generate a candidate partition reassignment configuration. Note that this only generates a candidate assignment, it does not execute it. --help Print usage information. --list List all active partition reassignments. --preserve-throttles Do not modify broker or topic throttles. --reassignment-json-file <String: The JSON file with the partition manual assignment json file path> reassignment configurationThe format to use is - {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3,4], "observers":[3,4], "log_dirs": ["dir1","dir2","dir3"," dir4"] }], "version":1 } Note that "log_dirs" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list can be either "any" or the absolution path of the log directory on the broker. If absolute log directory path is specified, the replica will be moved to the specified log directory on the broker. Note that "observers" is optional. When it is specified it must be a suffix of the replicas list. --replica-alter-log-dirs-throttle The movement of replicas between log <Long: replicaAlterLogDirsThrottle> directories on the same broker will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1) --throttle <Long: throttle> The movement of partitions between brokers will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1) --timeout <Long: timeout> The maximum time in ms to wait for log directory replica assignment to begin. (default: 10000) --topics-to-move-json-file <String: Generate a reassignment configuration topics to reassign json file path> to move the partitions of the specified topics to the list of brokers specified by the --broker- list option. The format to use is - {"topics": [{"topic": "foo"},{"topic": "foo1"}], "version":1 } --verify Verify if the reassignment completed as specified by the --reassignment- json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed --version Display Kafka version. kafka-delete-records.sh
Use the kafka-delete-records tool to delete partition records. Use this if a topic receives bad data. Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified. Example:
bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json Usage details
This tool helps to delete records of the given partitions down to the specified offset. Option Description ------ ----------- --bootstrap-server <String: server(s) REQUIRED: The server to connect to. to use for bootstrapping> --command-config <String: command A property file containing configs to config property file path> be passed to Admin Client. --help Print usage information. --offset-json-file <String: Offset REQUIRED: The JSON file with offset json file path> per partition. The format to use is: {"partitions": [{"topic": "foo", "partition": 1, "offset": 1}], "version":1 } --version Display Kafka version. kafka-log-dirs.sh
Use the kafka-log-dirs tool to get a list of replicas per log directory on a broker.
Usage details
This tool helps to query log directory usage on the specified brokers. ------ ----------- --bootstrap-server <String: The server REQUIRED: the server(s) to use for (s) to use for bootstrapping> bootstrapping --broker-list <String: Broker list> The list of brokers to be queried in the form "0,1,2". All brokers in the cluster will be queried if no broker list is specified --command-config <String: Admin client Property file containing configs to be property file> passed to Admin Client. --describe Describe the specified log directories on the specified brokers. --help Print usage information. --topic-list <String: Topic list> The list of topics to be queried in the form "topic1,topic2,topic3". All topics will be queried if no topic list is specified (default: ) --version Display Kafka version. kafka-replica-verification.sh
Use the kafka-replica-verification tool to verify that all replicas of a topic contain the same data. Requires a broker-list parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
Usage details
Validate that all replicas for a set of topics have the same data. Option Description ------ ----------- --broker-list <String: hostname: REQUIRED: The list of hostname and port,...,hostname:port> port of the server to connect to. --fetch-size <Integer: bytes> The fetch size of each request. (default: 1048576) --help Print usage information. --max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000) --report-interval-ms <Long: ms> The reporting interval. (default: 30000) --time <Long: timestamp/-1(latest)/-2 Timestamp for getting the initial (earliest)> offsets. (default: -1) --topic-white-list <String: Java regex DEPRECATED use --topics-include (String)> instead; ignored if --topics-include specified. List of topics to verify replica consistency. Defaults to '. *' (all topics) (default: .*) --topics-include <String: Java regex List of topics to verify replica (String)> consistency. Defaults to '.*' (all topics) (default: .*) --version Print version information and exit. kafka-mirror-maker.sh
DEPRECATED: For an alternative, see connect-mirror-maker.sh. Enables the creation of a replica of an existing Kafka cluster. Example: bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary. Learn more Kafka mirroring
Usage details
This tool helps to continuously copy data between two Kafka clusters. Option Description ------ ----------- --abort.on.send.failure <String: Stop Configure the mirror maker to exit on the entire mirror maker when a send a failed send. (default: true) failure occurs> --consumer.config <String: config file> Embedded consumer config for consuming from the source cluster. --consumer.rebalance.listener <String: The consumer rebalance listener to use A custom rebalance listener of type for mirror maker consumer. ConsumerRebalanceListener> --help Print usage information. --include <String: Java regex (String)> List of included topics to mirror. --message.handler <String: A custom Message handler which will process message handler of type every record in-between consumer and MirrorMakerMessageHandler> producer. --message.handler.args <String: Arguments used by custom message Arguments passed to message handler handler for mirror maker. constructor.> --new.consumer DEPRECATED Use new consumer in mirror maker (this is the default so this option will be removed in a future version). --num.streams <Integer: Number of Number of consumption streams. threads> (default: 1) --offset.commit.interval.ms <Integer: Offset commit interval in ms. offset commit interval in (default: 60000) millisecond> --producer.config <String: config file> Embedded producer config. --rebalance.listener.args <String: Arguments used by custom rebalance Arguments passed to custom rebalance listener for mirror maker consumer. listener constructor as a string.> --version Display Kafka version. --whitelist <String: Java regex DEPRECATED, use --include instead; (String)> ignored if --include specified. List of included topics to mirror. connect-mirror-maker.sh
Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties MirrorMaker 2.0 driver positional arguments: mm2.properties MM2 configuration file. optional arguments: -h, --help show this help message and exit --clusters CLUSTER [CLUSTER ...] Target cluster to use for this node. Client, producer, and consumer tools
kafka-client-metrics.sh
Use the kafka-client-metrics tool to manipulate and describe client metrics configurations for clusters where client metrics are enabled. This tool provides a simpler alternative to using kafka-configs.sh to configure client metrics.
For example, to list all of the client metric configuration resource, use the following command:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe To describe a specific configuration:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe --name MYMETRICS You can use this tool to create a client metric configuration resource and generate a unique name. In this example, --generate-name is used to create a type-4 UUID to use as the client metrics configuration resource name:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --alter --generate-name \ --metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \ --interval 60000 Usage details
Option Description ------ ----------- --alter Alter the configuration for the client metrics resource. --bootstrap-server <String: server to REQUIRED: The Kafka server to connect connect to> to. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. --delete Delete the configuration for the client metrics resource. --describe List configurations for the client metrics resource. --generate-name Generate a UUID to use as the name. --help Print usage information. --interval <Integer: push interval> The metrics push interval in milliseconds. --list List the client metrics resources. --match <String: k1=v1,k2=v2> Matching selector 'k1=v1,k2=v2'. The following is a list of valid selector names: client_id client_instance_id client_software_name client_software_version client_source_address client_source_port --metrics <String: m1,m2> Telemetry metric name prefixes 'm1,m2'. --name <String: name> Name of client metrics configuration resource. --version Display Kafka version. kafka-verifiable-consumer.sh
The kafka-verifiable-consumer tool consumes messages from a topic and emits consumer events as JSON objects to STDOUT. For example, group rebalances, received messages, and offsets committed. Intended for internal testing.
Usage details
usage: verifiable-consumer [-h] --topic TOPIC --group-id GROUP_ID [--group-instance-id GROUP_INSTANCE_ID] [--max-messages MAX-MESSAGES] [--session-timeout TIMEOUT_MS] [--verbose] [--enable-autocommit] [--send-offset-for-times-data] [--reset-policy RESETPOLICY] [--assignment-strategy ASSIGNMENTSTRATEGY] [--consumer.config CONFIG_FILE] (--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] | --broker-list HOST1:PORT1[,HOST2:PORT2[...]]) This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT. optional arguments: -h, --help show this help message and exit --topic TOPIC Consumes messages from this topic. --group-protocol GROUP_PROTOCOL Group protocol (must be one of CLASSIC, CONSUMER) (default: classic) --group-remote-assignor GROUP_REMOTE_ASSIGNOR Group remote assignor; only used if the group protocol is CONSUMER --group-id GROUP_ID The groupId shared among members of the consumer group --group-instance-id GROUP_INSTANCE_ID A unique identifier of the consumer instance --max-messages MAX-MESSAGES Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally (default: -1) --session-timeout TIMEOUT_MS Set the consumer's session timeout (default: 30000) --verbose Enable to log individual consumed records (default: false) --enable-autocommit Enable offset auto-commit on consumer (default: false) --reset-policy RESETPOLICY Set reset policy (must be either 'earliest', 'latest', or 'none' (default: earliest) --assignment-strategy ASSIGNMENTSTRATEGY Set assignment strategy (e.g. org.apache.kafka.clients.consumer. RoundRobinAssignor) (default: org.apache.kafka.clients.consumer. RangeAssignor) --consumer.config CONFIG_FILE Consumer config properties file (config options shared with command line parameters will be overridden). Connection Group: Group of arguments for connection to brokers --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,... --broker-list HOST1:PORT1[,HOST2:PORT2[...]] DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap- server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,... kafka-configs.sh
Use the kafka-configs tool to change and describe topic, client, user, broker, IP configuration setting or KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, specify the entity-type to the desired entity (topic, broker, user, etc), and use the alter option. The following example shows how you might add the delete.retention configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000 When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default The following example shows how you might check the cluster ID, by specifying the --bootstrap-controller option.
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092 See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics Option Description ------ ----------- --add-config <String> Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: For entity-type 'topics': cleanup.policy compression.gzip.level compression.lz4.level compression.type compression.zstd.level delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.timestamp.after.max.ms message.timestamp.before.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.log.copy.disable remote.log.delete.on.disable remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable For entity-type 'brokers': background.threads compression.gzip.level compression.lz4.level compression.type compression.zstd.level follower.replication.throttled.rate leader.replication.throttled.rate listener.security.protocol.map listeners log.cleaner.backoff.ms log.cleaner.dedupe.buffer.size log.cleaner.delete.retention.ms log.cleaner.io.buffer.load.factor log.cleaner.io.buffer.size log.cleaner.io.max.bytes.per.second log.cleaner.max.compaction.lag.ms log.cleaner.min.cleanable.ratio log.cleaner.min.compaction.lag.ms log.cleaner.threads log.cleanup.policy log.flush.interval.messages log.flush.interval.ms log.index.interval.bytes log.index.size.max.bytes log.local.retention.bytes log.local.retention.ms log.message.timestamp.after.max.ms log.message.timestamp.before.max.ms log.message.timestamp.type log.preallocate log.retention.bytes log.retention.ms log.roll.jitter.ms log.roll.ms log.segment.bytes log.segment.delete.delay.ms max.connection.creation.rate max.connections max.connections.per.ip max.connections.per.ip.overrides message.max.bytes metric.reporters min.insync.replicas num.io.threads num.network.threads num.recovery.threads.per.data.dir num.replica.fetchers principal.builder.class producer.id.expiration.ms remote.fetch.max.wait.ms remote.list.offsets.request.timeout.ms remote.log.index.file.cache.total. size.bytes remote.log.manager.copier.thread.pool. size remote.log.manager.copy.max.bytes.per. second remote.log.manager.expiration.thread. pool.size remote.log.manager.fetch.max.bytes. per.second remote.log.reader.threads replica.alter.log.dirs.io.max.bytes. per.second sasl.enabled.mechanisms sasl.jaas.config sasl.kerberos.kinit.cmd sasl.kerberos.min.time.before.relogin sasl.kerberos.principal.to.local.rules sasl.kerberos.service.name sasl.kerberos.ticket.renew.jitter sasl.kerberos.ticket.renew.window. factor sasl.login.refresh.buffer.seconds sasl.login.refresh.min.period.seconds sasl.login.refresh.window.factor sasl.login.refresh.window.jitter sasl.mechanism.inter.broker.protocol ssl.cipher.suites ssl.client.auth ssl.enabled.protocols ssl.endpoint.identification.algorithm ssl.engine.factory.class ssl.key.password ssl.keymanager.algorithm ssl.keystore.certificate.chain ssl.keystore.key ssl.keystore.location ssl.keystore.password ssl.keystore.type ssl.protocol ssl.provider ssl.secure.random.implementation ssl.trustmanager.algorithm ssl.truststore.certificates ssl.truststore.location ssl.truststore.password ssl.truststore.type transaction.partition.verification. enable unclean.leader.election.enable For entity-type 'users': SCRAM-SHA-256 SCRAM-SHA-512 consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'clients': consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'ips': connection_creation_rate For entity-type 'client-metrics': interval.ms match metrics For entity-type 'groups': consumer.heartbeat.interval.ms consumer.session.timeout.ms share.auto.offset.reset share.heartbeat.interval.ms share.record.lock.duration.ms share.session.timeout.ms Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user. --add-config-file <String> Path to a properties file with configs to add. See add-config for a list of valid configurations. --all List all configs for the given topic, broker, or broker-logger entity (includes static configuration when the entity type is brokers) --alter Alter the configuration for the entity. --bootstrap-controller <String: The Kafka controllers to connect to. controller to connect to> --bootstrap-server <String: server to The Kafka servers to connect to. connect to> --broker <String> The broker's ID. --broker-defaults The config defaults for all brokers. --broker-logger <String> The broker's ID for its logger config. --client <String> The client's ID. --client-defaults The config defaults for all clients. --client-metrics <String> The client metrics config resource name. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. --delete-config <String> config keys to remove 'k1,k2' --describe List configs for the given entity. --entity-default Default entity name for clients/users/brokers/ips (applies to corresponding entity type in command line) --entity-name <String> Name of entity (topic name/client id/user principal name/broker id/ip/client metrics) --entity-type <String> Type of entity (topics/clients/users/brokers/broker- loggers/ips/client-metrics) --force Suppress console prompts --group <String> The group ID. --help Print usage information. --ip <String> The IP address. --ip-defaults The config defaults for all IPs. --topic <String> The topic's name. --user <String> The user's principal name. --user-defaults The config defaults for all users. --version Display Kafka version. kafka-verifiable-producer.sh
The kafka-verifiable-producer tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send request. This tool shows which messages have been acked and which have not. This tool is intended for internal testing.
Usage details
usage: verifiable-producer [-h] --topic TOPIC [--max-messages MAX-MESSAGES] [--throughput THROUGHPUT] [--acks ACKS] [--producer.config CONFIG_FILE] [--message-create-time CREATETIME] [--value-prefix VALUE-PREFIX] [--repeating-keys REPEATING-KEYS] (--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] | --broker-list HOST1:PORT1[,HOST2:PORT2[...]]) This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each "send" request, making externally visible which messages have been acked and which have not. optional arguments: -h, --help show this help message and exit --topic TOPIC Produce messages to this topic. --max-messages MAX-MESSAGES Produce this many messages. If -1, produce messages until the process is killed externally. (default: -1) --throughput THROUGHPUT If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. (default: -1) --acks ACKS Acks required on each produced message. See Kafka docs on acks for details. (default: -1) --producer.config CONFIG_FILE Producer config properties file. --message-create-time CREATETIME Send messages with creation time starting at the arguments value, in milliseconds since epoch (default: -1) --value-prefix VALUE-PREFIX If specified, each produced value will have this prefix with a dot separator --repeating-keys REPEATING-KEYS If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again Connection Group: Group of arguments for connection to brokers --bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,... --broker-list HOST1:PORT1[,HOST2:PORT2[...]] DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap- server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,... kafka-console-consumer.sh
Use the kafka-console-consumer tool to consume records from a topic. Requires bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
Confluent Tip
If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.
Example:
bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true" Usage details
This tool helps to read data from Kafka topics and outputs it to standard output. Option Description ------ ----------- --bootstrap-server <String: server to REQUIRED: The servers to connect to. connect to> --consumer-property <String: A mechanism to pass user-defined consumer_prop> properties in the form key=value to the consumer. --consumer.config <String: config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config. --enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) --formatter <String: class> The name of a class to use for formatting kafka messages for display. (default: kafka.tools. DefaultMessageFormatter) --formatter-config <String: config Config properties file to initialize file> the message formatter. Note that [property] takes precedence over this config. --from-beginning If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message. --group <String: consumer group id> The consumer group id of the consumer. --help Print usage information. --include <String: Java regex (String)> Regular expression specifying list of topics to include for consumption. --isolation-level <String> Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted) --key-deserializer <String: deserializer for key> --max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual. --offset <String: consume offset> The offset to consume from (a non- negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest) --partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified. --property <String: prop> The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.offset=true|false print.partition=true|false print.headers=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> headers.separator=<line.separator> null.literal=<null.literal> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> header.deserializer=<header. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.', 'value. deserializer.' and 'headers. deserializer.' prefixes to configure their deserializers. --skip-message-on-error If there is an error when processing a message, skip it instead of halt. --timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specified interval. --topic <String: topic> The topic to consume on. --value-deserializer <String: deserializer for values> --version Display Kafka version. kafka-console-producer.sh
Use the kafka-console-producer tool to produce records to a topic. Requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to. Example:
kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --producer.config config.properties --topic testTopic --property "parse.key=true" --property "key.separator=:" Confluent Tip
If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.
Usage details
This tool helps to read data from standard input and publish it to Kafka. Option Description ------ ----------- --batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. please note that this option will be replaced if max- partition-memory-bytes is also set (default: 16384) --bootstrap-server <String: server to REQUIRED unless --broker-list connect to> (deprecated) is specified. The server (s) to connect to. The broker list string in the form HOST1:PORT1,HOST2: PORT2. --broker-list <String: broker-list> DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap- server is specified. The broker list string in the form HOST1:PORT1, HOST2:PORT2. --compression-codec [String: The compression codec: either 'none', compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip' --help Print usage information. --line-reader <String: reader_class> The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka. tools. ConsoleProducer$LineMessageReader) --max-block-ms <Long: max block on The max time that the producer will send> block for during a send request. (default: 60000) --max-memory-bytes <Long: total memory The total memory used by the producer in bytes> to buffer records waiting to be sent to the server. This is the option to control `buffer.memory` in producer configs. (default: 33554432) --max-partition-memory-bytes <Integer: The buffer size allocated for a memory in bytes per partition> partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control `batch.size` in producer configs. (default: 16384) --message-send-max-retries <Integer> Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. This is the option to control `retries` in producer configs. (default: 3) --metadata-expiry-ms <Long: metadata The period of time in milliseconds expiration interval> after which we force a refresh of metadata even if we haven't seen any leadership changes. This is the option to control `metadata.max.age. ms` in producer configs. (default: 300000) --producer-property <String: A mechanism to pass user-defined producer_prop> properties in the form key=value to the producer. --producer.config <String: config file> Producer config properties file. Note that [producer-property] takes precedence over this config. --property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user- defined message reader. Default properties include: parse.key=false parse.headers=false ignore.error=false key.separator=\t headers.delimiter=\t headers.separator=, headers.key.separator=: null.marker= When set, any fields (key, value and headers) equal to this will be replaced by null Default parsing pattern when: parse.headers=true and parse.key=true: "h1:v1,h2:v2...\tkey\tvalue" parse.key=true: "key\tvalue" parse.headers=true: "h1:v1,h2:v2...\tvalue" --reader-config <String: config file> Config properties file for the message reader. Note that [property] takes precedence over this config. --request-required-acks <String: The required `acks` of the producer request required acks> requests (default: -1) --request-timeout-ms <Integer: request The ack timeout of the producer timeout ms> requests. Value must be non-negative and non-zero. (default: 1500) --retry-backoff-ms <Long> Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control `retry.backoff.ms` in producer configs. (default: 100) --socket-buffer-size <Integer: size> The size of the tcp RECV size. This is the option to control `send.buffer. bytes` in producer configs. (default: 102400) --sync If set message send requests to the brokers are synchronously, one at a time as they arrive. --timeout <Long: timeout_ms> If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control `linger.ms` in producer configs. (default: 1000) --topic <String: topic> REQUIRED: The topic id to produce messages to. --version Display Kafka version. kafka-producer-perf-test.sh
The kafka-producer-perf-test tool enables you to produce a large quantity of data to test producer performance for the Kafka cluster.
Example:
bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092 Usage details
usage: producer-perf-test [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]] [--producer.config CONFIG-FILE] [--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE) | --payload-monotonic) This tool is used to verify the producer performance. To enable transactions, you can specify a transaction id or set a transaction duration using --transaction-duration-ms. There are three ways to specify the transaction id: set transaction.id=<id> via --producer-props, set transaction.id=<id> in the config file via --producer.config, or use --transaction-id <id>. optional arguments: -h, --help show this help message and exit --topic TOPIC produce messages to this topic --num-records NUM-RECORDS number of messages to produce --payload-delimiter PAYLOAD-DELIMITER provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if -- payload-file is not provided. (default: \n) --throughput THROUGHPUT throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling. --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] kafka producer related configuration properties like bootstrap. servers, client.id etc. These configs take precedence over those passed via --producer.config. --producer.config CONFIG-FILE producer config properties file. --print-metrics print out metrics at the end of the test. (default: false) --transactional-id TRANSACTIONAL-ID The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-id) --transaction-duration-ms TRANSACTION-DURATION The max age of each transaction. The commitTransaction will be called after this time has elapsed. The value should be greater than 0. If the transactional id is specified via --producer-props, --producer.config, or --transactional-id but --transaction-duration-ms is not specified, the default value will be 3000. either --record-size or --payload-file must be specified but not both. --record-size RECORD-SIZE message size in bytes. Note that you must provide exactly one of -- record-size or --payload-file. --payload-file PAYLOAD-FILE file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending messages. Note that you must provide exactly one of --record-size or --payload-file. --payload-monotonic payload is monotonically increasing integer. Note that you must provide exactly one of --record-size or --payload-file or --payload-monotonic. (default: false) kafka-groups.sh
This tool helps to list groups of all types.
Usage details
Option Description ------ ----------- --bootstrap-server <String: The Kafka server to connect server to REQUIRED: connect to> --command-config <String: command Property file containing configs to be config property file> passed to the admin client. --consumer Filter the groups to show all kinds of consumer groups, including classic and simple consumer groups. This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty. --group-type <String: type> Filter the groups based on group type. Valid types are: 'classic', 'consumer' and 'share'. --help Print usage information. --list List the groups. --protocol <String: protocol> Filter the groups based on protocol type. --share Filter the groups to show share groups. --version Display Kafka version. kafka-consumer-groups.sh
Use the kafka-consumer-groups tool to get a list of the active groups in the cluster.
For example, to show the position of all consumers in a group named user-group, you might use the following command.
bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe --group user-group This would result in output like the following (CONSUMER-ID entries truncated for readability).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID user 0 2 4 2 consumer-1-... /127.0.0.1 consumer-1 user 1 2 3 1 consumer-1-... /127.0.0.1 consumer-1 user 2 2 3 1 consumer-2-... /127.0.0.1 consumer-2 For more examples, see View Consumer Group Info in Kafka.
Usage details
Option Description ------ ----------- --all-groups Apply to all consumer groups. --all-topics Consider all topics assigned to a group in the `reset-offsets` process. --bootstrap-server <String: server to REQUIRED: The servers to connect to. Required for all options except for --validate-regex. connect to> --by-duration <String: duration> Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS' --command-config <String: command Property file containing configs to be config property file> passed to Admin Client and Consumer. --delete Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 -- group g2 --delete-offsets Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics. --describe Describe consumer group and list offset lag (number of messages not yet processed) related to given group. --dry-run Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets. --execute Execute operation. Supported operations: reset-offsets. --export Export operation execution to a CSV file. Supported operations: reset- offsets. --from-file <String: path to CSV file> Reset offsets to values defined in CSV file. --group <String: consumer group> The consumer group we wish to act on. --help Print usage information. --list List all consumer groups. --members Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- members --offsets Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with '--describe' and '-- bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- offsets --reset-offsets Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. Additionally, the -- export option is used to export the results to a CSV format. You must choose one of the following reset specifications: --to-datetime, --by-period, --to-earliest, --to- latest, --shift-by, --from-file, -- to-current. To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from- file'. --shift-by <Long: number-of-offsets> Reset offsets shifting current offset by 'n', where 'n' can be positive or negative. --state [String] When specified with '--describe', includes the state of the group. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- state When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. Example: --bootstrap-server localhost: 9092 --list --state stable,empty This option may be used with '-- describe', '--list' and '--bootstrap- server' options only. --timeout <Long: timeout (ms)> The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000) --to-current Reset offsets to current offset. --to-datetime <String: datetime> Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss' --to-earliest Reset offsets to earliest offset. --to-latest Reset offsets to latest offset. --to-offset <Long: offset> Reset offsets to a specific offset. --topic <String: topic> The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. In `reset- offsets` case, partitions can be specified using this format: `topic1: 0,1,2`, where 0,1,2 are the partition to be included in the process. Reset-offsets also supports multiple topic inputs. --type [String] When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types. Example: --bootstrap-server localhost: 9092 --list --type classic, consumer This option may be used with the '--list' option only. --validate-regex <String: regex> Validate that the syntax of the provided regular expression is valid according to the RE2 format. --verbose Provide additional information, if any, when describing the group. This option may be used with '-- offsets'/'--members'/'--state' and '--bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- members --verbose --version Display Kafka version. kafka-consumer-perf-test.sh
This tool tests the consumer performance for the Kafka cluster.
Usage details
Option Description ------ ----------- --bootstrap-server <String: server to REQUIRED unless --broker-list connect to> (deprecated) is specified. The server (s) to connect to. --broker-list <String: broker-list> DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap- server is specified. The broker list string in the form HOST1:PORT1, HOST2:PORT2. --consumer.config <String: config file> Consumer config properties file. --date-format <String: date format> The date format to use for formatting the time field. See java.text. SimpleDateFormat for options. (default: yyyy-MM-dd HH:mm:ss:SSS) --fetch-size <Integer: size> The amount of data to fetch in a single request. (default: 1048576) --from-latest If the consumer does not already have an established offset to consume from, start with the latest message present in the log rather than the earliest message. --group <String: gid> The group id to consume on. (default: perf-consumer-50334) --help Print usage information. --hide-header If set, skips printing the header for the stats --messages <Long: count> REQUIRED: The number of messages to send or consume --num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of fetcher threads. (default: 1) --print-metrics Print out the metrics. --reporting-interval <Integer: Interval in milliseconds at which to interval_ms> print progress info. (default: 5000) --show-detailed-stats If set, stats are reported for each reporting interval as configured by reporting-interval --socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 2097152) --threads <Integer: count> DEPRECATED AND IGNORED: Number of processing threads. (default: 10) --timeout [Long: milliseconds] The maximum allowed time in milliseconds between returned records. (default: 10000) --topic <String: topic> REQUIRED: The topic to consume from. --version Display Kafka version. Manage Kafka Connect
connect-distributed.sh
Use the connect-distributed tool to run Connect workers in Distributed mode, meaning on multiple, distributed, machines. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.
Usage details
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties connect-standalone.sh
Use the connect-standalone tool to run Kafka Connect in standalone mode meaning all work is performed in a single process. This is good for getting started but lacks fault tolerance. For more information, see Kafka Connect.
Usage details
USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties connect-mirror-maker.sh
Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties MirrorMaker 2.0 driver positional arguments: mm2.properties MM2 configuration file. optional arguments: -h, --help show this help message and exit --clusters CLUSTER [CLUSTER ...] Target cluster to use for this node. Manage Kafka Streams
kafka-streams-application-reset.sh
For Kafka Streams applications, the kafka-streams-application-reset tool resets the application and forces it to reprocess its data from the beginning. Useful for debugging and testing.
For example, the following command would reset the my-streams-app application:
kafka-streams-application-reset.sh --application-id my-streams-app \ --input-topics my-input-topic \ --intermediate-topics rekeyed-topic For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.
Usage details
This tool helps to quickly reset an application in order to reprocess its data from scratch. * This tool resets offsets of input topics to the earliest available offset (by default), or to a specific defined position. * This tool deletes the internal topics that were created by Kafka Streams (topics starting with "<application.id>-"). The tool finds these internal topics automatically. If the topics flagged automatically for deletion by the dry-run are unsuitable, you can specify a subset with the "--internal-topics" option. * This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command). * This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results). You need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by "state.dir" configuration (${java.io.tmpdir}/kafka-streams/<application.id> by default). * When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the "--force" option could remove those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions. *** Important! You will get wrong output if you don't clean up the local stores after running the reset tool! *** Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this first with "--dry-run" to preview your changes before making them. Option (* = required) Description --------------------- ----------- * --application-id <String: id> The Kafka Streams application ID (application.id). --bootstrap-servers <String: urls> Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2 (default: localhost:9092) --by-duration <String> Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS' --config-file <String: file name> Property file containing configs to be passed to admin clients and embedded consumer. --dry-run Display the actions that would be performed without executing the reset commands. --force Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances. --from-file <String> Reset offsets to values defined in CSV file. --help Print usage information. --input-topics <String: list> Comma-separated list of user input topics. For these topics, the tool by default will reset the offset to the earliest available offset. Reset to other offset position by appending other reset offset option, ex: --input- topics foo --shift-by 5 --intermediate-topics <String: list> Comma-separated list of intermediate user topics (topics that are input and output topics, e.g., used in the deprecated through() method). For these topics, the tool will skip to the end. --internal-topics <String: list> Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics). --shift-by <Long: number-of-offsets> Reset offsets shifting current offset by 'n', where 'n' can be positive or negative --to-datetime <String> Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss' --to-earliest Reset offsets to earliest offset. --to-latest Reset offsets to latest offset. --to-offset <Long> Reset offsets to a specific offset. --version Print version information and exit. kafka-streams-groups.sh
Use this tool to list or describe streams groups.
Usage details
Option Description ------ ----------- --all-groups Applies to all streams groups. --all-input-topics Considers all input topics assigned to a group in the reset-offsets and delete-offsets process. Only input topics are supported. --bootstrap-server (REQUIRED) The servers to connect to. <String: server to connect to> --command-config <String: command Property file that contains config property file> configurations to pass to the admin client. --delete Deletes topic partition offsets and ownership information for the entire streams group. For example, --group g1 --group g2. --delete-offsets Deletes offsets of a streams group. Supports one streams group at a time and multiple topics. --describe Describes the streams group and lists offset lag related to the specified group. --group <String: streams group> The streams group to act on. --help Prints usage information. --input-topic <String: topic> The topic whose streams group information to delete, or the topic to include in the reset offset process. For reset-offsets, you can specify partitions using this format: topic1: 0,1,2, where 0,1,2 are the partitions to include in the process. Reset-offsets also supports multiple topic inputs. All types of topics are supported. --list Lists all streams groups. --members Describes members of the group. Use this option with the --describe option only. --offsets Describes the group and lists all topic partitions in the group along with their offset information. This is the default sub-action. Use with the --describe option only. --state [String] When specified with --list, displays the state of all groups. Can also be used to list groups with specific states. The valid values are Empty, NotReady, Stable, Assigning, Reconciling, and Dead. --timeout <Long: timeout (ms)> The timeout for some use cases. For example, when describing the group, specifies the maximum amount of time in milliseconds to wait before the group stabilizes. (Default: 5000) --verbose Use with --describe --state to show group epoch and target assignment epoch. Use with --describe --members to show each member the member's epoch, target assignment epoch, current assignment, target assignment, and whether the member is still using the classic rebalance protocol. Use with --describe --offsets and --describe to show leader epochs for each partition. --version Displays Kafka version. Manage security
kafka-acls
Use the kafka-acls tool to add, remove and list ACLs. For example, if you wanted to add two principal users, Jose and Jane to have read and write permissions on the user topic from specific IP addresses, you could use a command like the following:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user Usage details
This tool helps to manage acls on kafka. Option Description ------ ----------- --add Indicates you are trying to add ACLs. --allow-host <String: allow-host> Host from which principals listed in -- allow-principal will have access. If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts. --allow-principal <String: allow- principal is in principalType:name principal> format. Note that principalType must be supported by the Authorizer being used. For example, User:'*' is the wild card indicating all users. --bootstrap-controller <String: A list of host/port pairs to use for controller to connect to> establishing the connection to the Kafka cluster. This list should be in the form host1:port1,host2: port2,... This config is required for acl management using admin client API. --bootstrap-server <String: server to A list of host/port pairs to use for connect to> establishing the connection to the Kafka cluster. This list should be in the form host1:port1,host2: port2,... This config is required for acl management using admin client API. --cluster Add/Remove cluster ACLs. --command-config [String: command- A property file containing configs to config] be passed to Admin Client. --consumer Convenience option to add/remove ACLs for consumer role. This will generate ACLs that allows READ, DESCRIBE on topic and READ on group. --delegation-token <String: delegation- Delegation token to which ACLs should token> be added or removed. A value of '*' indicates ACL should apply to all tokens. --deny-host <String: deny-host> Host from which principals listed in -- deny-principal will be denied access. If you have specified --deny- principal then the default for this option will be set to '*' which denies access from all hosts. --deny-principal <String: deny- principal is in principalType:name principal> format. By default anyone not added through --allow-principal is denied access. You only need to use this option as negation to already allowed set. Note that principalType must be supported by the Authorizer being used. For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that allows access to User:'*' and specify --deny- principal=User:test@EXAMPLE.COM. AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES. --force Assume Yes to all queries and do not prompt. --group <String: group> Consumer Group to which the ACLs should be added or removed. A value of '*' indicates the ACLs should apply to all groups. --help Print usage information. --idempotent Enable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. --list List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource. --operation <String> Operation that is being allowed or denied. Valid operation names are: Describe DescribeConfigs Alter IdempotentWrite Read Delete Create ClusterAction All CreateTokens DescribeTokens Write AlterConfigs (default: All) --principal [String: principal] List ACLs for the specified principal. principal is in principalType:name format. Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed. --producer Convenience option to add/remove ACLs for producer role. This will generate ACLs that allows WRITE, DESCRIBE and CREATE on topic. --remove Indicates you are trying to remove ACLs. --resource-pattern-type The type of the resource pattern or <ANY|MATCH|LITERAL|PREFIXED> pattern filter. When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, where as 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). WARNING: 'match', when used in combination with the '--remove' switch, should be used with care. (default: LITERAL) --topic <String: topic> topic to which ACLs should be added or removed. A value of '*' indicates ACL should apply to all topics. --transactional-id <String: The transactionalId to which ACLs transactional-id> should be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds. --user-principal <String: user- Specifies a user principal as a principal> resource in relation with the operation. For instance one could grant CreateTokens or DescribeTokens permission on a given user principal. --version Display Kafka version. kafka-delegation-tokens.sh
Use the kafka-delegation-tokens tool to create, renew, expire and describe delegation tokens. Delegation tokens are shared secrets between Kafka brokers and clients, and are a lightweight authentication mechanism meant to complement existing SASL/SSL methods. For more information, see Authentication using Delegation Tokens in the Confluent Documentation.
Usage details
This tool helps to create, renew, expire, or describe delegation tokens. Option Description ------ ----------- --bootstrap-server <String> REQUIRED: server(s) to use for bootstrapping. --command-config <String> REQUIRED: A property file containing configs to be passed to Admin Client. Token management operations are allowed in secure mode only. This config file is used to pass security related configs. --create Create a new delegation token. Use --renewer- principal option to pass renewers principals. --describe Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals. If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned. --expire Expire delegation token. Use --expiry-time- period option to expire the token. --expiry-time-period [Long] Expiry time period in milliseconds. If the value is -1, then the token will get invalidated immediately. --help Print usage information. --hmac [String] HMAC of the delegation token --max-life-time-period [Long] Max life period for the token in milliseconds. If the value is -1, then token max life time will default to a server side config value (delegation.token.max.lifetime.ms). --owner-principal [String] owner is a kafka principal. It is should be in principalType:name format. --renew Renew delegation token. Use --renew-time-period option to set renew time period. --renew-time-period [Long] Renew time period in milliseconds. If the value is -1, then the renew time period will default to a server side config value (delegation. token.expiry.time.ms). --renewer-principal [String] renewer is a kafka principal. It is should be in principalType:name format. --version Display Kafka version. Test and troubleshoot
This section contains tools you can use for testing and troubleshooting your applications.
kafka-e2e-latency.sh
The kafka-e2e-latency tool is a performance testing tool used to measure end-to-end latency in Kafka. It works by sending messages to a Kafka topic and then consuming those messages from a Kafka consumer. The tool calculates the time difference between when a message was produced and when it was consumed, giving you an idea of the end-to-end latency for your Kafka cluster. This tool is useful for testing the performance of your Kafka cluster and identifying any bottlenecks or issues that may be affecting latency.
To run the tool, you provide details such as the message size, number of messages and acks setting for the producer. For more about end-to-end latency, see Configure Kafka to Minimize Latency.
- Following are the required arguments
broker_list: The location of the bootstrap broker for both the producer and the consumertopic: The topic name used by both the producer and the consumer to send/receive messagesnum_messages: The number of messages to sendproducer_acks: The producer setting for acks.message_size_bytes: size of each message in bytes
For example:
kafka-e2e-latency.sh localhost:9092 test 10000 1 20 Usage details
USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file kafka-dump-log.sh
The kafka-dump-log tool can be used in KRaft mode to parse a metadata log file and output its contents to the console. Requires a comma-separated list of log files. The tool will scan the provided files and decode the metadata records.
The following example shows using the cluster-metadata-decoder argument to decode the metadata records in a log segment.
bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log Usage details
This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment. Option Description ------ ----------- --cluster-metadata-decoder if set, log data will be parsed as cluster metadata records. --deep-iteration if set, uses deep instead of shallow iteration. Automatically set if print- data-log is enabled. --files <String: file1, file2, ...> REQUIRED: The comma separated list of data and index log files to be dumped. --help Print usage information. --index-sanity-check if set, just checks the index sanity without printing its content. This is the same check that is executed on broker startup to determine if an index needs rebuilding or not. --key-decoder-class [String] if set, used to deserialize the keys. This class should implement kafka.serializer. Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer.StringDecoder) --max-bytes <Integer: size> Limit the amount of total batches read in bytes avoiding reading the whole .log file(s). (default: 2147483647) --max-message-size <Integer: size> Size of largest message. (default: 5242880) --offsets-decoder if set, log data will be parsed as offset data from the __consumer_offsets topic. --print-data-log if set, printing the messages content when dumping data logs. Automatically set if any decoder option is specified. --remote-log-metadata-decoder If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records. Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured. --share-group-state-decoder If set, log data will be parsed as share group state data from the __share_group_state topic. --skip-record-metadata whether to skip printing metadata for each record. --transaction-log-decoder if set, log data will be parsed as transaction metadata from the __transaction_state topic. --value-decoder-class [String] if set, used to deserialize the messages. This class should implement kafka. serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer. StringDecoder) --verify-index-only if set, just verify the index log without printing its content. --version Display Kafka version. kafka-jmx.sh
The kafka-jmx tool enables you to read JMX metrics from a given endpoint. This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
Usage details
Dump JMX values to standard output. Option Description ------ ----------- --attributes <String: name> The list of attributes to include in the query. This is a comma-separated list. If no attributes are specified all objects will be queried. --date-format <String: format> The date format to use for formatting the time field. See java.text. SimpleDateFormat for options. --help Print usage information. --jmx-auth-prop <String: jmx-auth-prop> A mechanism to pass property in the form 'username=password' when enabling remote JMX with password authentication. --jmx-ssl-enable <Boolean: ssl-enable> Flag to enable remote JMX with SSL. (default: false) --jmx-url <String: service-url> The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details. (default: service:jmx: rmi:///jndi/rmi://:9999/jmxrmi) --object-name <String: name> A JMX object name to use as a query. This can contain wild cards, and this option can be given multiple times to specify more than one query. If no objects are specified all objects will be queried. --one-time [Boolean: one-time] Flag to indicate run once only. (default: false) --report-format <String: report-format> output format name: either 'original', 'properties', 'csv', 'tsv' (default: original) --reporting-interval <Integer: ms> Interval in MS with which to poll jmx stats; default value is 2 seconds. Value of -1 equivalent to setting one-time to true (default: 2000) --version Display Kafka version. --wait Wait for requested JMX objects to become available before starting output. Only supported when the list of objects is non-empty and contains no object name patterns. trogdor.sh
Trogdor is a test framework for Kafka. Trogdor can run benchmarks and other workloads. Trogdor can also inject faults in order to stress test the system. For more information, see Trogdor and TROGDOR.
Usage details
The Trogdor fault injector. Usage: ./trogdor.sh [action] [options] Actions: agent: Run the trogdor agent. coordinator: Run the trogdor coordinator. client: Run the client which communicates with the trogdor coordinator. agent-client: Run the client which communicates with the trogdor agent. help: This help message. kafka-run-class.sh
This kafka-run-class tool is a thin wrapper around the Kafka Java class. It is called by other tools, and should not be run or modified directly.
Usage details
USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]