Commit 6e2e2b2

out_kafka/out_kafka2/out_kafka_buffered: Provide hash function choices
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
1 parent a9fd985 commit 6e2e2b2

File tree

4 files changed: +27 -9 lines changed

README.md
lib/fluent/plugin/out_kafka.rb
lib/fluent/plugin/out_kafka2.rb
lib/fluent/plugin/out_kafka_buffered.rb

README.md

Lines changed: 3 additions & 0 deletions
@@ -202,6 +202,7 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
 headers_from_record (hash) :default => {}
 use_default_for_unknown_topic (bool) :default => false
 discard_kafka_delivery_failed (bool) :default => false (No discard)
+partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
 
 <format>
   @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
@@ -343,6 +344,7 @@ Support of fluentd v0.12 has ended. `kafka_buffered` will be an alias of `kafka2
 exclude_topic_key (bool) :default => false
 exclude_partition_key (bool) :default => false
 get_kafka_client_log (bool) :default => false
+partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
 
 # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/0.12/buffer
 
@@ -385,6 +387,7 @@ This plugin uses ruby-kafka producer for writing data. For performance and relia
 output_include_time (bool) :default => false
 exclude_topic_key (bool) :default => false
 exclude_partition_key (bool) :default => false
+partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
 
 # ruby-kafka producer options
 max_send_retries (integer) :default => 1
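
For reference, a minimal usage example of the new parameter in the README's own `<match>` config style. This is a hedged sketch: the tag pattern, brokers, and topic below are placeholders, not part of this commit.

<match app.**>
  @type kafka2
  # brokers and default_topic are placeholders for this example
  brokers localhost:9092
  default_topic my_topic

  # Added by this commit; crc32 stays the default, while murmur2 matches the
  # hashing used by the Java Kafka client's default partitioner
  partitioner_hash_function murmur2

  <format>
    @type json
  </format>
</match>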

lib/fluent/plugin/out_kafka.rb

Lines changed: 8 additions & 3 deletions
@@ -19,6 +19,8 @@ class Fluent::KafkaOutput < Fluent::Output
   config_param :default_message_key, :string, :default => nil
   config_param :default_partition_key, :string, :default => nil
   config_param :default_partition, :integer, :default => nil
+  config_param :partitioner_hash_function, :enum, list: [:crc32, :murmur2], :default => :crc32,
+               :desc => "Specify kafka patrtitioner hash algorithm"
   config_param :client_id, :string, :default => 'kafka'
   config_param :sasl_over_ssl, :bool, :default => true,
                :desc => <<-DESC
@@ -109,15 +111,18 @@ def refresh_client
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                          sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
-                         ssl_verify_hostname: @ssl_verify_hostname)
+                         ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     elsif @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
-                         sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
+                         sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     else
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
-                         sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
+                         sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     end
     log.info "initialized kafka producer: #{@client_id}"
   else
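
The identical change is applied to out_kafka2.rb and out_kafka_buffered.rb below. As a standalone illustration of the wiring (not part of the commit), the sketch below assumes a ruby-kafka release that supports the `partitioner` keyword and its `hash_function` option, and uses a placeholder broker address:

require "kafka"

# Value the plugin reads from its partitioner_hash_function config_param
# (:crc32 by default, :murmur2 as the alternative).
hash_function = :murmur2

# Mirrors what refresh_client now does: the chosen hash algorithm is handed
# to ruby-kafka through a Kafka::Partitioner instance.
kafka = Kafka.new(seed_brokers: ["localhost:9092"],   # placeholder broker
                  client_id: "kafka",
                  partitioner: Kafka::Partitioner.new(hash_function: hash_function))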

lib/fluent/plugin/out_kafka2.rb

Lines changed: 8 additions & 3 deletions
@@ -24,6 +24,8 @@ class Fluent::Kafka2Output < Output
   config_param :partition_key_key, :string, :default => 'partition_key', :desc => "Field for kafka partition key"
   config_param :default_partition_key, :string, :default => nil
   config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
+  config_param :partitioner_hash_function, :enum, list: [:crc32, :murmur2], :default => :crc32,
+               :desc => "Specify kafka patrtitioner hash algorithm"
   config_param :default_partition, :integer, :default => nil
   config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
   config_param :client_id, :string, :default => 'fluentd'
@@ -99,17 +101,20 @@ def refresh_client(raise_error = true)
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
-                         sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
+                         sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     elsif @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
-                         ssl_verify_hostname: @ssl_verify_hostname)
+                         ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     else
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                          ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
-                         ssl_verify_hostname: @ssl_verify_hostname)
+                         ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     end
     log.info "initialized kafka producer: #{@client_id}"
   rescue Exception => e

lib/fluent/plugin/out_kafka_buffered.rb

Lines changed: 8 additions & 3 deletions
@@ -26,6 +26,8 @@ class Fluent::KafkaOutputBuffered < Fluent::BufferedOutput
   config_param :default_partition_key, :string, :default => nil
   config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
   config_param :default_partition, :integer, :default => nil
+  config_param :partitioner_hash_function, :enum, list: [:crc32, :murmur2], :default => :crc32,
+               :desc => "Specify kafka patrtitioner hash algorithm"
   config_param :client_id, :string, :default => 'kafka'
   config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer'
   config_param :sasl_over_ssl, :bool, :default => true,
@@ -133,15 +135,18 @@ def refresh_client(raise_error = true)
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                          sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
-                         ssl_verify_hostname: @ssl_verify_hostname)
+                         ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     elsif @username != nil && @password != nil
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
-                         sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
+                         sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     else
       @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                          ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
-                         sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
+                         sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
     end
     log.info "initialized kafka producer: #{@client_id}"
   else
