Skip to content

Commit 9c8856a

Browse files
authored
feat(zb-experimental): Add support for closing MRD (#1554)
* add `is_stream_open` and check it's value before doing any of the following operation. `open` , `close` , `send` ,`recv` * Instantiate `socket_like_rpc` inside open, so that it is a new object every time we open. * Otherwise, consider this scenario where a user is opening after closing then `initial_request` would be None because close sets it to None.
1 parent e26888f commit 9c8856a

File tree

2 files changed

+175
-40
lines changed

2 files changed

+175
-40
lines changed

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,32 +83,41 @@ def __init__(
8383
self.rpc = self.client._client._transport._wrapped_methods[
8484
self.client._client._transport.bidi_read_object
8585
]
86-
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
86+
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
8787
read_object_spec=_storage_v2.BidiReadObjectSpec(
8888
bucket=self._full_bucket_name, object=object_name
8989
),
9090
)
9191
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
92-
self.socket_like_rpc = AsyncBidiRpc(
93-
self.rpc, initial_request=first_bidi_read_req, metadata=self.metadata
94-
)
92+
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
93+
self._is_stream_open: bool = False
9594

9695
async def open(self) -> None:
9796
"""Opens the bidi-gRPC connection to read from the object.
9897
9998
This method sends an initial request to start the stream and receives
10099
the first response containing metadata and a read handle.
101100
"""
101+
if self._is_stream_open:
102+
raise ValueError("Stream is already open")
103+
self.socket_like_rpc = AsyncBidiRpc(
104+
self.rpc, initial_request=self.first_bidi_read_req, metadata=self.metadata
105+
)
102106
await self.socket_like_rpc.open() # this is actually 1 send
103107
response = await self.socket_like_rpc.recv()
104108
if self.generation_number is None:
105109
self.generation_number = response.metadata.generation
106110

107111
self.read_handle = response.read_handle
108112

113+
self._is_stream_open = True
114+
109115
async def close(self) -> None:
110116
"""Closes the bidi-gRPC connection."""
117+
if not self._is_stream_open:
118+
raise ValueError("Stream is not open")
111119
await self.socket_like_rpc.close()
120+
self._is_stream_open = False
112121

113122
async def send(
114123
self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest
@@ -120,6 +129,8 @@ async def send(
120129
The request message to send. This is typically used to specify
121130
the read offset and limit.
122131
"""
132+
if not self._is_stream_open:
133+
raise ValueError("Stream is not open")
123134
await self.socket_like_rpc.send(bidi_read_object_request)
124135

125136
async def recv(self) -> _storage_v2.BidiReadObjectResponse:
@@ -132,4 +143,10 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
132143
:class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`:
133144
The response message from the server.
134145
"""
146+
if not self._is_stream_open:
147+
raise ValueError("Stream is not open")
135148
return await self.socket_like_rpc.recv()
149+
150+
@property
151+
def is_stream_open(self) -> bool:
152+
return self._is_stream_open

tests/unit/asyncio/test_async_read_object_stream.py

Lines changed: 154 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,42 @@
2727
_TEST_READ_HANDLE = b"test-read-handle"
2828

2929

30+
async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True):
31+
"""Helper to create an instance of _AsyncReadObjectStream and open it by default."""
32+
socket_like_rpc = AsyncMock()
33+
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
34+
socket_like_rpc.open = AsyncMock()
35+
36+
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
37+
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
38+
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
39+
recv_response.read_handle = _TEST_READ_HANDLE
40+
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
41+
42+
read_obj_stream = _AsyncReadObjectStream(
43+
client=mock_client,
44+
bucket_name=_TEST_BUCKET_NAME,
45+
object_name=_TEST_OBJECT_NAME,
46+
)
47+
48+
if open:
49+
await read_obj_stream.open()
50+
51+
return read_obj_stream
52+
53+
3054
@mock.patch(
3155
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
3256
)
3357
@mock.patch(
3458
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
3559
)
3660
def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
61+
# Arrange
62+
rpc_sentinel = mock.sentinel.A
3763
mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc"
3864
mock_client._client._transport._wrapped_methods = {
39-
"bidi_read_object_rpc": mock.sentinel.A
65+
"bidi_read_object_rpc": rpc_sentinel,
4066
}
4167
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
4268
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
@@ -45,39 +71,36 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
4571
),
4672
)
4773

74+
# Act
4875
read_obj_stream = _AsyncReadObjectStream(
4976
client=mock_client,
5077
bucket_name=_TEST_BUCKET_NAME,
5178
object_name=_TEST_OBJECT_NAME,
5279
generation_number=_TEST_GENERATION_NUMBER,
80+
read_handle=_TEST_READ_HANDLE,
5381
)
5482

55-
mock_async_bidi_rpc.assert_called_once_with(
56-
mock.sentinel.A,
57-
initial_request=first_bidi_read_req,
58-
metadata=(("x-goog-request-params", f"bucket={full_bucket_name}"),),
59-
)
60-
assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value
83+
# Assert
84+
assert read_obj_stream.bucket_name == _TEST_BUCKET_NAME
85+
assert read_obj_stream.object_name == _TEST_OBJECT_NAME
86+
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
87+
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
88+
assert read_obj_stream.first_bidi_read_req == first_bidi_read_req
89+
assert read_obj_stream.rpc == rpc_sentinel
6190

6291

92+
@mock.patch(
93+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
94+
)
6395
@mock.patch(
6496
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
6597
)
6698
@pytest.mark.asyncio
67-
async def test_open(mock_client):
99+
async def test_open(mock_client, mock_cls_async_bidi_rpc):
68100
# arrange
69-
read_obj_stream = _AsyncReadObjectStream(
70-
client=mock_client,
71-
bucket_name=_TEST_BUCKET_NAME,
72-
object_name=_TEST_OBJECT_NAME,
101+
read_obj_stream = await instantiate_read_obj_stream(
102+
mock_client, mock_cls_async_bidi_rpc, open=False
73103
)
74-
read_obj_stream.socket_like_rpc.open = AsyncMock()
75-
76-
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
77-
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
78-
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
79-
recv_response.read_handle = _TEST_READ_HANDLE
80-
read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response)
81104

82105
# act
83106
await read_obj_stream.open()
@@ -88,40 +111,88 @@ async def test_open(mock_client):
88111

89112
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
90113
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
114+
assert read_obj_stream.is_stream_open
91115

92116

117+
@mock.patch(
118+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
119+
)
93120
@mock.patch(
94121
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
95122
)
96123
@pytest.mark.asyncio
97-
async def test_close(mock_client):
124+
async def test_open_when_already_open_should_raise_error(
125+
mock_client, mock_cls_async_bidi_rpc
126+
):
98127
# arrange
99-
read_obj_stream = _AsyncReadObjectStream(
100-
client=mock_client,
101-
bucket_name=_TEST_BUCKET_NAME,
102-
object_name=_TEST_OBJECT_NAME,
128+
read_obj_stream = await instantiate_read_obj_stream(
129+
mock_client, mock_cls_async_bidi_rpc, open=True
130+
)
131+
132+
# act + assert (pythonic)
133+
with pytest.raises(ValueError) as exc:
134+
await read_obj_stream.open()
135+
136+
# assert
137+
assert str(exc.value) == "Stream is already open"
138+
139+
140+
@mock.patch(
141+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
142+
)
143+
@mock.patch(
144+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
145+
)
146+
@pytest.mark.asyncio
147+
async def test_close(mock_client, mock_cls_async_bidi_rpc):
148+
# arrange
149+
read_obj_stream = await instantiate_read_obj_stream(
150+
mock_client, mock_cls_async_bidi_rpc, open=True
103151
)
104-
read_obj_stream.socket_like_rpc.close = AsyncMock()
105152

106153
# act
107154
await read_obj_stream.close()
108155

109156
# assert
110157
read_obj_stream.socket_like_rpc.close.assert_called_once()
158+
assert not read_obj_stream.is_stream_open
111159

112160

161+
@mock.patch(
162+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
163+
)
113164
@mock.patch(
114165
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
115166
)
116167
@pytest.mark.asyncio
117-
async def test_send(mock_client):
168+
async def test_close_without_open_should_raise_error(
169+
mock_client, mock_cls_async_bidi_rpc
170+
):
118171
# arrange
119-
read_obj_stream = _AsyncReadObjectStream(
120-
client=mock_client,
121-
bucket_name=_TEST_BUCKET_NAME,
122-
object_name=_TEST_OBJECT_NAME,
172+
read_obj_stream = await instantiate_read_obj_stream(
173+
mock_client, mock_cls_async_bidi_rpc, open=False
174+
)
175+
176+
# act + assert (pythonic)
177+
with pytest.raises(ValueError) as exc:
178+
await read_obj_stream.close()
179+
180+
# assert
181+
assert str(exc.value) == "Stream is not open"
182+
183+
184+
@mock.patch(
185+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
186+
)
187+
@mock.patch(
188+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
189+
)
190+
@pytest.mark.asyncio
191+
async def test_send(mock_client, mock_cls_async_bidi_rpc):
192+
# arrange
193+
read_obj_stream = await instantiate_read_obj_stream(
194+
mock_client, mock_cls_async_bidi_rpc, open=True
123195
)
124-
read_obj_stream.socket_like_rpc.send = AsyncMock()
125196

126197
# act
127198
bidi_read_object_request = _storage_v2.BidiReadObjectRequest()
@@ -133,16 +204,40 @@ async def test_send(mock_client):
133204
)
134205

135206

207+
@mock.patch(
208+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
209+
)
136210
@mock.patch(
137211
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
138212
)
139213
@pytest.mark.asyncio
140-
async def test_recv(mock_client):
214+
async def test_send_without_open_should_raise_error(
215+
mock_client, mock_cls_async_bidi_rpc
216+
):
141217
# arrange
142-
read_obj_stream = _AsyncReadObjectStream(
143-
client=mock_client,
144-
bucket_name=_TEST_BUCKET_NAME,
145-
object_name=_TEST_OBJECT_NAME,
218+
read_obj_stream = await instantiate_read_obj_stream(
219+
mock_client, mock_cls_async_bidi_rpc, open=False
220+
)
221+
222+
# act + assert (pythonic)
223+
with pytest.raises(ValueError) as exc:
224+
await read_obj_stream.send(_storage_v2.BidiReadObjectRequest())
225+
226+
# assert
227+
assert str(exc.value) == "Stream is not open"
228+
229+
230+
@mock.patch(
231+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
232+
)
233+
@mock.patch(
234+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
235+
)
236+
@pytest.mark.asyncio
237+
async def test_recv(mock_client, mock_cls_async_bidi_rpc):
238+
# arrange
239+
read_obj_stream = await instantiate_read_obj_stream(
240+
mock_client, mock_cls_async_bidi_rpc, open=True
146241
)
147242
bidi_read_object_response = _storage_v2.BidiReadObjectResponse()
148243
read_obj_stream.socket_like_rpc.recv = AsyncMock(
@@ -155,3 +250,26 @@ async def test_recv(mock_client):
155250
# assert
156251
read_obj_stream.socket_like_rpc.recv.assert_called_once()
157252
assert response == bidi_read_object_response
253+
254+
255+
@mock.patch(
256+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
257+
)
258+
@mock.patch(
259+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
260+
)
261+
@pytest.mark.asyncio
262+
async def test_recv_without_open_should_raise_error(
263+
mock_client, mock_cls_async_bidi_rpc
264+
):
265+
# arrange
266+
read_obj_stream = await instantiate_read_obj_stream(
267+
mock_client, mock_cls_async_bidi_rpc, open=False
268+
)
269+
270+
# act + assert (pythonic)
271+
with pytest.raises(ValueError) as exc:
272+
await read_obj_stream.recv()
273+
274+
# assert
275+
assert str(exc.value) == "Stream is not open"

0 commit comments

Comments
 (0)