Skip to content
13 changes: 13 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ endif::[]
//===== Bug fixes
//

=== Unreleased

// Unreleased changes go here
// When the next release happens, nest these changes under the "Python Agent version 5.x" heading
[float]
===== Features

* Add additional context information about elasticsearch client requests {pull}1108[#1108]

[float]
===== Bug fixes


[[release-notes-6.x]]
=== Python Agent version 6.x

Expand Down
4 changes: 4 additions & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ Celery tasks will be recorded automatically with Django and Flask only.

Instrumented methods:

* `elasticsearch.transport.Transport.perform_request`
* `elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request`
* `elasticsearch.connection.http_requests.RequestsHttpConnection.perform_request`
* `elasticsearch._async.transport.AsyncTransport.perform_request`
* `elasticsearch_async.connection.AIOHttpConnection.perform_request`

Additionally, the instrumentation wraps the following methods of the `Elasticsearch` client class:
Expand All @@ -127,6 +129,8 @@ Collected trace data:

* the query string (if available)
* the `query` element from the request body (if available)
* the response status code
* the count of affected rows (if available)

We recommend using keyword arguments only with elasticsearch-py, as recommended by
https://elasticsearch-py.readthedocs.io/en/master/api.html#api-documentation[the elasticsearch-py docs].
Expand Down
43 changes: 35 additions & 8 deletions elasticapm/instrumentation/packages/asyncio/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@

import elasticapm
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin
from elasticapm.instrumentation.packages.elasticsearch import (
ElasticsearchConnectionInstrumentation,
ElasticsearchTransportInstrumentation,
)
from elasticapm.traces import execution_context


class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
class ElasticSearchAsyncConnection(ElasticsearchConnectionInstrumentation, AsyncAbstractInstrumentedModule):
name = "elasticsearch_connection"

instrument_list = [
Expand All @@ -42,16 +46,39 @@ class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractIn
]

async def call(self, module, method, wrapped, instance, args, kwargs):
signature = self.get_signature(args, kwargs)
context = self.get_context(instance, args, kwargs)
span = execution_context.get_span()

self._update_context_by_request_data(span.context, instance, args, kwargs)

status_code, headers, raw_data = await wrapped(*args, **kwargs)

span.context["http"] = {"status_code": status_code}

return status_code, headers, raw_data


class ElasticsearchAsyncTransportInstrumentation(
ElasticsearchTransportInstrumentation, AsyncAbstractInstrumentedModule
):
name = "elasticsearch_connection"

instrument_list = [
("elasticsearch._async.transport", "AsyncTransport.perform_request"),
]

async def call(self, module, method, wrapped, instance, args, kwargs):
async with elasticapm.async_capture_span(
signature,
self._get_signature(args, kwargs),
span_type="db",
span_subtype="elasticsearch",
span_action="query",
extra=context,
extra={},
skip_frames=2,
leaf=True,
):
return await wrapped(*args, **kwargs)
) as span:
result_data = await wrapped(*args, **kwargs)

if isinstance(result_data, dict) and "hits" in result_data:
span.context["db"]["rows_affected"] = result_data["hits"]["total"]["value"]

return result_data
60 changes: 40 additions & 20 deletions elasticapm/instrumentation/packages/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,42 @@

import elasticapm
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import execution_context
from elasticapm.utils.logging import get_logger

logger = get_logger("elasticapm.instrument")

should_capture_body_re = re.compile("/(_search|_msearch|_count|_async_search|_sql|_eql)(/|$)")


class ElasticSearchConnectionMixin(object):
query_methods = ("search", "count", "delete_by_query")
class ElasticsearchConnectionInstrumentation(AbstractInstrumentedModule):
name = "elasticsearch_connection"

def get_signature(self, args, kwargs):
args_len = len(args)
http_method = args[0] if args_len else kwargs.get("method")
http_path = args[1] if args_len > 1 else kwargs.get("url")
instrument_list = [
("elasticsearch.connection.http_urllib3", "Urllib3HttpConnection.perform_request"),
("elasticsearch.connection.http_requests", "RequestsHttpConnection.perform_request"),
]

return "ES %s %s" % (http_method, http_path)
def call(self, module, method, wrapped, instance, args, kwargs):
span = execution_context.get_span()

self._update_context_by_request_data(span.context, instance, args, kwargs)

status_code, headers, raw_data = wrapped(*args, **kwargs)

span.context["http"] = {"status_code": status_code}

def get_context(self, instance, args, kwargs):
return status_code, headers, raw_data

def _update_context_by_request_data(self, context, instance, args, kwargs):
args_len = len(args)
url = args[1] if args_len > 1 else kwargs.get("url")
params = args[2] if args_len > 2 else kwargs.get("params")
body_serialized = args[3] if args_len > 3 else kwargs.get("body")

should_capture_body = bool(should_capture_body_re.search(url))

context = {"db": {"type": "elasticsearch"}}
context["db"] = {"type": "elasticsearch"}
if should_capture_body:
query = []
# using both q AND body is allowed in some API endpoints / ES versions,
Expand All @@ -76,32 +86,42 @@ def get_context(self, instance, args, kwargs):
query.append(body_serialized)
if query:
context["db"]["statement"] = "\n\n".join(query)

context["destination"] = {
"address": instance.host,
"service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"},
}
return context


class ElasticsearchConnectionInstrumentation(ElasticSearchConnectionMixin, AbstractInstrumentedModule):
class ElasticsearchTransportInstrumentation(AbstractInstrumentedModule):
name = "elasticsearch_connection"

instrument_list = [
("elasticsearch.connection.http_urllib3", "Urllib3HttpConnection.perform_request"),
("elasticsearch.connection.http_requests", "RequestsHttpConnection.perform_request"),
("elasticsearch.transport", "Transport.perform_request"),
]

def call(self, module, method, wrapped, instance, args, kwargs):
signature = self.get_signature(args, kwargs)
context = self.get_context(instance, args, kwargs)

with elasticapm.capture_span(
signature,
self._get_signature(args, kwargs),
span_type="db",
span_subtype="elasticsearch",
span_action="query",
extra=context,
extra={},
skip_frames=2,
leaf=True,
):
return wrapped(*args, **kwargs)
) as span:
result_data = wrapped(*args, **kwargs)

try:
span.context["db"]["rows_affected"] = result_data["hits"]["total"]["value"]
except (KeyError, TypeError):
pass

return result_data

def _get_signature(self, args, kwargs):
args_len = len(args)
http_method = args[0] if args_len else kwargs.get("method")
http_path = args[1] if args_len > 1 else kwargs.get("url")

return "ES %s %s" % (http_method, http_path)
2 changes: 2 additions & 0 deletions elasticapm/instrumentation/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"elasticapm.instrumentation.packages.sqlite.SQLiteInstrumentation",
"elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation",
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation",
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchTransportInstrumentation",
"elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation",
"elasticapm.instrumentation.packages.pymssql.PyMSSQLInstrumentation",
"elasticapm.instrumentation.packages.pyodbc.PyODBCInstrumentation",
Expand All @@ -73,6 +74,7 @@
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
"elasticapm.instrumentation.packages.asyncio.httpx.HttpxAsyncClientInstrumentation",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticsearchAsyncTransportInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
"elasticapm.instrumentation.packages.asyncio.asyncpg.AsyncPGInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async def test_info(instrument, elasticapm_client, async_elasticsearch):
assert span["subtype"] == "elasticsearch"
assert span["action"] == "query"
assert span["sync"] is False
assert span["context"]["http"]["status_code"] == 200


async def test_create(instrument, elasticapm_client, async_elasticsearch):
Expand Down Expand Up @@ -124,6 +125,7 @@ async def test_create(instrument, elasticapm_client, async_elasticsearch):
assert span["action"] == "query"
assert span["context"]["db"]["type"] == "elasticsearch"
assert "statement" not in span["context"]["db"]
assert span["context"]["http"]["status_code"] == 201


async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
Expand Down Expand Up @@ -152,6 +154,9 @@ async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
'{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
)
assert span["sync"] is False
if ES_VERSION[0] >= 7:
assert span["context"]["db"]["rows_affected"] == 1
assert span["context"]["http"]["status_code"] == 200


async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
Expand All @@ -175,3 +180,4 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
assert span["context"]["db"]["type"] == "elasticsearch"
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
assert span["sync"] is False
assert span["context"]["http"]["status_code"] == 200
Loading