Skip to content

Commit 49dfd9c

Browse files
Address review comments
1 parent c5982ca commit 49dfd9c

File tree

6 files changed

+51
-42
lines changed

6 files changed

+51
-42
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

google/cloud/pubsub_v1/open_telemetry/context_propagation.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2017, Google LLC All rights reserved.
1+
# Copyright 2024, Google LLC All rights reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -14,17 +14,26 @@
1414

1515
from opentelemetry.propagators.textmap import Setter
1616

17-
from google.pubsub_v1 import types as gapic_types
17+
from google.pubsub_v1 import PubsubMessage
1818

1919

2020
class OpenTelemetryContextSetter(Setter):
2121
"""
2222
Used by Open Telemetry for context propagation.
2323
"""
2424

25-
def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None:
25+
def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
2626
"""
2727
Injects trace context into Pub/Sub message attributes with
2828
"googclient_" prefix.
29+
30+
Args:
31+
carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
32+
data.
33+
key(str): The key for which the Open Telemetry context data needs to be set.
34+
value(str): The Open Telemetry context value to be set.
35+
36+
Returns:
37+
None
2938
"""
3039
carrier.attributes["googclient_" + key] = value

google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import sys
1616
from datetime import datetime
17-
import warnings
1817
from typing import Optional
1918

2019
from opentelemetry import trace
@@ -28,7 +27,7 @@
2827

2928

3029
class PublishMessageWrapper:
31-
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
30+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
3231
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
3332
_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"
3433

@@ -58,6 +57,7 @@ def __eq__(self, other): # pragma: NO COVER
5857

5958
def start_create_span(self, topic: str, ordering_key: str) -> None:
6059
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
60+
assert len(topic.split("/")) == 4
6161
topic_short_name = topic.split("/")[3]
6262
with tracer.start_as_current_span(
6363
name=f"{topic_short_name} create",
@@ -68,7 +68,9 @@ def start_create_span(self, topic: str, ordering_key: str) -> None:
6868
"messaging.gcp_pubsub.message.ordering_key": ordering_key,
6969
"messaging.operation": "create",
7070
"gcp.project_id": topic.split("/")[1],
71-
"messaging.message.body.size": sys.getsizeof(self._message.data),
71+
"messaging.message.body.size": sys.getsizeof(
72+
self._message.data
73+
), # sys.getsizeof() used since the attribute expects size of message body in bytes
7274
},
7375
kind=trace.SpanKind.PRODUCER,
7476
end_on_exit=False,
@@ -86,12 +88,7 @@ def start_create_span(self, topic: str, ordering_key: str) -> None:
8688
)
8789

8890
def end_create_span(self, exc: Optional[BaseException] = None) -> None:
89-
if self._create_span is None: # pragma: NO COVER
90-
warnings.warn(
91-
message="publish create span is None. Hence, not ending it",
92-
category=RuntimeWarning,
93-
)
94-
return
91+
assert self._create_span is not None
9592
if exc:
9693
self._create_span.record_exception(exception=exc)
9794
self._create_span.set_status(
@@ -101,12 +98,7 @@ def end_create_span(self, exc: Optional[BaseException] = None) -> None:
10198

10299
def start_publisher_flow_control_span(self) -> None:
103100
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
104-
if self._create_span is None: # pragma: NO COVER
105-
warnings.warn(
106-
message="publish create span is None. Hence, not starting publish flow control span",
107-
category=RuntimeWarning,
108-
)
109-
return
101+
assert self._create_span is not None
110102
with tracer.start_as_current_span(
111103
name=self._PUBLISH_FLOW_CONTROL,
112104
kind=trace.SpanKind.INTERNAL,
@@ -118,12 +110,7 @@ def start_publisher_flow_control_span(self) -> None:
118110
def end_publisher_flow_control_span(
119111
self, exc: Optional[BaseException] = None
120112
) -> None:
121-
if self._flow_control_span is None: # pragma: NO COVER
122-
warnings.warn(
123-
message="publish flow control span is None. Hence, not ending it",
124-
category=RuntimeWarning,
125-
)
126-
return
113+
assert self._flow_control_span is not None
127114
if exc:
128115
self._flow_control_span.record_exception(exception=exc)
129116
self._flow_control_span.set_status(
@@ -132,12 +119,7 @@ def end_publisher_flow_control_span(
132119
self._flow_control_span.end()
133120

134121
def start_publisher_batching_span(self) -> None:
135-
if self._create_span is None: # pragma: NO COVER
136-
warnings.warn(
137-
message="publish create span is None. Hence, not starting publisher batching span",
138-
category=RuntimeWarning,
139-
)
140-
return
122+
assert self._create_span is not None
141123
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
142124
with tracer.start_as_current_span(
143125
name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
@@ -148,12 +130,7 @@ def start_publisher_batching_span(self) -> None:
148130
self._batching_span = batching_span
149131

150132
def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
151-
if self._batching_span is None: # pragma: NO COVER
152-
warnings.warn(
153-
message="publisher batching span is None. Hence, not ending it",
154-
category=RuntimeWarning,
155-
)
156-
return
133+
assert self._batching_span is not None
157134
if exc:
158135
self._batching_span.record_exception(exception=exc)
159136
self._batching_span.set_status(

google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Batch(base.Batch):
9191
timeout is used.
9292
"""
9393

94-
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
94+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
9595
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
9696

9797
def __init__(
@@ -246,7 +246,7 @@ def _start_publish_rpc_span(self) -> None:
246246
for wrapper in self._message_wrappers:
247247
span = wrapper.create_span
248248
# Add links only for sampled spans.
249-
if span.is_recording():
249+
if span.get_span_context().trace_flags.sampled:
250250
links.append(trace.Link(span.get_span_context()))
251251
topic_short_name = self._topic.split("/")[3]
252252
with tracer.start_as_current_span(
@@ -266,7 +266,7 @@ def _start_publish_rpc_span(self) -> None:
266266
ctx = rpc_span.get_span_context()
267267
for wrapper in self._message_wrappers:
268268
span = wrapper.create_span
269-
if span.is_recording():
269+
if span.get_span_context().trace_flags.sampled:
270270
span.add_link(ctx)
271271
self._rpc_span = rpc_span
272272

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"grpcio-status >= 1.33.2",
4848
"opentelemetry-api",
4949
"opentelemetry-sdk",
50+
"opentelemetry-semantic-conventions",
5051
]
5152
extras = {"libcst": "libcst >= 0.3.10"}
5253
url = "https://github.com/googleapis/python-pubsub"

tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import pytest
2727

2828
from opentelemetry import trace
29+
from opentelemetry.trace import SpanContext
2930

3031
import google.api_core.exceptions
3132
from google.api_core import gapic_v1
@@ -830,21 +831,29 @@ def test_opentelemetry_commit_sampling(span_exporter):
830831
message=gapic_types.PubsubMessage(data=b"foo"),
831832
)
832833
message1.start_create_span(topic=TOPIC, ordering_key=None)
833-
message1.create_span.is_recording = mock.Mock(return_value=False)
834834

835835
message2 = PublishMessageWrapper(
836836
message=gapic_types.PubsubMessage(data=b"bar"),
837837
)
838838
message2.start_create_span(topic=TOPIC, ordering_key=None)
839839

840+
# Mock the 'get_span_context' method to return a mock SpanContext
841+
mock_span_context = mock.Mock(spec=SpanContext)
842+
mock_span_context.trace_flags.sampled = False
843+
840844
batch.publish(message1)
841845
batch.publish(message2)
842846

843847
publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])
848+
849+
# Patch the 'create_span' method to return the mock SpanContext
844850
with mock.patch.object(
845-
type(batch.client), "_gapic_publish", return_value=publish_response
851+
message1.create_span, "get_span_context", return_value=mock_span_context
846852
):
847-
batch._commit()
853+
with mock.patch.object(
854+
type(batch.client), "_gapic_publish", return_value=publish_response
855+
):
856+
batch._commit()
848857

849858
spans = span_exporter.get_finished_spans()
850859

0 commit comments

Comments
 (0)