Commit c111521

Add http compression level (#1148)

This commit deprecates `http_compression` in favor of `compression_level`. `http_compression` `false` maps to `compression_level` `0`, and `true` maps to `1`. The default has changed from disabling compression to enabling compression at level 1 (best speed).

Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com>

1 parent ad1a8d5 commit c111521

File tree

11 files changed: +172 -73 lines changed


CHANGELOG.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,3 +1,6 @@
+## 11.17.0
+- Added support to http compression level. Deprecated `http_compression` in favour of `compression_level` and enabled compression level 1 by default. [#1148](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1148)
+
 ## 11.16.0
 - Added support to Serverless Elasticsearch [#1445](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1145)
```

docs/index.asciidoc

Lines changed: 17 additions & 4 deletions
```diff
@@ -277,9 +277,9 @@ not reevaluate its DNS value while the keepalive is in effect.
 ==== HTTP Compression

 This plugin always reads compressed responses from {es}.
-It _can be configured_ to send compressed bulk requests to {es}.
+By default, it sends compressed bulk requests to {es}.

-If you are concerned about bandwidth, you can enable <<plugins-{type}s-{plugin}-http_compression>> to trade a small amount of CPU capacity for a significant reduction in network IO.
+If you are concerned about bandwidth, you can set a higher <<plugins-{type}s-{plugin}-compression_level>> to trade CPU capacity for a reduction in network IO.

 ==== Authentication

@@ -310,6 +310,7 @@ This plugin supports the following configuration options plus the
 | <<plugins-{type}s-{plugin}-ca_trusted_fingerprint>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-cloud_auth>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-cloud_id>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-compression_level>> |<<number,number>>, one of `[0 ~ 9]`|No
 | <<plugins-{type}s-{plugin}-custom_headers>> |<<hash,hash>>|No
 | <<plugins-{type}s-{plugin}-data_stream>> |<<string,string>>, one of `["true", "false", "auto"]`|No
 | <<plugins-{type}s-{plugin}-data_stream_auto_routing>> |<<boolean,boolean>>|No
@@ -459,6 +460,17 @@ Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used.
 For more details, check out the
 {logstash-ref}/connecting-to-cloud.html[Logstash-to-Cloud documentation].

+[id="plugins-{type}s-{plugin}-compression_level"]
+===== `compression_level`
+
+* Value can be any of: `0`, `1`, `2`, `3`, `4`, `5`, `6`, `7`, `8`, `9`
+* Default value is `1`
+
+The gzip compression level. Setting this value to `0` disables compression.
+The compression level must be in the range of `1` (best speed) to `9` (best compression).
+
+Increasing the compression level will reduce the network usage but will increase the CPU usage.
+
 [id="plugins-{type}s-{plugin}-data_stream"]
 ===== `data_stream`

@@ -618,7 +630,7 @@ NOTE: Deprecated, refer to <<plugins-{type}s-{plugin}-silence_errors_in_log>>.
 Pass a set of key value pairs as the headers sent in each request to
 an elasticsearch node. The headers will be used for any kind of request
 (_bulk request, template installation, health checks and sniffing).
-These custom headers will be overidden by settings like `http_compression`.
+These custom headers will be overidden by settings like `compression_level`.

 [id="plugins-{type}s-{plugin}-healthcheck_path"]
 ===== `healthcheck_path`
@@ -659,11 +671,12 @@ Any special characters present in the URLs here MUST be URL escaped! This means

 [id="plugins-{type}s-{plugin}-http_compression"]
 ===== `http_compression`
+deprecated[11.17.0, Replaced by <<plugins-{type}s-{plugin}-compression_level>>]

 * Value type is <<boolean,boolean>>
 * Default value is `false`

-Enable gzip compression on requests.
+Setting `true` enables gzip compression level 1 on requests.

 This setting allows you to reduce this plugin's outbound network traffic by
 compressing each bulk _request_ to {es}.
```
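The CPU/bandwidth tradeoff the docs describe can be observed directly with Ruby's stdlib `Zlib` (an illustration only, not plugin code): higher levels shrink the request body further at extra CPU cost, and level `1` already gives most of the win on repetitive JSON.

```ruby
require "zlib"
require "stringio"

# A repetitive JSON-ish payload, similar in spirit to a bulk request body.
payload = %Q({"index":{}}\n{"message":"Hello World!"}\n) * 1_000

# Compress with an explicit gzip level, as the plugin does internally.
def gzipped_size(data, level)
  io = StringIO.new
  io.set_encoding "BINARY"
  gz = Zlib::GzipWriter.new(io, level, Zlib::DEFAULT_STRATEGY)
  gz.write(data)
  gz.close
  io.string.bytesize
end

raw    = payload.bytesize
level1 = gzipped_size(payload, 1) # best speed (the new default)
level9 = gzipped_size(payload, 9) # best compression
puts "raw=#{raw} level1=#{level1} level9=#{level9}"
```

Both compressed sizes will be far below the raw size for a payload like this; the gap between level 1 and level 9 is what you pay CPU for.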

lib/logstash/outputs/elasticsearch.rb

Lines changed: 16 additions & 0 deletions
```diff
@@ -276,6 +276,7 @@ def initialize(*params)
     super
     setup_ecs_compatibility_related_defaults
     setup_ssl_params!
+    setup_compression_level!
   end

   def register
@@ -368,6 +369,7 @@ def config_init(params)
         params['proxy'] = proxy # do not do resolving again
       end
     end
+
     super(params)
   end

@@ -669,6 +671,20 @@ def setup_ssl_params!
     params['ssl_verification_mode'] = @ssl_verification_mode unless @ssl_verification_mode.nil?
   end

+  def setup_compression_level!
+    @compression_level = normalize_config(:compression_level) do |normalize|
+      normalize.with_deprecated_mapping(:http_compression) do |http_compression|
+        if http_compression == true
+          DEFAULT_ZIP_LEVEL
+        else
+          0
+        end
+      end
+    end
+
+    params['compression_level'] = @compression_level unless @compression_level.nil?
+  end
+
   # To be overidden by the -java version
   VALID_HTTP_ACTIONS = ["index", "delete", "create", "update"]
   def valid_actions
```
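`normalize_config` and `with_deprecated_mapping` are plugin-internal helpers; as a standalone sketch (with a hypothetical `effective_compression_level` helper, not the plugin's API), the mapping they implement is: an explicit `compression_level` wins, otherwise the deprecated boolean is translated to a level, otherwise the new default applies.

```ruby
DEFAULT_ZIP_LEVEL = 1 # mirrors the constant added in api_configs.rb

# Hypothetical standalone equivalent of the normalize_config /
# with_deprecated_mapping wiring in setup_compression_level!.
def effective_compression_level(params)
  return params["compression_level"] if params.key?("compression_level")
  return DEFAULT_ZIP_LEVEL unless params.key?("http_compression")
  # deprecated boolean: true => level 1, false => level 0 (off)
  params["http_compression"] ? DEFAULT_ZIP_LEVEL : 0
end

puts effective_compression_level({})                            # => 1
puts effective_compression_level({"http_compression" => true})  # => 1
puts effective_compression_level({"http_compression" => false}) # => 0
puts effective_compression_level({"compression_level" => 9})    # => 9
```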

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 10 additions & 8 deletions
```diff
@@ -118,7 +118,7 @@ def bulk(actions)
     end

     body_stream = StringIO.new
-    if http_compression
+    if compression_level?
       body_stream.set_encoding "BINARY"
       stream_writer = gzip_writer(body_stream)
     else
@@ -141,14 +141,14 @@ def bulk(actions)
                       :batch_offset => (index + 1 - batch_actions.size))
         bulk_responses << bulk_send(body_stream, batch_actions)
         body_stream.truncate(0) && body_stream.seek(0)
-        stream_writer = gzip_writer(body_stream) if http_compression
+        stream_writer = gzip_writer(body_stream) if compression_level?
         batch_actions.clear
       end
       stream_writer.write(as_json)
       batch_actions << action
     end

-    stream_writer.close if http_compression
+    stream_writer.close if compression_level?

     logger.debug("Sending final bulk request for batch.",
                  :action_count => batch_actions.size,
@@ -157,15 +157,15 @@ def bulk(actions)
                  :batch_offset => (actions.size - batch_actions.size))
     bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0

-    body_stream.close if !http_compression
+    body_stream.close unless compression_level?
     join_bulk_responses(bulk_responses)
   end

   def gzip_writer(io)
     fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
     fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0

-    Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
+    Zlib::GzipWriter.new(io, client_settings.fetch(:compression_level), Zlib::DEFAULT_STRATEGY)
   end

   def join_bulk_responses(bulk_responses)
@@ -176,7 +176,7 @@ def join_bulk_responses(bulk_responses)
   end

   def bulk_send(body_stream, batch_actions)
-    params = http_compression ? {:headers => {"Content-Encoding" => "gzip"}} : {}
+    params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
     response = @pool.post(@bulk_path, params, body_stream.string)

     @bulk_response_metrics.increment(response.code.to_s)
@@ -298,8 +298,10 @@ def ssl_options
     @_ssl_options ||= client_settings.fetch(:ssl, {})
   end

-  def http_compression
-    client_settings.fetch(:http_compression, false)
+  # return true if compression_level is [1..9]
+  # return false if it is 0
+  def compression_level?
+    client_settings.fetch(:compression_level) > 0
   end

   def build_adapter(options)
```
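The bulk path above can be sketched in isolation (a simplified illustration, not the plugin's actual classes): the writer wrapping the body stream is a `Zlib::GzipWriter` only when the level is non-zero, it must be closed to flush the gzip trailer before sending, and the `Content-Encoding` header follows the same level-greater-than-zero predicate.

```ruby
require "zlib"
require "stringio"

compression_level = 1 # 0 disables compression

body_stream = StringIO.new
body_stream.set_encoding "BINARY"

# Mirrors compression_level?: any level in 1..9 means "compress".
compress = compression_level > 0

stream_writer =
  if compress
    Zlib::GzipWriter.new(body_stream, compression_level, Zlib::DEFAULT_STRATEGY)
  else
    body_stream
  end

stream_writer.write(%Q({"index":{}}\n{"message":"hi"}\n))
stream_writer.close if compress # flush the gzip trailer before sending

headers = compress ? {"Content-Encoding" => "gzip"} : {}
# body_stream.string is what would be POSTed to the _bulk endpoint.
```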

lib/logstash/outputs/elasticsearch/http_client_builder.rb

Lines changed: 1 addition & 1 deletion
```diff
@@ -8,7 +8,7 @@ def self.build(logger, hosts, params)
       :pool_max => params["pool_max"],
       :pool_max_per_route => params["pool_max_per_route"],
       :check_connection_timeout => params["validate_after_inactivity"],
-      :http_compression => params["http_compression"],
+      :compression_level => params["compression_level"],
       :headers => params["custom_headers"] || {}
     }
```

lib/logstash/plugin_mixins/elasticsearch/api_configs.rb

Lines changed: 9 additions & 1 deletion
```diff
@@ -7,6 +7,7 @@ module APIConfigs
   # This module defines common options that can be reused by alternate elasticsearch output plugins such as the elasticsearch_data_streams output.

   DEFAULT_HOST = ::LogStash::Util::SafeURI.new("//127.0.0.1")
+  DEFAULT_ZIP_LEVEL = 1

   CONFIG_PARAMS = {
     # Username to authenticate to a secure Elasticsearch cluster
@@ -186,7 +187,14 @@ module APIConfigs
     :validate_after_inactivity => { :validate => :number, :default => 10000 },

     # Enable gzip compression on requests. Note that response compression is on by default for Elasticsearch v5.0 and beyond
-    :http_compression => { :validate => :boolean, :default => false },
+    # Set `true` to enable compression with level 1
+    # Set `false` to disable compression with level 0
+    :http_compression => { :validate => :boolean, :default => true, :deprecated => "Set 'compression_level' instead." },
+
+    # Number `1` ~ `9` are the gzip compression level
+    # Set `0` to disable compression
+    # Set `1` (best speed) to `9` (best compression) to use compression
+    :compression_level => { :validate => [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ], :default => DEFAULT_ZIP_LEVEL },

     # Custom Headers to send on each request to elasticsearch nodes
     :custom_headers => { :validate => :hash, :default => {} },
```
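The enumerated `:validate` list accepts exactly the integers 0 through 9. A minimal sketch of that check (a hypothetical standalone helper, not Logstash's config validation framework):

```ruby
VALID_COMPRESSION_LEVELS = (0..9).to_a # matches the :validate list above

def check_compression_level(value)
  unless VALID_COMPRESSION_LEVELS.include?(value)
    raise ArgumentError, "compression_level must be one of 0..9, got #{value.inspect}"
  end
  value
end

puts check_compression_level(1) # the DEFAULT_ZIP_LEVEL default
```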

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '11.16.0'
+  s.version = '11.17.0'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
```

spec/integration/outputs/compressed_indexing_spec.rb

Lines changed: 49 additions & 48 deletions
```diff
@@ -8,63 +8,64 @@
   }
 end

-describe "indexing with http_compression turned on", :integration => true do
-  let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
-  let(:index) { 10.times.collect { rand(10).to_s }.join("") }
-  let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
-  let(:event_count) { 10000 + rand(500) }
-  let(:events) { event_count.times.map { event }.to_a }
-  let(:config) {
-    {
-      "hosts" => get_host_port,
-      "index" => index,
-      "http_compression" => true
+[ {"http_compression" => true}, {"compression_level" => 1} ].each do |compression_config|
+  describe "indexing with http_compression turned on", :integration => true do
+    let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
+    let(:index) { 10.times.collect { rand(10).to_s }.join("") }
+    let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
+    let(:event_count) { 10000 + rand(500) }
+    let(:events) { event_count.times.map { event }.to_a }
+    let(:config) {
+      {
+        "hosts" => get_host_port,
+        "index" => index
+      }
     }
-  }
-  subject { LogStash::Outputs::ElasticSearch.new(config) }
+    subject { LogStash::Outputs::ElasticSearch.new(config.merge(compression_config)) }

-  let(:es_url) { "http://#{get_host_port}" }
-  let(:index_url) {"#{es_url}/#{index}"}
-  let(:http_client_options) { {} }
-  let(:http_client) do
-    Manticore::Client.new(http_client_options)
-  end
+    let(:es_url) { "http://#{get_host_port}" }
+    let(:index_url) {"#{es_url}/#{index}"}
+    let(:http_client_options) { {} }
+    let(:http_client) do
+      Manticore::Client.new(http_client_options)
+    end

-  before do
-    subject.register
-    subject.multi_receive([])
-  end
+    before do
+      subject.register
+      subject.multi_receive([])
+    end

-  shared_examples "an indexer" do
-    it "ships events" do
-      subject.multi_receive(events)
+    shared_examples "an indexer" do
+      it "ships events" do
+        subject.multi_receive(events)

-      http_client.post("#{es_url}/_refresh").call
+        http_client.post("#{es_url}/_refresh").call

-      response = http_client.get("#{index_url}/_count?q=*")
-      result = LogStash::Json.load(response.body)
-      cur_count = result["count"]
-      expect(cur_count).to eq(event_count)
+        response = http_client.get("#{index_url}/_count?q=*")
+        result = LogStash::Json.load(response.body)
+        cur_count = result["count"]
+        expect(cur_count).to eq(event_count)

-      response = http_client.get("#{index_url}/_search?q=*&size=1000")
-      result = LogStash::Json.load(response.body)
-      result["hits"]["hits"].each do |doc|
-        if ESHelper.es_version_satisfies?("< 8")
-          expect(doc["_type"]).to eq(type)
-        else
-          expect(doc).not_to include("_type")
+        response = http_client.get("#{index_url}/_search?q=*&size=1000")
+        result = LogStash::Json.load(response.body)
+        result["hits"]["hits"].each do |doc|
+          if ESHelper.es_version_satisfies?("< 8")
+            expect(doc["_type"]).to eq(type)
+          else
+            expect(doc).not_to include("_type")
+          end
+          expect(doc["_index"]).to eq(index)
         end
-        expect(doc["_index"]).to eq(index)
       end
     end
-  end

-  it "sets the correct content-encoding header and body is compressed" do
-    expect(subject.client.pool.adapter.client).to receive(:send).
-      with(anything, anything, {:headers=>{"Content-Encoding"=>"gzip", "Content-Type"=>"application/json"}, :body => a_valid_gzip_encoded_string}).
-      and_call_original
-    subject.multi_receive(events)
-  end
+    it "sets the correct content-encoding header and body is compressed" do
+      expect(subject.client.pool.adapter.client).to receive(:send).
+        with(anything, anything, {:headers=>{"Content-Encoding"=>"gzip", "Content-Type"=>"application/json"}, :body => a_valid_gzip_encoded_string}).
+        and_call_original
+      subject.multi_receive(events)
+    end

-  it_behaves_like("an indexer")
-end
+    it_behaves_like("an indexer")
+  end
+end
```

spec/integration/outputs/index_spec.rb

Lines changed: 8 additions & 4 deletions
```diff
@@ -262,7 +262,8 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
   let(:config) {
     {
       "hosts" => get_host_port,
-      "index" => index
+      "index" => index,
+      "http_compression" => false
     }
   }
   it_behaves_like("an indexer")
@@ -273,7 +274,8 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
   let(:config) {
     {
       "hosts" => get_host_port,
-      "index" => index
+      "index" => index,
+      "http_compression" => false
     }
   }
   it_behaves_like("an indexer")
@@ -291,7 +293,8 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
       "password" => password,
       "ssl_enabled" => true,
       "ssl_certificate_authorities" => cacert,
-      "index" => index
+      "index" => index,
+      "http_compression" => false
     }
   end

@@ -351,7 +354,8 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
       "hosts" => ["https://#{CGI.escape(user)}:#{CGI.escape(password)}@elasticsearch:9200"],
       "ssl_enabled" => true,
       "ssl_certificate_authorities" => "spec/fixtures/test_certs/test.crt",
-      "index" => index
+      "index" => index,
+      "http_compression" => false
     }
   end
```
