Skip to content

Commit a05a3f2

Browse files
authored
feat: closed subscriber as context manager raises (#488)
Closes #484. **PR checklist:** - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary)
1 parent 633e91b commit a05a3f2

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

google/cloud/pubsub_v1/subscriber/client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def __init__(self, **kwargs):
8585
# Instantiate the underlying GAPIC client.
8686
self._api = subscriber_client.SubscriberClient(**kwargs)
8787
self._target = self._api._transport._host
88+
self._closed = False
8889

8990
@classmethod
9091
def from_service_account_file(cls, filename, **kwargs):
@@ -120,6 +121,14 @@ def api(self):
120121
"""The underlying gapic API client."""
121122
return self._api
122123

124+
@property
125+
def closed(self) -> bool:
126+
"""Return whether the client has been closed and cannot be used anymore.
127+
128+
.. versionadded:: 2.8.0
129+
"""
130+
return self._closed
131+
123132
def subscribe(
124133
self,
125134
subscription,
@@ -252,8 +261,11 @@ def close(self):
252261
This method is idempotent.
253262
"""
254263
self.api._transport.grpc_channel.close()
264+
self._closed = True
255265

256266
def __enter__(self):
267+
if self._closed:
268+
raise RuntimeError("Closed subscriber cannot be used as context manager.")
257269
return self
258270

259271
def __exit__(self, exc_type, exc_val, exc_tb):

tests/unit/pubsub_v1/subscriber/test_subscriber_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ def test_init_default_client_info(creds):
5050
assert expected_client_info in user_agent
5151

5252

53+
def test_init_default_closed_state(creds):
54+
client = subscriber.Client(credentials=creds)
55+
assert not client.closed
56+
57+
5358
def test_init_w_custom_transport(creds):
5459
transport = SubscriberGrpcTransport(credentials=creds)
5560
client = subscriber.Client(transport=transport)
@@ -185,6 +190,7 @@ def test_close(creds):
185190
client.close()
186191

187192
patched_close.assert_called()
193+
assert client.closed
188194

189195

190196
def test_closes_channel_as_context_manager(creds):
@@ -198,6 +204,18 @@ def test_closes_channel_as_context_manager(creds):
198204
patched_close.assert_called()
199205

200206

207+
def test_context_manager_raises_if_closed(creds):
208+
client = subscriber.Client(credentials=creds)
209+
210+
with mock.patch.object(client.api._transport.grpc_channel, "close"):
211+
client.close()
212+
213+
expetect_msg = r"(?i).*closed.*cannot.*context manager.*"
214+
with pytest.raises(RuntimeError, match=expetect_msg):
215+
with client:
216+
pass
217+
218+
201219
def test_streaming_pull_gapic_monkeypatch(creds):
202220
client = subscriber.Client(credentials=creds)
203221

0 commit comments

Comments
 (0)