Skip to content

Commit 8bc7922

Browse files
authored
add instrumentation for redis.asyncio (#1807)
* add instrumentation for redis.asyncio * properly await in asyncio redis instrumentation * update changelog
1 parent 8b5609e commit 8bc7922

File tree

4 files changed

+300
-0
lines changed

4 files changed

+300
-0
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ endif::[]
4141
* Add `transport_json_serializer` configuration option {pull}1777[#1777]
4242
* Add S3 bucket and key name to OTel attributes {pull}1790[#1790]
4343
* Implement partial transaction support in AWS lambda {pull}1784[#1784]
44+
* Add instrumentation for redis.asyncio {pull}1807[#1807]
4445
4546
[float]
4647
===== Bug fixes
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
from __future__ import absolute_import
32+
33+
from elasticapm.contrib.asyncio.traces import async_capture_span
34+
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
35+
36+
37+
class RedisAsyncioInstrumentation(AsyncAbstractInstrumentedModule):
38+
name = "redis"
39+
40+
instrument_list = [
41+
("redis.asyncio.client", "Redis.execute_command"),
42+
("redis.asyncio.client", "PubSub.execute_command"),
43+
]
44+
45+
async def call(self, module, method, wrapped, instance, args, kwargs):
46+
if len(args) > 0:
47+
wrapped_name = args[0]
48+
if isinstance(wrapped_name, bytes):
49+
wrapped_name = wrapped_name.decode()
50+
else:
51+
wrapped_name = self.get_wrapped_name(wrapped, instance, method)
52+
53+
async with async_capture_span(
54+
wrapped_name, span_type="db", span_subtype="redis", span_action="query", leaf=True
55+
) as span:
56+
if span.context is not None:
57+
span.context["destination"] = _get_destination_info(instance)
58+
59+
return await wrapped(*args, **kwargs)
60+
61+
62+
class RedisPipelineInstrumentation(AsyncAbstractInstrumentedModule):
63+
name = "redis"
64+
65+
instrument_list = [("redis.asyncio.client", "Pipeline.execute")]
66+
67+
async def call(self, module, method, wrapped, instance, args, kwargs):
68+
wrapped_name = self.get_wrapped_name(wrapped, instance, method)
69+
70+
async with async_capture_span(
71+
wrapped_name, span_type="db", span_subtype="redis", span_action="query", leaf=True
72+
) as span:
73+
if span.context is not None:
74+
span.context["destination"] = _get_destination_info(instance)
75+
76+
return await wrapped(*args, **kwargs)
77+
78+
79+
def _get_destination_info(connection):
80+
destination_info = {"service": {"name": "", "resource": "redis", "type": ""}}
81+
if connection.connection_pool:
82+
destination_info["port"] = connection.connection_pool.connection_kwargs.get("port")
83+
destination_info["address"] = connection.connection_pool.connection_kwargs.get("host")
84+
85+
return destination_info

elasticapm/instrumentation/register.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
"elasticapm.instrumentation.packages.asyncio.aiomysql.AioMySQLInstrumentation",
9292
"elasticapm.instrumentation.packages.asyncio.aiobotocore.AioBotocoreInstrumentation",
9393
"elasticapm.instrumentation.packages.asyncio.starlette.StarletteServerErrorMiddlewareInstrumentation",
94+
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisAsyncioInstrumentation",
95+
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation",
9496
]
9597
)
9698

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import pytest # isort:skip
32+
33+
import elasticapm
34+
35+
redis = pytest.importorskip("redis.asyncio") # isort:skip
36+
37+
import os
38+
39+
import pytest_asyncio
40+
41+
from elasticapm.conf.constants import TRANSACTION
42+
from elasticapm.traces import capture_span
43+
44+
pytestmark = [pytest.mark.asyncio, pytest.mark.redis]
45+
46+
if "REDIS_HOST" not in os.environ:
47+
pytestmark.append(pytest.mark.skip("Skipping redis tests, no REDIS_HOST environment variable set"))
48+
49+
50+
@pytest_asyncio.fixture()
51+
async def redis_async_conn():
52+
_host = os.environ["REDIS_HOST"]
53+
_port = os.environ.get("REDIS_PORT", 6379)
54+
conn = await redis.Redis.from_url(f"redis://{_host}:{_port}")
55+
56+
yield conn
57+
58+
await conn.close(close_connection_pool=True)
59+
60+
61+
@pytest.mark.integrationtest
62+
async def test_ping(instrument, elasticapm_client, redis_async_conn):
63+
# The PING command is sent as a byte string, so this tests if we can handle
64+
# the command both as a str and as a bytes. See #1307
65+
elasticapm_client.begin_transaction("transaction.test")
66+
await redis_async_conn.ping()
67+
elasticapm_client.end_transaction("test")
68+
transaction = elasticapm_client.events[TRANSACTION][0]
69+
span = elasticapm_client.spans_for_transaction(transaction)[0]
70+
assert span["name"] == "PING"
71+
assert span["duration"] > 0.2 # sanity test to ensure we measure the actual call
72+
73+
74+
@pytest.mark.integrationtest
75+
async def test_pipeline(instrument, elasticapm_client, redis_async_conn):
76+
elasticapm_client.begin_transaction("transaction.test")
77+
with capture_span("test_pipeline", "test"):
78+
pipeline = redis_async_conn.pipeline()
79+
pipeline.rpush("mykey", "a", "b")
80+
pipeline.expire("mykey", 1000)
81+
await pipeline.execute()
82+
elasticapm_client.end_transaction("MyView")
83+
84+
transactions = elasticapm_client.events[TRANSACTION]
85+
spans = elasticapm_client.spans_for_transaction(transactions[0])
86+
87+
assert spans[0]["name"] == "Pipeline.execute"
88+
assert spans[0]["type"] == "db"
89+
assert spans[0]["subtype"] == "redis"
90+
assert spans[0]["action"] == "query"
91+
assert spans[0]["context"]["destination"] == {
92+
"address": os.environ.get("REDIS_HOST", "localhost"),
93+
"port": int(os.environ.get("REDIS_PORT", 6379)),
94+
"service": {"name": "", "resource": "redis", "type": ""},
95+
}
96+
assert spans[0]["duration"] > 0.2 # sanity test to ensure we measure the actual call
97+
98+
assert spans[1]["name"] == "test_pipeline"
99+
assert spans[1]["type"] == "test"
100+
101+
assert len(spans) == 2
102+
103+
104+
@pytest.mark.integrationtest
105+
async def test_redis_client(instrument, elasticapm_client, redis_async_conn):
106+
elasticapm_client.begin_transaction("transaction.test")
107+
with capture_span("test_redis_client", "test"):
108+
await redis_async_conn.rpush("mykey", "a", "b")
109+
await redis_async_conn.expire("mykey", 1000)
110+
elasticapm_client.end_transaction("MyView")
111+
112+
transactions = elasticapm_client.events[TRANSACTION]
113+
spans = elasticapm_client.spans_for_transaction(transactions[0])
114+
115+
spans = sorted(spans, key=lambda x: x["name"])
116+
117+
assert {t["name"] for t in spans} == {"test_redis_client", "RPUSH", "EXPIRE"}
118+
119+
assert spans[0]["name"] == "EXPIRE"
120+
assert spans[0]["type"] == "db"
121+
assert spans[0]["subtype"] == "redis"
122+
assert spans[0]["action"] == "query"
123+
assert spans[0]["context"]["destination"] == {
124+
"address": os.environ.get("REDIS_HOST", "localhost"),
125+
"port": int(os.environ.get("REDIS_PORT", 6379)),
126+
"service": {"name": "", "resource": "redis", "type": ""},
127+
}
128+
assert spans[0]["duration"] > 0.2 # sanity test to ensure we measure the actual call
129+
130+
assert spans[1]["name"] == "RPUSH"
131+
assert spans[1]["type"] == "db"
132+
assert spans[1]["subtype"] == "redis"
133+
assert spans[1]["action"] == "query"
134+
assert spans[1]["context"]["destination"] == {
135+
"address": os.environ.get("REDIS_HOST", "localhost"),
136+
"port": int(os.environ.get("REDIS_PORT", 6379)),
137+
"service": {"name": "", "resource": "redis", "type": ""},
138+
}
139+
assert spans[1]["duration"] > 0.2 # sanity test to ensure we measure the actual call
140+
141+
assert spans[2]["name"] == "test_redis_client"
142+
assert spans[2]["type"] == "test"
143+
144+
assert len(spans) == 3
145+
146+
147+
@pytest.mark.integrationtest
148+
async def test_publish_subscribe_async(instrument, elasticapm_client, redis_async_conn):
149+
elasticapm_client.begin_transaction("transaction.test")
150+
pubsub = redis_async_conn.pubsub()
151+
with capture_span("test_publish_subscribe", "test"):
152+
# publish
153+
await redis_async_conn.publish("mykey", "a")
154+
155+
# subscribe
156+
await pubsub.subscribe("mykey")
157+
158+
elasticapm_client.end_transaction("MyView")
159+
160+
transactions = elasticapm_client.events[TRANSACTION]
161+
spans = elasticapm_client.spans_for_transaction(transactions[0])
162+
163+
expected_signatures = {"test_publish_subscribe", "PUBLISH", "SUBSCRIBE"}
164+
165+
assert {t["name"] for t in spans} == expected_signatures
166+
167+
assert spans[0]["name"] == "PUBLISH"
168+
assert spans[0]["type"] == "db"
169+
assert spans[0]["subtype"] == "redis"
170+
assert spans[0]["action"] == "query"
171+
assert spans[0]["context"]["destination"] == {
172+
"address": os.environ.get("REDIS_HOST", "localhost"),
173+
"port": int(os.environ.get("REDIS_PORT", 6379)),
174+
"service": {"name": "", "resource": "redis", "type": ""},
175+
}
176+
assert spans[0]["duration"] > 0.2 # sanity test to ensure we measure the actual call
177+
178+
assert spans[1]["name"] == "SUBSCRIBE"
179+
assert spans[1]["type"] == "db"
180+
assert spans[1]["subtype"] == "redis"
181+
assert spans[1]["action"] == "query"
182+
assert spans[1]["context"]["destination"] == {
183+
"address": os.environ.get("REDIS_HOST", "localhost"),
184+
"port": int(os.environ.get("REDIS_PORT", 6379)),
185+
"service": {"name": "", "resource": "redis", "type": ""},
186+
}
187+
assert spans[1]["duration"] > 0.2 # sanity test to ensure we measure the actual call
188+
189+
assert spans[2]["name"] == "test_publish_subscribe"
190+
assert spans[2]["type"] == "test"
191+
192+
assert len(spans) == 3
193+
194+
195+
@pytest.mark.parametrize(
196+
"elasticapm_client",
197+
[
198+
pytest.param({"transaction_max_spans": 1}),
199+
],
200+
indirect=True,
201+
)
202+
@pytest.mark.integrationtest
203+
async def test_dropped(instrument, elasticapm_client, redis_async_conn):
204+
# Test that our instrumentation doesn't blow up for dropped spans
205+
elasticapm_client.begin_transaction("transaction.test")
206+
async with elasticapm.async_capture_span("bla"):
207+
pass
208+
await redis_async_conn.ping()
209+
elasticapm_client.end_transaction("test")
210+
transaction = elasticapm_client.events[TRANSACTION][0]
211+
span = elasticapm_client.spans_for_transaction(transaction)[0]
212+
assert span["name"] == "bla"

0 commit comments

Comments
 (0)