ES|QL support #233
Original file line number | Diff line number | Diff line change |
---|---|---|
| @@ -230,6 +230,46 @@ The next scheduled run: | |
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and | ||
* updates the value of the field at the end of the pagination. | ||
| ||
[id="plugins-{type}s-{plugin}-esql"] | ||
==== ES|QL support | ||
{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data. | ||
| ||
To utilize the ES|QL feature with this plugin, the following version requirements must be met: | ||
[cols="1,2",options="header"] | ||
|=== | ||
|Component |Minimum version | ||
|{es} |8.11.0 or newer | ||
|{ls} |8.17.4 or newer | ||
|This plugin |4.23.0+ (4.x series) or 5.2.0+ (5.x series) | ||
|=== | ||
| ||
To configure an ES|QL query in the plugin, set `response_type` to `esql` and provide your ES|QL query in the `query` parameter. | ||
| ||
| ||
IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments. | ||
| ||
The following is a basic scheduled ES|QL query that runs hourly: | ||
[source, ruby] | ||
input { | ||
elasticsearch { | ||
id => hourly_cron_job | ||
hosts => [ 'https://..'] | ||
api_key => '....' | ||
response_type => 'esql' | ||
query => ' | ||
FROM food-index | ||
| WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour | ||
| LIMIT 500 | ||
' | ||
schedule => '0 * * * *' # every hour at min 0 | ||
} | ||
} | ||
| ||
Set `config.support_escapes: true` in `logstash.yml` if you need to escape special characters in the query. | ||
| ||
NOTE: With an ES|QL query, {ls} doesn't generate `event.original`. | ||
| ||
For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation]. | ||
| ||
[id="plugins-{type}s-{plugin}-options"] | ||
==== Elasticsearch Input configuration options | ||
| ||
| @@ -257,7 +297,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details. | |
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No | ||
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No | ||
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No | ||
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No | ||
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations","esql"]`|No | ||
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No | ||
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No | ||
| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No | ||
| @@ -507,17 +547,50 @@ the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the qu | |
[id="plugins-{type}s-{plugin}-response_type"] | ||
===== `response_type` | ||
| ||
* Value can be any of: `hits`, `aggregations` | ||
* Value can be any of: `hits`, `aggregations`, `esql` | ||
* Default value is `hits` | ||
| ||
Which part of the result to transform into Logstash events when processing the | ||
response from the query. | ||
| ||
The default `hits` will generate one event per returned document (i.e. "hit"). | ||
| ||
When set to `aggregations`, a single Logstash event will be generated with the | ||
contents of the `aggregations` object of the query's response. In this case the | ||
`hits` object will be ignored. The parameter `size` will always be set to | ||
0 regardless of the default or user-defined value set in this plugin. | ||
| ||
When using the `esql` setting, the query parameter must be a valid plaintext ES|QL string. | ||
When this setting is active, `target`, `size`, `slices` and `search_api` parameters are ignored. | ||
ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries). | ||
The plugin maps each value entry to an event, populating corresponding fields. | ||
For example, a query might produce a table like: | ||
| ||
[cols="2,1,1,2",options="header"] | ||
|=== | ||
|`timestamp` |`user_id` | `action` | `status_code` | ||
| ||
|2025-04-10T12:00:00 |123 |login |200 | ||
|2025-04-10T12:05:00 |456 |purchase |403 | ||
|=== | ||
| ||
For this case, the plugin emits two events that look like: | ||
[source, json] | ||
[ | ||
{ | ||
"timestamp": "2025-04-10T12:00:00", | ||
"user_id": 123, | ||
"action": "login", | ||
"status_code": 200 | ||
}, | ||
{ | ||
"timestamp": "2025-04-10T12:05:00", | ||
"user_id": 456, | ||
"action": "purchase", | ||
"status_code": 403 | ||
} | ||
] | ||
| ||
[id="plugins-{type}s-{plugin}-request_timeout_seconds"] | ||
===== `request_timeout_seconds` | ||
| ||
|
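The column-to-event mapping described in the `response_type` documentation above can be illustrated with a small standalone Ruby sketch. It is not the plugin's code: `rows_to_events` is a hypothetical helper, and the `type` values in the sample response are assumed for illustration only. It rebuilds the two events from the example table.

```ruby
# Hypothetical helper (not part of the plugin): turn a tabular ES|QL-style
# response into one hash per row, keyed by column name.
def rows_to_events(columns, values)
  names = columns.map { |column| column["name"] }
  values.map { |row| names.zip(row).to_h }
end

# Sample response shaped like the table in the docs; the "type" entries
# are assumptions made for this illustration.
response = {
  "columns" => [
    { "name" => "timestamp",   "type" => "date" },
    { "name" => "user_id",     "type" => "long" },
    { "name" => "action",      "type" => "keyword" },
    { "name" => "status_code", "type" => "long" }
  ],
  "values" => [
    ["2025-04-10T12:00:00", 123, "login", 200],
    ["2025-04-10T12:05:00", 456, "purchase", 403]
  ]
}

events = rows_to_events(response["columns"], response["values"])
events.each { |event| puts event }
```

Each row becomes one hash, which mirrors the "one event per value entry" behavior the docs describe.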
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
| @@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base | |||||||
require 'logstash/inputs/elasticsearch/paginated_search' | ||||||||
require 'logstash/inputs/elasticsearch/aggregation' | ||||||||
require 'logstash/inputs/elasticsearch/cursor_tracker' | ||||||||
require 'logstash/inputs/elasticsearch/esql' | ||||||||
| ||||||||
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) | ||||||||
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck | ||||||||
| @@ -96,15 +97,18 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base | |||||||
# The index or alias to search. | ||||||||
config :index, :validate => :string, :default => "logstash-*" | ||||||||
| ||||||||
# The query to be executed. Read the Elasticsearch query DSL documentation | ||||||||
# for more info | ||||||||
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html | ||||||||
# The query to be executed. DSL or ES|QL (when `response_type => 'esql'`) query type is accepted. | ||||||||
# See the following documentation for more info | ||||||||
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html | ||||||||
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html | ||||||||
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }' | ||||||||
| ||||||||
# This allows you to speccify the response type: either hits or aggregations | ||||||||
# where hits: normal search request | ||||||||
# aggregations: aggregation request | ||||||||
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits' | ||||||||
# This allows you to specify the response type: one of [hits, aggregations, esql] | ||||||||
# where | ||||||||
# hits: normal search request | ||||||||
# aggregations: aggregation request | ||||||||
# esql: ES|QL request | ||||||||
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits' | ||||||||
|
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
config :response_type, :validate => %w[hits aggregations], :deprecated => "use `query_type`"
config :query_type, :validate => %w[hits aggregations esql] # default depends on query shape

  def register
+   @query_type = normalize_config("query_type") do |normalizer|
+     normalizer.with_deprecated_alias("response_type")
+   end || (@query.start_with?('{') ? 'hits' : 'esql')
I was thinking to add the deprecation right after this ES|QL change.
One thing we need to agree on is naming. I personally do not like `hits` and `aggregations` alongside `esql`. They indicate different contexts. The options I had were `dsl_search`, `dsl_aggregation` and `esql`.
Please let me know your opinion: I can either apply the change now if we quickly come to an agreement, or create a follow-up issue right after this PR.
> I was thinking to add the deprecation right after this ES|QL change.

If someone starts using this feature, I would rather that their never-possible-before configuration feels "stable" and doesn't require them to go back and deal with deprecation warnings for things that we knew about before shipping the feature.

> They indicate different contexts

This is a very good point.
The current `response_type` only makes sense in the context of DSL-based queries.
So: what if we were to keep `response_type`, but constrain its use to `query_type => dsl`?
This would mean:
- `query_type => dsl`: allows use of `response_type`
- `query_type => esql`: prohibits use of `response_type`
- unspecified `query_type` could have a sensible default based on the shape of `query`:
  - if it looks like JSON, then it's `dsl`
  - if it looks like ES|QL then it's `esql`
  - else we error helpfully
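The shape-based default proposed in this comment could be sketched roughly as follows. This is a hypothetical illustration, not code from this PR: `infer_query_type` is an invented name, and treating a leading `FROM`, `ROW` or `SHOW` as the ES|QL signal is an assumption about what "looks like ES|QL" might mean.

```ruby
# Hypothetical sketch of the proposed default; not code from this PR.
def infer_query_type(query)
  stripped = query.to_s.strip
  # A JSON object is taken to mean a DSL query.
  return "dsl" if stripped.start_with?("{")
  # Assumption: a leading ES|QL source command marks an ES|QL query.
  return "esql" if stripped.match?(/\A(FROM|ROW|SHOW)\b/i)
  # Anything else fails with a message that tells the user how to proceed.
  raise ArgumentError, "cannot infer query_type from query; set query_type explicitly"
end

puts infer_query_type('{ "sort": [ "_doc" ] }')
puts infer_query_type("\n  FROM food-index\n  | LIMIT 500\n")
```

An explicit `query_type` would still take precedence; the inference would only apply when the option is left unset.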
Introducing `query_type` and keeping `response_type` was my initial design, and @jsvd and I were considering whether we could simplify without introducing a new param (in our 1:1 we agreed to support it with `response_type` and deprecate that in the future).
However, considering the behavior and user experience, I also strongly support this structure: `query_type` at the top level, with the other params following from it for details such as which response shape is parsed.
I have applied it with this commit.
FYI: the current CI snapshot unit test steps are broken (CIs with release versions are fine) because the core `openssl.jar` and the `uri` gem are missing, but I ran the unit and integration tests locally against a local LS to verify the change.
review note: moved to private area
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
require 'logstash/helpers/loggable_try' | ||
| ||
module LogStash | ||
module Inputs | ||
class Elasticsearch | ||
class Esql | ||
include LogStash::Util::Loggable | ||
| ||
ESQL_JOB = "ES|QL job" | ||
| ||
# Initialize the ESQL query executor | ||
# @param client [Elasticsearch::Client] The Elasticsearch client instance | ||
# @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance | ||
def initialize(client, plugin) | ||
@client = client | ||
@plugin = plugin | ||
@retries = plugin.params["retries"] | ||
| ||
@query = plugin.params["query"] | ||
unless @query.include?('METADATA') | ||
logger.warn("The query doesn't have METADATA keyword. Including it makes _id and _version available in the documents", {:query => @query}) | ||
end | ||
end | ||
| ||
# Execute the ESQL query and process results | ||
# @param output_queue [Queue] The queue to push processed events to | ||
# @param query A query (to obey interface definition) | ||
def do_run(output_queue, query) | ||
logger.info("ES|QL executor starting") | ||
response = retryable(ESQL_JOB) do | ||
@client.esql.query({ body: { query: @query }, format: 'json' }) | ||
end | ||
# retryable already logged the error details | ||
return if response == false | ||
| ||
if response&.headers&.dig("warning") | ||
logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]}) | ||
end | ||
if response['values'] && response['columns'] | ||
process_response(response['values'], response['columns'], output_queue) | ||
end | ||
end | ||
| ||
# Execute a retryable operation with proper error handling | ||
# @param job_name [String] Name of the job for logging purposes | ||
# @yield The block to execute | ||
# @return [Object, false] the block's result if successful, false once retries are exhausted | ||
def retryable(job_name, &block) | ||
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name) | ||
stud_try.try((@retries + 1).times) { yield } | ||
rescue => e | ||
error_details = {:message => e.message, :cause => e.cause} | ||
error_details[:backtrace] = e.backtrace if logger.debug? | ||
logger.error("#{job_name} failed with ", error_details) | ||
false | ||
end | ||
| ||
private | ||
| ||
# Process the ESQL response and push events to the output queue | ||
# @param values [Array[Array]] The ESQL query response rows (one array per row) | ||
# @param columns [Array[Hash]] The ESQL query response columns | ||
# @param output_queue [Queue] The queue to push processed events to | ||
def process_response(values, columns, output_queue) | ||
values.each do |value| | ||
mapped_data = map_column_and_values(columns, value) | ||
@plugin.decorate_and_push_to_queue(output_queue, mapped_data) | ||
end | ||
end | ||
| ||
# Map column names to their corresponding values | ||
# @param columns [Array] Array of column definitions | ||
# @param values [Array] Array of values for the current row | ||
# @return [Hash] Mapped data with column names as keys | ||
def map_column_and_values(columns, values) | ||
columns.each_with_index.with_object({}) do |(column, index), mapped_data| | ||
mapped_data[column["name"]] = values[index] | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
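For readers unfamiliar with the retry contract that `retryable` relies on, here is a minimal standalone sketch in plain Ruby. It does not use `LoggableTry` or Stud; `with_retries` is a hypothetical name. It only illustrates the behavior visible in the diff: run the block up to `retries + 1` times, return its result on success, and return `false` once the attempts are exhausted.

```ruby
# Hypothetical stand-in for the retry behavior; not the plugin's helper.
def with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError
    # Re-run the begin block until retries + 1 attempts have been made.
    retry if attempts <= retries
    false
  end
end

calls = 0
result = with_retries(2) do
  calls += 1
  raise IOError, "transient failure" if calls < 3
  { "columns" => [], "values" => [] }
end

failed_calls = 0
failed = with_retries(1) do
  failed_calls += 1
  raise "permanent failure"
end

puts "succeeded after #{calls} calls: #{result.inspect}"
puts "gave up after #{failed_calls} calls: #{failed.inspect}"
```

Returning `false` instead of re-raising is what lets the caller bail out with a simple `return if response == false`, as `do_run` does.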