Skip to content

Commit b080438

Browse files
Sparkyczbeniwohli
andauthored
Add Redis pub/sub instrumentation (#1129)
* Add instrumentation of redis pub/sub commands * Add instrumentation of aioredis pub/sub commands Co-authored-by: Benjamin Wohlwend <beni@elastic.co>
1 parent 4342f91 commit b080438

File tree

4 files changed

+96
-3
lines changed

4 files changed

+96
-3
lines changed

elasticapm/instrumentation/packages/asyncio/aioredis.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
class RedisConnectionPoolInstrumentation(AbstractInstrumentedModule):
3939
name = "aioredis"
4040

41-
instrument_list = [("aioredis.pool", "ConnectionsPool.execute")]
41+
instrument_list = [("aioredis.pool", "ConnectionsPool.execute"),
42+
("aioredis.pool", "ConnectionsPool.execute_pubsub")]
4243

4344
def call(self, module, method, wrapped, instance, args, kwargs):
4445
if len(args) > 0:
@@ -73,7 +74,8 @@ def call(self, module, method, wrapped, instance, args, kwargs):
7374
class RedisConnectionInstrumentation(AbstractInstrumentedModule):
7475
name = "aioredis"
7576

76-
instrument_list = (("aioredis.connection", "RedisConnection.execute"),)
77+
instrument_list = (("aioredis.connection", "RedisConnection.execute"),
78+
("aioredis.pool", "ConnectionsPool.execute_pubsub"))
7779

7880
def call(self, module, method, wrapped, instance, args, kwargs):
7981
span = execution_context.get_span()

elasticapm/instrumentation/packages/redis.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class RedisInstrumentation(Redis3CheckMixin, AbstractInstrumentedModule):
5353
name = "redis"
5454

5555
# no need to instrument StrictRedis in redis-py >= 3.0
56-
instrument_list_3 = [("redis.client", "Redis.execute_command")]
56+
instrument_list_3 = [("redis.client", "Redis.execute_command"), ("redis.client", "PubSub.execute_command")]
5757
instrument_list = [("redis.client", "Redis.execute_command"), ("redis.client", "StrictRedis.execute_command")]
5858

5959
def call(self, module, method, wrapped, instance, args, kwargs):

tests/instrumentation/asyncio_tests/aioredis_tests.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,48 @@ async def test_redis_client(instrument, elasticapm_client, redis_conn):
123123
assert spans[2]["type"] == "test"
124124

125125
assert len(spans) == 3
126+
127+
128+
@pytest.mark.integrationtest
129+
async def test_publish_subscribe(instrument, elasticapm_client, redis_conn):
130+
elasticapm_client.begin_transaction("transaction.test")
131+
with capture_span("test_publish_subscribe", "test"):
132+
# publish
133+
await redis_conn.publish("mykey", "a")
134+
135+
#subscribe
136+
await redis_conn.subscribe("mykey")
137+
138+
elasticapm_client.end_transaction("MyView")
139+
140+
transactions = elasticapm_client.events[TRANSACTION]
141+
spans = elasticapm_client.spans_for_transaction(transactions[0])
142+
143+
expected_signatures = {"test_publish_subscribe", "PUBLISH", "SUBSCRIBE"}
144+
145+
assert {t["name"] for t in spans} == expected_signatures
146+
147+
assert spans[0]["name"] == "PUBLISH"
148+
assert spans[0]["type"] == "db"
149+
assert spans[0]["subtype"] == "redis"
150+
assert spans[0]["action"] == "query"
151+
assert spans[0]["context"]["destination"] == {
152+
"address": os.environ.get("REDIS_HOST", "localhost"),
153+
"port": int(os.environ.get("REDIS_PORT", 6379)),
154+
"service": {"name": "aioredis", "resource": "redis", "type": "db"},
155+
}
156+
157+
assert spans[1]["name"] == "SUBSCRIBE"
158+
assert spans[1]["type"] == "db"
159+
assert spans[1]["subtype"] == "redis"
160+
assert spans[1]["action"] == "query"
161+
assert spans[1]["context"]["destination"] == {
162+
"address": os.environ.get("REDIS_HOST", "localhost"),
163+
"port": int(os.environ.get("REDIS_PORT", 6379)),
164+
"service": {"name": "aioredis", "resource": "redis", "type": "db"},
165+
}
166+
167+
assert spans[2]["name"] == "test_publish_subscribe"
168+
assert spans[2]["type"] == "test"
169+
170+
assert len(spans) == 3

tests/instrumentation/redis_tests.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,49 @@ def test_unix_domain_socket_connection_destination_info():
163163
destination_info = get_destination_info(conn)
164164
assert destination_info["port"] is None
165165
assert destination_info["address"] == "unix:///some/path"
166+
167+
168+
@pytest.mark.integrationtest
169+
def test_publish_subscribe(instrument, elasticapm_client, redis_conn):
170+
elasticapm_client.begin_transaction("transaction.test")
171+
with capture_span("test_publish_subscribe", "test"):
172+
# publish
173+
redis_conn.publish("mykey", "a")
174+
175+
# subscribe
176+
with redis_conn.pubsub() as channel:
177+
channel.subscribe("mykey")
178+
179+
elasticapm_client.end_transaction("MyView")
180+
181+
transactions = elasticapm_client.events[TRANSACTION]
182+
spans = elasticapm_client.spans_for_transaction(transactions[0])
183+
184+
expected_signatures = {"test_publish_subscribe", "PUBLISH", "SUBSCRIBE"}
185+
186+
assert {t["name"] for t in spans} == expected_signatures
187+
188+
assert spans[0]["name"] == "PUBLISH"
189+
assert spans[0]["type"] == "db"
190+
assert spans[0]["subtype"] == "redis"
191+
assert spans[0]["action"] == "query"
192+
assert spans[0]["context"]["destination"] == {
193+
"address": os.environ.get("REDIS_HOST", "localhost"),
194+
"port": int(os.environ.get("REDIS_PORT", 6379)),
195+
"service": {"name": "redis", "resource": "redis", "type": "db"},
196+
}
197+
198+
assert spans[1]["name"] == "SUBSCRIBE"
199+
assert spans[1]["type"] == "db"
200+
assert spans[1]["subtype"] == "redis"
201+
assert spans[1]["action"] == "query"
202+
assert spans[1]["context"]["destination"] == {
203+
"address": os.environ.get("REDIS_HOST", "localhost"),
204+
"port": int(os.environ.get("REDIS_PORT", 6379)),
205+
"service": {"name": "redis", "resource": "redis", "type": "db"},
206+
}
207+
208+
assert spans[2]["name"] == "test_publish_subscribe"
209+
assert spans[2]["type"] == "test"
210+
211+
assert len(spans) == 3

0 commit comments

Comments
 (0)