Skip to content

Commit a2674cf

Browse files
kaisecheng and jsvd authored
shut down pipeline when finish register fails (#1151)
Prior to this change, errors in the post-register phase (finish_register) were handled inconsistently: the template manager logged the failure, while the ILM setup loop printed error messages. Continuing the pipeline after such failures would not be meaningful. With this commit, ConfigurationError is introduced for critical bootstrap failures, such as when the template path is not found. This change stops the pipeline when it gets a ConfigurationError or a 4xx status code while calling a template- or ILM-related API. For HTTP 429 (Too Many Requests), finish_register retries the bootstrap. --------- Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
1 parent 70654fb commit a2674cf

File tree

9 files changed

+61
-22
lines changed

9 files changed

+61
-22
lines changed

.travis.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ import:
22
- logstash-plugins/.ci:travis/travis.yml@1.x
33

44
env:
5-
- INTEGRATION=false ELASTIC_STACK_VERSION=6.x
65
- INTEGRATION=false ELASTIC_STACK_VERSION=7.x
76
- INTEGRATION=false ELASTIC_STACK_VERSION=8.x SNAPSHOT=true
8-
- INTEGRATION=true ELASTIC_STACK_VERSION=6.x
97
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x
108
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x SNAPSHOT=true LOG_LEVEL=info
119
- INTEGRATION=true ELASTIC_STACK_VERSION=8.x SNAPSHOT=true LOG_LEVEL=info

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
## 11.20.0
2+
- Changed the register to initiate pipeline shutdown upon bootstrap failure instead of simply logging the error [#1151](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1151)
3+
14
## 11.19.0
2-
- Added `filter_path` to bulk requests to reduce the size of responses from elasticsearch
5+
- Added `filter_path` to bulk requests to reduce the size of responses from elasticsearch [#1154](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1154)
36

47
## 11.18.0
58
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)

lib/logstash/outputs/elasticsearch.rb

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,12 +322,29 @@ def register
322322
@bulk_request_metrics = metric.namespace(:bulk_requests)
323323
@document_level_metrics = metric.namespace(:documents)
324324

325+
@shutdown_from_finish_register = Concurrent::AtomicBoolean.new(false)
325326
@after_successful_connection_thread = after_successful_connection do
326327
begin
327328
finish_register
328329
true # thread.value
330+
rescue LogStash::ConfigurationError, LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
331+
return e if pipeline_shutdown_requested?
332+
333+
# retry when 429
334+
@logger.debug("Received a 429 status code during registration. Retrying..") && retry if too_many_requests?(e)
335+
336+
# shut down pipeline
337+
if execution_context&.agent.respond_to?(:stop_pipeline)
338+
details = { message: e.message, exception: e.class }
339+
details[:backtrace] = e.backtrace if @logger.debug?
340+
@logger.error("Failed to bootstrap. Pipeline \"#{execution_context.pipeline_id}\" is going to shut down", details)
341+
342+
@shutdown_from_finish_register.make_true
343+
execution_context.agent.stop_pipeline(execution_context.pipeline_id)
344+
end
345+
346+
e
329347
rescue => e
330-
# we do not want to halt the thread with an exception as that has consequences for LS
331348
e # thread.value
332349
ensure
333350
@after_successful_connection_done.make_true
@@ -450,7 +467,11 @@ def close
450467
private
451468

452469
def stop_after_successful_connection_thread
453-
@after_successful_connection_thread.join unless @after_successful_connection_thread.nil?
470+
# avoid deadlock when finish_register calling execution_context.agent.stop_pipeline
471+
# stop_pipeline triggers plugin close and the plugin close waits for after_successful_connection_thread to join
472+
return if @shutdown_from_finish_register&.true?
473+
474+
@after_successful_connection_thread.join if @after_successful_connection_thread&.alive?
454475
end
455476

456477
# Convert the event into a 3-tuple of action, params and event hash
@@ -599,6 +620,7 @@ def install_template
599620
details = { message: e.message, exception: e.class, backtrace: e.backtrace }
600621
details[:body] = e.response_body if e.respond_to?(:response_body)
601622
@logger.error("Failed to install template", details)
623+
raise e if register_termination_error?(e)
602624
end
603625

604626
def setup_ecs_compatibility_related_defaults

lib/logstash/outputs/elasticsearch/data_stream_support.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,12 @@ def inherited_internal_config_param?(name)
145145
# @note assumes to be running AFTER {after_successful_connection} completed, due ES version checks
146146
# @return [Gem::Version] if ES supports DS nil (or raise) otherwise
147147
def assert_es_version_supports_data_streams
148-
fail 'no last_es_version' unless last_es_version # assert - should not happen
148+
raise LogStash::ConfigurationError 'no last_es_version' unless last_es_version # assert - should not happen
149149
es_version = ::Gem::Version.create(last_es_version)
150150
if es_version < ::Gem::Version.create(DATA_STREAMS_ORIGIN_ES_VERSION)
151151
@logger.error "Elasticsearch version does not support data streams, Logstash might end up writing to an index", es_version: es_version.version
152152
# NOTE: when switching to synchronous check from register, this should be a ConfigurationError
153-
raise LogStash::Error, "A data_stream configuration is only supported since Elasticsearch #{DATA_STREAMS_ORIGIN_ES_VERSION} " +
153+
raise LogStash::ConfigurationError, "A data_stream configuration is only supported since Elasticsearch #{DATA_STREAMS_ORIGIN_ES_VERSION} " +
154154
"(detected version #{es_version.version}), please upgrade your cluster"
155155
end
156156
es_version # return truthy

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ def forbidden?
2828
@response_code == 403
2929
end
3030

31+
def too_many_requests?
32+
@response_code == 429
33+
end
34+
3135
end
36+
3237
class HostUnreachableError < Error;
3338
attr_reader :original_error, :url
3439

@@ -69,7 +74,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
6974
@adapter = adapter
7075
@metric = options[:metric]
7176
@initial_urls = initial_urls
72-
77+
7378
raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
7479
@url_normalizer = options[:url_normalizer]
7580
DEFAULT_OPTIONS.merge(options).tap do |merged|
@@ -159,7 +164,7 @@ def until_stopped(task_name, delay)
159164
:error_message => e.message,
160165
:class => e.class.name,
161166
:backtrace => e.backtrace
162-
)
167+
)
163168
end
164169
end
165170
end
@@ -197,11 +202,11 @@ def check_sniff
197202
sniff(nodes)
198203
end
199204
end
200-
205+
201206
def major_version(version_string)
202207
version_string.split('.').first.to_i
203208
end
204-
209+
205210
def sniff(nodes)
206211
nodes.map do |id,info|
207212
# Skip master-only nodes
@@ -360,7 +365,7 @@ def normalize_url(uri)
360365

361366
def update_urls(new_urls)
362367
return if new_urls.nil?
363-
368+
364369
# Normalize URLs
365370
new_urls = new_urls.map(&method(:normalize_url))
366371

@@ -388,14 +393,14 @@ def update_urls(new_urls)
388393
if state_changes[:removed].size > 0 || state_changes[:added].size > 0
389394
logger.info? && logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
390395
end
391-
396+
392397
# Run an inline healthcheck anytime URLs are updated
393398
# This guarantees that during startup / post-startup
394399
# sniffing we don't have idle periods waiting for the
395400
# periodic sniffer to allow new hosts to come online
396-
healthcheck!
401+
healthcheck!
397402
end
398-
403+
399404
def size
400405
@state_mutex.synchronize { @url_info.size }
401406
end

lib/logstash/outputs/elasticsearch/template_manager.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def self.load_default_template(es_major_version, ecs_compatibility)
3737
template_path = default_template_path(es_major_version, ecs_compatibility)
3838
read_template_file(template_path)
3939
rescue => e
40-
fail "Failed to load default template for Elasticsearch v#{es_major_version} with ECS #{ecs_compatibility}; caused by: #{e.inspect}"
40+
raise LogStash::ConfigurationError, "Failed to load default template for Elasticsearch v#{es_major_version} with ECS #{ecs_compatibility}; caused by: #{e.inspect}"
4141
end
4242

4343
def self.install(client, template_endpoint, template_name, template, template_overwrite)
@@ -99,9 +99,11 @@ def self.default_template_path(es_major_version, ecs_compatibility=:disabled)
9999
end
100100

101101
def self.read_template_file(template_path)
102-
raise ArgumentError, "Template file '#{template_path}' could not be found" unless ::File.exists?(template_path)
102+
raise LogStash::ConfigurationError, "Template file '#{template_path}' could not be found" unless ::File.exists?(template_path)
103103
template_data = ::IO.read(template_path)
104104
LogStash::Json.load(template_data)
105+
rescue => e
106+
raise LogStash::ConfigurationError, "Failed to load template file '#{template_path}': #{e.message}"
105107
end
106108

107109
def self.template_endpoint(plugin)

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,5 +407,14 @@ def dig_value(val, first_key, *rest_keys)
407407
return val if rest_keys.empty? || val == nil
408408
dig_value(val, *rest_keys)
409409
end
410+
411+
def register_termination_error?(e)
412+
e.is_a?(LogStash::ConfigurationError) || e.is_a?(LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError)
413+
end
414+
415+
def too_many_requests?(e)
416+
e.is_a?(LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError) &&
417+
e.too_many_requests?
418+
end
410419
end
411420
end; end; end

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.19.0'
3+
s.version = '11.20.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/no_es_on_startup_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,25 @@
3333
end
3434

3535
it 'should ingest events when Elasticsearch recovers before documents are sent' do
36-
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_es_version).and_raise(
36+
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_root_path).with(any_args).and_raise(
3737
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new StandardError.new("TEST: before docs are sent"), 'http://test.es/'
3838
)
3939
subject.register
40-
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_es_version).and_return(ESHelper.es_version)
40+
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_root_path).with(any_args).and_call_original
4141
subject.multi_receive([event1, event2])
4242
@es.indices.refresh
4343
r = @es.search(index: 'logstash-*')
4444
expect(r).to have_hits(2)
4545
end
4646

4747
it 'should ingest events when Elasticsearch recovers after documents are sent' do
48-
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_es_version).and_raise(
48+
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_root_path).with(any_args).and_raise(
4949
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new StandardError.new("TEST: after docs are sent"), 'http://test.es/'
5050
)
5151
subject.register
5252
Thread.new do
5353
sleep 4
54-
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_es_version).and_return(ESHelper.es_version)
54+
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(:get_root_path).with(any_args).and_call_original
5555
end
5656
subject.multi_receive([event1, event2])
5757
@es.indices.refresh

0 commit comments

Comments
 (0)