Skip to content

Commit c563500

Browse files
Address review comments
1 parent c5982ca commit c563500

File tree

6 files changed

+70
-52
lines changed

6 files changed

+70
-52
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

google/cloud/pubsub_v1/open_telemetry/context_propagation.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017, Google LLC All rights reserved.
1+
# Copyright 2024, Google LLC All rights reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -14,17 +14,26 @@
1414

1515
from opentelemetry.propagators.textmap import Setter
1616

17-
from google.pubsub_v1 import types as gapic_types
17+
from google.pubsub_v1 import PubsubMessage
1818

1919

2020
class OpenTelemetryContextSetter(Setter):
2121
"""
2222
Used by Open Telemetry for context propagation.
2323
"""
2424

25-
def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None:
25+
def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
2626
"""
2727
Injects trace context into Pub/Sub message attributes with
2828
"googclient_" prefix.
29+
30+
Args:
31+
carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
32+
data.
33+
key(str): The key for which the Open Telemetry context data needs to be set.
34+
value(str): The Open Telemetry context value to be set.
35+
36+
Returns:
37+
None
2938
"""
3039
carrier.attributes["googclient_" + key] = value

google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414

1515
import sys
1616
from datetime import datetime
17-
import warnings
1817
from typing import Optional
1918

2019
from opentelemetry import trace
2120
from opentelemetry.trace.propagation import set_span_in_context
2221
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
22+
from opentelemetry.semconv.trace import SpanAttributes
23+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
24+
MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY,
25+
MESSAGING_MESSAGE_BODY_SIZE,
26+
)
2327

2428
from google.pubsub_v1 import types as gapic_types
2529
from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
@@ -28,7 +32,7 @@
2832

2933

3034
class PublishMessageWrapper:
31-
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
35+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
3236
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
3337
_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"
3438

@@ -58,17 +62,20 @@ def __eq__(self, other): # pragma: NO COVER
5862

5963
def start_create_span(self, topic: str, ordering_key: str) -> None:
6064
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
65+
assert len(topic.split("/")) == 4
6166
topic_short_name = topic.split("/")[3]
6267
with tracer.start_as_current_span(
6368
name=f"{topic_short_name} create",
6469
attributes={
65-
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
66-
"messaging.destination.name": topic_short_name,
67-
"code.function": "publish",
68-
"messaging.gcp_pubsub.message.ordering_key": ordering_key,
69-
"messaging.operation": "create",
70+
SpanAttributes.MESSAGING_SYSTEM: self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
71+
SpanAttributes.MESSAGING_DESTINATION_NAME: topic_short_name,
72+
SpanAttributes.CODE_FUNCTION: "publish",
73+
MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY: ordering_key,
74+
SpanAttributes.MESSAGING_OPERATION: "create",
7075
"gcp.project_id": topic.split("/")[1],
71-
"messaging.message.body.size": sys.getsizeof(self._message.data),
76+
MESSAGING_MESSAGE_BODY_SIZE: sys.getsizeof(
77+
self._message.data
78+
), # sys.getsizeof() used since the attribute expects size of message body in bytes
7279
},
7380
kind=trace.SpanKind.PRODUCER,
7481
end_on_exit=False,
@@ -86,12 +93,7 @@ def start_create_span(self, topic: str, ordering_key: str) -> None:
8693
)
8794

8895
def end_create_span(self, exc: Optional[BaseException] = None) -> None:
89-
if self._create_span is None: # pragma: NO COVER
90-
warnings.warn(
91-
message="publish create span is None. Hence, not ending it",
92-
category=RuntimeWarning,
93-
)
94-
return
96+
assert self._create_span is not None
9597
if exc:
9698
self._create_span.record_exception(exception=exc)
9799
self._create_span.set_status(
@@ -101,12 +103,7 @@ def end_create_span(self, exc: Optional[BaseException] = None) -> None:
101103

102104
def start_publisher_flow_control_span(self) -> None:
103105
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
104-
if self._create_span is None: # pragma: NO COVER
105-
warnings.warn(
106-
message="publish create span is None. Hence, not starting publish flow control span",
107-
category=RuntimeWarning,
108-
)
109-
return
106+
assert self._create_span is not None
110107
with tracer.start_as_current_span(
111108
name=self._PUBLISH_FLOW_CONTROL,
112109
kind=trace.SpanKind.INTERNAL,
@@ -118,12 +115,7 @@ def start_publisher_flow_control_span(self) -> None:
118115
def end_publisher_flow_control_span(
119116
self, exc: Optional[BaseException] = None
120117
) -> None:
121-
if self._flow_control_span is None: # pragma: NO COVER
122-
warnings.warn(
123-
message="publish flow control span is None. Hence, not ending it",
124-
category=RuntimeWarning,
125-
)
126-
return
118+
assert self._flow_control_span is not None
127119
if exc:
128120
self._flow_control_span.record_exception(exception=exc)
129121
self._flow_control_span.set_status(
@@ -132,12 +124,7 @@ def end_publisher_flow_control_span(
132124
self._flow_control_span.end()
133125

134126
def start_publisher_batching_span(self) -> None:
135-
if self._create_span is None: # pragma: NO COVER
136-
warnings.warn(
137-
message="publish create span is None. Hence, not starting publisher batching span",
138-
category=RuntimeWarning,
139-
)
140-
return
127+
assert self._create_span is not None
141128
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
142129
with tracer.start_as_current_span(
143130
name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
@@ -148,12 +135,7 @@ def start_publisher_batching_span(self) -> None:
148135
self._batching_span = batching_span
149136

150137
def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
151-
if self._batching_span is None: # pragma: NO COVER
152-
warnings.warn(
153-
message="publisher batching span is None. Hence, not ending it",
154-
category=RuntimeWarning,
155-
)
156-
return
138+
assert self._batching_span is not None
157139
if exc:
158140
self._batching_span.record_exception(exception=exc)
159141
self._batching_span.set_status(

google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
from datetime import datetime
2323

2424
from opentelemetry import trace
25+
from opentelemetry.semconv.trace import SpanAttributes
26+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
27+
MESSAGING_BATCH_MESSAGE_COUNT,
28+
)
2529
import google.api_core.exceptions
2630
from google.api_core import gapic_v1
2731

@@ -91,7 +95,7 @@ class Batch(base.Batch):
9195
timeout is used.
9296
"""
9397

94-
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
98+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
9599
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
96100

97101
def __init__(
@@ -246,18 +250,18 @@ def _start_publish_rpc_span(self) -> None:
246250
for wrapper in self._message_wrappers:
247251
span = wrapper.create_span
248252
# Add links only for sampled spans.
249-
if span.is_recording():
253+
if span.get_span_context().trace_flags.sampled:
250254
links.append(trace.Link(span.get_span_context()))
251255
topic_short_name = self._topic.split("/")[3]
252256
with tracer.start_as_current_span(
253257
name=f"{topic_short_name} publish",
254258
attributes={
255-
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
256-
"messaging.destination.name": topic_short_name,
259+
SpanAttributes.MESSAGING_SYSTEM: self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
260+
SpanAttributes.MESSAGING_DESTINATION_NAME: topic_short_name,
257261
"gcp.project_id": self._topic.split("/")[1],
258-
"messaging.batch.message_count": len(self._message_wrappers),
259-
"messaging.operation": "publish",
260-
"code.function": "_commit",
262+
MESSAGING_BATCH_MESSAGE_COUNT: len(self._message_wrappers),
263+
SpanAttributes.MESSAGING_OPERATION: "publish",
264+
SpanAttributes.CODE_FUNCTION: "_commit",
261265
},
262266
links=links,
263267
kind=trace.SpanKind.CLIENT,
@@ -266,7 +270,7 @@ def _start_publish_rpc_span(self) -> None:
266270
ctx = rpc_span.get_span_context()
267271
for wrapper in self._message_wrappers:
268272
span = wrapper.create_span
269-
if span.is_recording():
273+
if span.get_span_context().trace_flags.sampled:
270274
span.add_link(ctx)
271275
self._rpc_span = rpc_span
272276

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"grpcio-status >= 1.33.2",
4848
"opentelemetry-api",
4949
"opentelemetry-sdk",
50+
"opentelemetry-semantic-conventions",
5051
]
5152
extras = {"libcst": "libcst >= 0.3.10"}
5253
url = "https://github.com/googleapis/python-pubsub"

tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import pytest
2727

2828
from opentelemetry import trace
29+
from opentelemetry.trace import SpanContext
2930

3031
import google.api_core.exceptions
3132
from google.api_core import gapic_v1
@@ -830,21 +831,29 @@ def test_opentelemetry_commit_sampling(span_exporter):
830831
message=gapic_types.PubsubMessage(data=b"foo"),
831832
)
832833
message1.start_create_span(topic=TOPIC, ordering_key=None)
833-
message1.create_span.is_recording = mock.Mock(return_value=False)
834834

835835
message2 = PublishMessageWrapper(
836836
message=gapic_types.PubsubMessage(data=b"bar"),
837837
)
838838
message2.start_create_span(topic=TOPIC, ordering_key=None)
839839

840+
# Mock the 'get_span_context' method to return a mock SpanContext
841+
mock_span_context = mock.Mock(spec=SpanContext)
842+
mock_span_context.trace_flags.sampled = False
843+
840844
batch.publish(message1)
841845
batch.publish(message2)
842846

843847
publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])
848+
849+
# Patch the 'create_span' method to return the mock SpanContext
844850
with mock.patch.object(
845-
type(batch.client), "_gapic_publish", return_value=publish_response
851+
message1.create_span, "get_span_context", return_value=mock_span_context
846852
):
847-
batch._commit()
853+
with mock.patch.object(
854+
type(batch.client), "_gapic_publish", return_value=publish_response
855+
):
856+
batch._commit()
848857

849858
spans = span_exporter.get_finished_spans()
850859

0 commit comments

Comments
 (0)