3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 5.2.0
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)

## 4.22.0
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)

126 changes: 122 additions & 4 deletions docs/index.asciidoc
@@ -230,6 +230,110 @@ The next scheduled run:
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
* updates the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-esql"]
==== {esql} support

.Technical Preview
****
The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview.
Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
****

{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data.

To use {esql}, this plugin must be installed on {ls} 8.17.4 or newer and connected to {es} 8.11 or newer.

To configure an {esql} query in the plugin, set `query_type` to `esql` and provide your {esql} query in the `query` parameter.

IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.

The following is a basic scheduled {esql} query that runs hourly:
[source, ruby]
----
input {
  elasticsearch {
    id => "hourly_cron_job"
    hosts => [ 'https://..' ]
    api_key => '....'
    query_type => 'esql'
    query => '
      FROM food-index
      | WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour
      | LIMIT 500
    '
    schedule => '0 * * * *' # every hour at min 0
  }
}
----

Set `config.support_escapes: true` in `logstash.yml` if you need to escape special characters in the query.
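For instance, with escape support enabled, a double-quoted query string can embed escaped quotes (a sketch with a hypothetical index and field):

```
query => "FROM my-index | WHERE category == \"beverages\" | LIMIT 100"
```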

NOTE: With {esql} queries, {ls} doesn't generate `event.original`.

[id="plugins-{type}s-{plugin}-esql-event-mapping"]
===== Mapping {esql} result to {ls} event
{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (rows).
The plugin maps each row to a {ls} event, populating the corresponding fields.
For example, a query might produce a table like:

[cols="2,1,1,1,2",options="header"]
|===
|`timestamp` |`user_id` | `action` | `status.code` | `status.desc`

|2025-04-10T12:00:00 |123 |login |200 | Success
|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user)
|===

For this case, the plugin emits two events that look like this:
[source, json]
----
[
  {
    "timestamp": "2025-04-10T12:00:00",
    "user_id": 123,
    "action": "login",
    "status": {
      "code": 200,
      "desc": "Success"
    }
  },
  {
    "timestamp": "2025-04-10T12:05:00",
    "user_id": 456,
    "action": "purchase",
    "status": {
      "code": 403,
      "desc": "Forbidden (unauthorized user)"
    }
  }
]
----

NOTE: If your index mapping uses sub-objects, so that `status.code` and `status.desc` are actually dotted field names, they still appear in {ls} events as a nested structure.

[id="plugins-{type}s-{plugin}-esql-multifields"]
===== Conflict on multi-fields

An {esql} query fetches all parent fields and sub-fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects].
Since a {ls} event cannot contain both a parent field's concrete value and its sub-field values, the plugin includes the parent field and ignores the sub-fields with a warning.
We recommend using the `RENAME` (or `DROP`, to avoid warnings) command in your {esql} query to explicitly rename fields when you need sub-field values included in the event.

This is a common occurrence if your template or mapping follows the pattern of always indexing strings as a "text" (`field`) + "keyword" (`field.keyword`) multi-field.
In this case, it's recommended to use `KEEP field` if the string values are identical and there is only one sub-field, since the engine will optimize the query and retrieve the keyword; otherwise, use `KEEP field.keyword | RENAME field.keyword AS field`.
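For instance, assuming a hypothetical `message` field indexed as both "text" and "keyword", this query keeps only the keyword sub-field and renames it back to the parent name:

```
FROM my-index
| KEEP message.keyword
| RENAME message.keyword AS message
| LIMIT 100
```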

To illustrate the situation with an example, assume your mapping has a `time` field with `time.min` and `time.max` sub-fields, as follows:
[source, json]
----
"properties": {
  "time": { "type": "long" },
  "time.min": { "type": "long" },
  "time.max": { "type": "long" }
}
----

The {esql} result will contain all three fields, but the plugin cannot map them all into a {ls} event.
To avoid this, you can use the `RENAME` command to rename the parent `time` field so that all fields have unique names:
[source, ruby]
----
...
query => 'FROM my-index | RENAME time AS time.current'
...
----

For comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

@@ -254,6 +358,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-query_type>> |<<string,string>>, one of `["dsl","esql"]`|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
@@ -495,22 +600,35 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
* Value type is <<string,string>>
* Default value is `'{ "sort": [ "_doc" ] }'`

The query to be executed. Read the {ref}/query-dsl.html[Elasticsearch query DSL
documentation] for more information.
The query to be executed.
The accepted query shape is DSL or {esql} (when `query_type => 'esql'`).
Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{esql} documentation] for more information.

When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.

[id="plugins-{type}s-{plugin}-query_type"]
===== `query_type`

* Value can be `dsl` or `esql`
* Default value is `dsl`

Defines the <<plugins-{type}s-{plugin}-query>> shape.
When `dsl`, the query must be a valid {es} JSON-style query string.
When `esql`, the query must be a valid {esql} string, and the `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target`, `docinfo_fields`, `response_type`, and `tracking_field` parameters are not allowed.

[id="plugins-{type}s-{plugin}-response_type"]
===== `response_type`

* Value can be any of: `hits`, `aggregations`
* Value can be any of: `hits`, `aggregations`, `esql`
* Default value is `hits`

Which part of the result to transform into Logstash events when processing the
response from the query.

The default `hits` will generate one event per returned document (i.e. "hit").

When set to `aggregations`, a single {ls} event will be generated with the
contents of the `aggregations` object of the query's response. In this case the
`hits` object will be ignored. The parameter `size` will be always be set to
0 regardless of the default or user-defined value set in this plugin.
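For example, a minimal sketch (index, field, and aggregation names are hypothetical) that emits one event per scheduled run containing only the aggregation results:

```
input {
  elasticsearch {
    hosts => [ 'https://..' ]
    index => "logs-*"
    response_type => "aggregations"
    query => '{ "aggs": { "status_counts": { "terms": { "field": "status.code" } } } }'
    schedule => '0 * * * *'
  }
}
```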
114 changes: 78 additions & 36 deletions lib/logstash/inputs/elasticsearch.rb
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'
require 'logstash/inputs/elasticsearch/esql'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -96,15 +97,21 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"

# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# A type of Elasticsearch query, provided by @query. This will validate query shape and other params.
config :query_type, :validate => %w[dsl esql], :default => 'dsl'

# The query to be executed. DSL or ES|QL (when `query_type => 'esql'`) query shape is accepted.
# Read the following documentations for more info
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'

# This allows you to speccify the response type: either hits or aggregations
# where hits: normal search request
# aggregations: aggregation request
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
# This allows you to specify the DSL response type: one of [hits, aggregations]
# where
# hits: normal search request
# aggregations: aggregation request
# Note that this param is invalid when `query_type => 'esql'`; the ES|QL response shape is always a tabular format
config :response_type, :validate => %w[hits aggregations], :default => 'hits'

# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
@@ -293,6 +300,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def initialize(params={})
super(params)

@@ -309,10 +319,17 @@ def register
fill_hosts_from_cloud_id
setup_ssl_params!

@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
if @query_type == 'esql'
validate_ls_version_for_esql_support!
validate_esql_query!
not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field)
raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options.any?
else
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
end

@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -348,11 +365,13 @@ def register

test_connection!

validate_es_for_esql_support!

setup_serverless

setup_search_api

setup_query_executor
@query_executor = create_query_executor

setup_cursor_tracker

@@ -370,16 +389,6 @@ def run(output_queue)
end
end

def get_query_object
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

##
# This can be called externally from the query_executor
public
@@ -390,6 +399,23 @@ def push_hit(hit, output_queue, root_field = '_source')
record_last_value(event)
end

def decorate_event(event)
decorate(event)
end

private

def get_query_object
return @query if @query_type == 'esql'
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

def record_last_value(event)
@cursor_tracker.record_last_value(event) if @tracking_field
end
@@ -421,8 +447,6 @@ def set_docinfo_fields(hit, event)
event.set(@docinfo_target, docinfo_target)
end

private

def hosts_default?(hosts)
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
@@ -700,18 +724,16 @@ def setup_search_api

end

def setup_query_executor
@query_executor = case @response_type
when 'hits'
if @resolved_search_api == "search_after"
LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
else
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
end
when 'aggregations'
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
end
def create_query_executor
return LogStash::Inputs::Elasticsearch::Esql.new(@client, self) if @query_type == 'esql'

# DSL query executor
return LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) if @response_type == 'aggregations'
# response_type is hits, executor can be search_after or scroll type
return LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) if @resolved_search_api == "search_after"

logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
end

def setup_cursor_tracker
@@ -750,6 +772,26 @@ def get_transport_client_class
::Elastic::Transport::Transport::HTTP::Manticore
end

def validate_ls_version_for_esql_support!
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
fail("Current version of Logstash does not include an Elasticsearch client that supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
end
end
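The version gates above rely on `Gem::Version`, which parses semantic version strings and compares them component by component. A standalone sketch (the constant mirrors the plugin's; the method name is hypothetical):

```ruby
# Standalone sketch of the plugin's ES|QL version gate.
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def es_supports_esql?(es_version)
  # Gem::Version compares "major.minor.patch" numerically,
  # so "8.10.2" < "8.11.0" even though "10" > "1" as a string prefix.
  Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
end

puts es_supports_esql?("8.17.4")
puts es_supports_esql?("8.10.2")
```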

def validate_esql_query!
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
source_commands = %w[FROM ROW SHOW]
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end
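The validation above only checks that the statement begins with an ES|QL source command; a standalone sketch of that check (the method name is hypothetical):

```ruby
# ES|QL statements must begin with a source command such as FROM, ROW, or SHOW.
SOURCE_COMMANDS = %w[FROM ROW SHOW].freeze

def starts_with_source_command?(query)
  stripped = query.strip
  SOURCE_COMMANDS.any? { |cmd| stripped.start_with?(cmd) }
end

puts starts_with_source_command?('FROM food-index | LIMIT 10')
puts starts_with_source_command?('| WHERE spicy_level == "hot"')
```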

def validate_es_for_esql_support!
return unless @query_type == 'esql'
# make sure connected ES supports ES|QL (8.11+)
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
fail("Connected Elasticsearch #{es_version} does not support ES|QL. The ES|QL feature requires Elasticsearch #{ES_ESQL_SUPPORT_VERSION} or newer.") unless es_supports_esql
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator