Skip to content

Commit 64c27a8

Browse files
authored
Add message related properties to transactions and spans (#1525)
Closes #1512
1 parent cbd4871 commit 64c27a8

File tree

10 files changed

+218
-17
lines changed

10 files changed

+218
-17
lines changed

src/Elastic.Apm.Azure.ServiceBus/AzureMessagingServiceBusDiagnosticListener.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ private void OnProcessStart(KeyValuePair<string, object> kv, string action)
112112
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
113113
transaction.Context.Service = new Service(null, null) { Framework = _framework };
114114

115+
if (queueName != null)
116+
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
117+
115118
// transaction creation will create an activity, so use this as the key.
116119
var activityId = Activity.Current.Id;
117120

@@ -158,11 +161,15 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action)
158161
{
159162
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
160163
transaction.Context.Service = new Service(null, null) { Framework = _framework };
164+
if (queueName != null)
165+
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
161166
segment = transaction;
162167
}
163168
else
164169
{
165170
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
171+
if (queueName != null)
172+
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
166173
segment = span;
167174
}
168175

@@ -249,6 +256,9 @@ private void OnSendStart(KeyValuePair<string, object> kv, string action)
249256
}
250257
};
251258

259+
if (queueName != null)
260+
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
261+
252262
if (!_processingSegments.TryAdd(activity.Id, span))
253263
{
254264
Logger.Trace()?.Log(

src/Elastic.Apm.Azure.ServiceBus/MicrosoftAzureServiceBusDiagnosticListener.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ private void OnProcessStart(KeyValuePair<string, object> kv, string action, Prop
116116
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
117117
transaction.Context.Service = new Service(null, null) { Framework = _framework };
118118

119+
if (queueName != null)
120+
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
121+
119122
// transaction creation will create an activity, so use this as the key.
120123
var activityId = Activity.Current.Id;
121124

@@ -150,11 +153,15 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action, Prop
150153
{
151154
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
152155
transaction.Context.Service = new Service(null, null) { Framework = _framework };
156+
if (queueName != null)
157+
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
153158
segment = transaction;
154159
}
155160
else
156161
{
157162
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
163+
if (queueName != null)
164+
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
158165
segment = span;
159166
}
160167

@@ -229,6 +236,9 @@ private void OnSendStart(KeyValuePair<string, object> kv, string action, Propert
229236
}
230237
};
231238

239+
if (queueName != null)
240+
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
241+
232242
if (!_processingSegments.TryAdd(activity.Id, span))
233243
{
234244
Logger.Trace()?.Log(

src/Elastic.Apm/Api/Context.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public class Context
2727
[JsonProperty("tags")]
2828
public Dictionary<string, string> Labels => InternalLabels.Value;
2929

30+
/// <summary>
31+
/// Holds details related to message receiving and publishing if the captured event integrates with a messaging system
32+
/// </summary>
33+
public Message Message { get; set; }
34+
3035
/// <summary>
3136
/// If a log record was generated as a result of a http request, the http interface can be used to collect this
3237
/// information.

src/Elastic.Apm/Api/Message.cs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to Elasticsearch B.V under
2+
// one or more agreements.
3+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
4+
// See the LICENSE file in the project root for more information
5+
6+
using System.Collections.Generic;
7+
using Elastic.Apm.Api.Constraints;
8+
9+
namespace Elastic.Apm.Api
10+
{
11+
/// <summary>
12+
/// Holds details related to message receiving and publishing if the captured event integrates with a messaging system
13+
/// </summary>
14+
public class Message
15+
{
16+
/// <summary>
17+
/// Body of the received message
18+
/// </summary>
19+
public string Body { get; set; }
20+
21+
/// <summary>
22+
/// Headers received with the message
23+
/// </summary>
24+
public Dictionary<string, string> Headers { get; set; }
25+
26+
/// <inheritdoc cref="Api.Age"/>
27+
public Age Age { get; set; }
28+
29+
/// <see cref="Api.Queue"/>
30+
public Queue Queue { get; set; }
31+
32+
/// <summary>
33+
/// optional routing key of the received message as set on the queuing system, such as in RabbitMQ.
34+
/// </summary>
35+
public string RoutingKey { get; set; }
36+
}
37+
38+
/// <summary>
39+
/// Age of the message. If the monitored messaging framework provides a timestamp for the message, agents may use it.
40+
/// Otherwise, the sending agent can add a timestamp in milliseconds since the Unix epoch to the message's metadata to be retrieved by the
41+
/// receiving agent. If a timestamp is not available, agents should omit this field.
42+
/// </summary>
43+
public class Age
44+
{
45+
/// <summary>
46+
/// Age of the message in milliseconds.
47+
/// </summary>
48+
public long Ms { get; set; }
49+
}
50+
51+
/// <summary>
52+
/// Information about the message queue where the message is received.
53+
/// </summary>
54+
public class Queue
55+
{
56+
/// <summary>
57+
/// Name of the message queue where the message is received
58+
/// </summary>
59+
[MaxLength]
60+
public string Name { get; set; }
61+
}
62+
}

src/Elastic.Apm/Api/Request.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Linq;
88
using Elastic.Apm.Helpers;
99
using Elastic.Apm.Libraries.Newtonsoft.Json;
10-
using Elastic.Apm.Libraries.Newtonsoft.Json.Serialization;
1110

1211
namespace Elastic.Apm.Api
1312
{

src/Elastic.Apm/Api/SpanContext.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class SpanContext
1717
public Database Db { get; set; }
1818
public Destination Destination { get; set; }
1919
public Http Http { get; set; }
20+
public Message Message { get; set; }
2021

2122
/// <summary>
2223
/// <seealso cref="ShouldSerializeLabels" />

src/Elastic.Apm/Filters/ErrorContextSanitizerFilter.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,24 @@ internal class ErrorContextSanitizerFilter
1818
{
1919
public IError Filter(IError error)
2020
{
21-
if (error is Error realError && realError.Context?.Request?.Headers != null && realError.Configuration != null)
21+
if (error is Error realError && realError.Configuration != null)
2222
{
23-
foreach (var key in realError.Context.Request.Headers.Keys.ToList())
23+
if (realError.Context?.Request?.Headers != null)
2424
{
25-
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
26-
realError.Context.Request.Headers[key] = Consts.Redacted;
25+
foreach (var key in realError.Context.Request.Headers.Keys.ToList())
26+
{
27+
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
28+
realError.Context.Request.Headers[key] = Consts.Redacted;
29+
}
30+
}
31+
32+
if (realError.Context?.Message?.Headers != null)
33+
{
34+
foreach (var key in realError.Context.Message.Headers.Keys.ToList())
35+
{
36+
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
37+
realError.Context.Message.Headers[key] = Consts.Redacted;
38+
}
2739
}
2840
}
2941

src/Elastic.Apm/Filters/HeaderDictionarySanitizerFilter.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,24 @@ public ITransaction Filter(ITransaction transaction)
2020
{
2121
if (transaction is Transaction realTransaction)
2222
{
23-
if (realTransaction.IsContextCreated && realTransaction.Context.Request?.Headers != null)
23+
if (realTransaction.IsContextCreated)
2424
{
25-
foreach (var key in realTransaction.Context?.Request?.Headers?.Keys.ToList())
25+
if (realTransaction.Context.Request?.Headers != null)
2626
{
27-
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
28-
realTransaction.Context.Request.Headers[key] = Consts.Redacted;
27+
foreach (var key in realTransaction.Context.Request.Headers.Keys.ToList())
28+
{
29+
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
30+
realTransaction.Context.Request.Headers[key] = Consts.Redacted;
31+
}
32+
}
33+
34+
if (realTransaction.Context.Message?.Headers != null)
35+
{
36+
foreach (var key in realTransaction.Context.Message.Headers.Keys.ToList())
37+
{
38+
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
39+
realTransaction.Context.Message.Headers[key] = Consts.Redacted;
40+
}
2941
}
3042
}
3143
}

test/Elastic.Apm.Azure.ServiceBus.Tests/AzureMessagingServiceBusDiagnosticListenerTests.cs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message"
6464
destination.Service.Name.Should().Be(ServiceBus.SubType);
6565
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}");
6666
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
67+
68+
span.Context.Message.Should().NotBeNull();
69+
span.Context.Message.Queue.Should().NotBeNull();
70+
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
6771
}
6872

6973
[AzureCredentialsFact]
@@ -93,6 +97,10 @@ await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message"
9397
destination.Service.Name.Should().Be(ServiceBus.SubType);
9498
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}");
9599
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
100+
101+
span.Context.Message.Should().NotBeNull();
102+
span.Context.Message.Queue.Should().NotBeNull();
103+
span.Context.Message.Queue.Name.Should().Be(scope.TopicName);
96104
}
97105

98106
[AzureCredentialsFact]
@@ -124,6 +132,10 @@ await sender.ScheduleMessageAsync(
124132
destination.Service.Name.Should().Be(ServiceBus.SubType);
125133
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}");
126134
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
135+
136+
span.Context.Message.Should().NotBeNull();
137+
span.Context.Message.Queue.Should().NotBeNull();
138+
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
127139
}
128140

129141
[AzureCredentialsFact]
@@ -155,6 +167,10 @@ await sender.ScheduleMessageAsync(
155167
destination.Service.Name.Should().Be(ServiceBus.SubType);
156168
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}");
157169
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
170+
171+
span.Context.Message.Should().NotBeNull();
172+
span.Context.Message.Queue.Should().NotBeNull();
173+
span.Context.Message.Queue.Name.Should().Be(scope.TopicName);
158174
}
159175

160176
[AzureCredentialsFact]
@@ -182,6 +198,10 @@ await _agent.Tracer.CaptureTransaction("Receive messages", ApiConstants.TypeMess
182198
span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
183199
span.Type.Should().Be(ApiConstants.TypeMessaging);
184200
span.Subtype.Should().Be(ServiceBus.SubType);
201+
202+
span.Context.Message.Should().NotBeNull();
203+
span.Context.Message.Queue.Should().NotBeNull();
204+
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
185205
}
186206

187207
[AzureCredentialsFact]
@@ -204,6 +224,10 @@ await sender.SendMessageAsync(
204224

205225
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
206226
transaction.Type.Should().Be(ApiConstants.TypeMessaging);
227+
228+
transaction.Context.Message.Should().NotBeNull();
229+
transaction.Context.Message.Queue.Should().NotBeNull();
230+
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);
207231
}
208232

209233
[AzureCredentialsFact]
@@ -224,9 +248,13 @@ await sender.SendMessageAsync(
224248

225249
_sender.Transactions.Should().HaveCount(1);
226250
var transaction = _sender.FirstTransaction;
227-
228-
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
251+
var subscription = $"{scope.TopicName}/Subscriptions/{scope.SubscriptionName}";
252+
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {subscription}");
229253
transaction.Type.Should().Be(ApiConstants.TypeMessaging);
254+
255+
transaction.Context.Message.Should().NotBeNull();
256+
transaction.Context.Message.Queue.Should().NotBeNull();
257+
transaction.Context.Message.Queue.Name.Should().Be(subscription);
230258
}
231259

232260
[AzureCredentialsFact]
@@ -242,8 +270,6 @@ await sender.SendMessageAsync(
242270

243271
var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
244272
await receiver.DeferMessageAsync(message).ConfigureAwait(false);
245-
246-
247273
await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false);
248274

249275
if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2))
@@ -255,6 +281,10 @@ await sender.SendMessageAsync(
255281
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
256282
transaction.Type.Should().Be(ApiConstants.TypeMessaging);
257283

284+
transaction.Context.Message.Should().NotBeNull();
285+
transaction.Context.Message.Queue.Should().NotBeNull();
286+
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);
287+
258288
var secondTransaction = _sender.Transactions[1];
259289
secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}");
260290
secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging);
@@ -282,9 +312,14 @@ await sender.SendMessageAsync(
282312
_sender.Transactions.Should().HaveCount(2);
283313

284314
var transaction = _sender.FirstTransaction;
285-
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
315+
var subscription = $"{scope.TopicName}/Subscriptions/{scope.SubscriptionName}";
316+
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {subscription}");
286317
transaction.Type.Should().Be(ApiConstants.TypeMessaging);
287318

319+
transaction.Context.Message.Should().NotBeNull();
320+
transaction.Context.Message.Queue.Should().NotBeNull();
321+
transaction.Context.Message.Queue.Name.Should().Be(subscription);
322+
288323
var secondTransaction = _sender.Transactions[1];
289324
secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
290325
secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging);
@@ -353,6 +388,10 @@ public async Task Capture_Transaction_When_ProcessMessage_From_Queue()
353388

354389
foreach (var transaction in processTransactions)
355390
{
391+
transaction.Context.Message.Should().NotBeNull();
392+
transaction.Context.Message.Queue.Should().NotBeNull();
393+
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);
394+
356395
var spans = _sender.Spans.Where(s => s.TransactionId == transaction.Id).ToList();
357396
spans.Should().HaveCount(1);
358397
}
@@ -409,6 +448,10 @@ public async Task Capture_Transaction_When_ProcessSessionMessage_From_Queue()
409448

410449
foreach (var transaction in processTransactions)
411450
{
451+
transaction.Context.Message.Should().NotBeNull();
452+
transaction.Context.Message.Queue.Should().NotBeNull();
453+
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);
454+
412455
var spans = _sender.Spans.Where(s => s.TransactionId == transaction.Id).ToList();
413456
spans.Should().HaveCount(1);
414457
}

0 commit comments

Comments
 (0)