Skip to content

Commit e351ba1

Browse files
committed
fix issues when transaction is unsampled in Kafka instrumentation
closes #1578
1 parent 1873b31 commit e351ba1

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

elasticapm/instrumentation/packages/kafka.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from elasticapm.conf import constants
3737
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
3838
from elasticapm.traces import DroppedSpan, capture_span, execution_context
39-
from elasticapm.utils.disttracing import TraceParent
39+
from elasticapm.utils.disttracing import TraceParent, TracingOptions
4040

4141

4242
class KafkaInstrumentation(AbstractInstrumentedModule):
@@ -48,6 +48,7 @@ class KafkaInstrumentation(AbstractInstrumentedModule):
4848
]
4949
provider_name = "kafka"
5050
name = "kafka"
51+
creates_transactions = True
5152

5253
def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
5354
topic = args[0] if args else kwargs["topic"]
@@ -68,7 +69,10 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
6869
) as span:
6970
transaction = execution_context.get_transaction()
7071
if transaction:
71-
tp = transaction.trace_parent.copy_from(span_id=span.id)
72+
tp = transaction.trace_parent.copy_from(
73+
span_id=span.id if span else transaction.id,
74+
trace_options=None if span else TracingOptions(recorded=False),
75+
)
7276
if headers:
7377
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
7478
else:
@@ -79,22 +83,17 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
7983
else:
8084
kwargs["headers"] = headers
8185
result = wrapped(*args, **kwargs)
82-
if instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
86+
if span and instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
8387
address = instance._metadata.controller[1]
8488
port = instance._metadata.controller[2]
8589
span.context["destination"]["address"] = address
8690
span.context["destination"]["port"] = port
8791
return result
8892

89-
def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
90-
# Contrasting to the superclass implementation, we *always* want to
91-
# return a proxied connection, even if there is no ongoing elasticapm
92-
# transaction yet. This ensures that we instrument the cursor once
93-
# the transaction started.
94-
return self.call(module, method, wrapped, instance, args, kwargs)
95-
9693
def call(self, module, method, wrapped, instance, args, kwargs):
9794
client = get_client()
95+
if client is None:
96+
return wrapped(*args, **kwargs)
9897
destination_info = {
9998
"service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
10099
}
@@ -118,7 +117,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
118117
"destination": destination_info,
119118
},
120119
) as span:
121-
if not isinstance(span, DroppedSpan) and instance._subscription.subscription:
120+
if span and not isinstance(span, DroppedSpan) and instance._subscription.subscription:
122121
span.name += " from " + ", ".join(sorted(instance._subscription.subscription))
123122
results = wrapped(*args, **kwargs)
124123
return results
@@ -146,7 +145,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
146145
except StopIteration:
147146
span.cancel()
148147
raise
149-
if not isinstance(span, DroppedSpan):
148+
if span and not isinstance(span, DroppedSpan):
150149
topic = result[0]
151150
if client.should_ignore_topic(topic):
152151
span.cancel()

tests/instrumentation/kafka_tests.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,29 @@ def delayed_send():
207207
spans = elasticapm_client.events[SPAN]
208208
assert len(spans) == 1
209209
assert spans[0]["name"] == "Kafka POLL from bar, foo, test"
210+
211+
212+
def test_kafka_no_client(instrument, producer, consumer, topics):
213+
assert elasticapm.get_client() is None
214+
# the following code shouldn't trigger any errors
215+
producer.send("test", key=b"foo", value=b"bar")
216+
for item in consumer:
217+
pass
218+
219+
220+
def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics):
221+
transaction_object = elasticapm_client.begin_transaction("transaction")
222+
transaction_object.is_sampled = False
223+
producer.send("test", key=b"foo", value=b"bar")
224+
elasticapm_client.end_transaction("foo")
225+
spans = elasticapm_client.events[SPAN]
226+
assert len(spans) == 0
227+
228+
229+
def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consumer, topics):
230+
transaction_object = elasticapm_client.begin_transaction("transaction")
231+
transaction_object.is_sampled = False
232+
consumer.poll(timeout_ms=50)
233+
elasticapm_client.end_transaction("foo")
234+
spans = elasticapm_client.events[SPAN]
235+
assert len(spans) == 0

0 commit comments

Comments
 (0)