Skip to content

Commit 4766164

Browse files
authored
feat(zb-experimental): Add AsyncMultiRangeDownloader (#1550)
feat(zb-experimental): Add AsyncMultiRangeDownloader and it's init method.
1 parent fc8461b commit 4766164

File tree

2 files changed

+244
-0
lines changed

2 files changed

+244
-0
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from typing import Any, List, Optional, Tuple
18+
19+
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
20+
_AsyncReadObjectStream,
21+
)
22+
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
23+
AsyncGrpcClient,
24+
)
25+
26+
from io import BytesIO
27+
28+
29+
class AsyncMultiRangeDownloader:
30+
"""Provides an interface for downloading multiple ranges of a GCS ``Object``
31+
concurrently.
32+
33+
Example usage:
34+
35+
.. code-block:: python
36+
37+
client = AsyncGrpcClient().grpc_client
38+
mrd = await AsyncMultiRangeDownloader.create_mrd(
39+
client, bucket_name="chandrasiri-rs", object_name="test_open9"
40+
)
41+
my_buff1 = BytesIO()
42+
my_buff2 = BytesIO()
43+
my_buff3 = BytesIO()
44+
my_buff4 = BytesIO()
45+
buffers = [my_buff1, my_buff2, my_buff3, my_buff4]
46+
await mrd.download_ranges(
47+
[
48+
(0, 100, my_buff1),
49+
(100, 200, my_buff2),
50+
(200, 300, my_buff3),
51+
(300, 400, my_buff4),
52+
]
53+
)
54+
for buff in buffers:
55+
print("downloaded bytes", buff.getbuffer().nbytes)
56+
57+
"""
58+
59+
@classmethod
60+
async def create_mrd(
61+
cls,
62+
client: AsyncGrpcClient.grpc_client,
63+
bucket_name: str,
64+
object_name: str,
65+
generation_number: Optional[int] = None,
66+
read_handle: Optional[bytes] = None,
67+
) -> AsyncMultiRangeDownloader:
68+
"""Initializes a MultiRangeDownloader and opens the underlying bidi-gRPC
69+
object for reading.
70+
71+
:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
72+
:param client: The asynchronous client to use for making API requests.
73+
74+
:type bucket_name: str
75+
:param bucket_name: The name of the bucket containing the object.
76+
77+
:type object_name: str
78+
:param object_name: The name of the object to be read.
79+
80+
:type generation_number: int
81+
:param generation_number: (Optional) If present, selects a specific
82+
revision of this object.
83+
84+
:type read_handle: bytes
85+
:param read_handle: (Optional) An existing handle for reading the object.
86+
If provided, opening the bidi-gRPC connection will be faster.
87+
88+
:rtype: :class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.AsyncMultiRangeDownloader`
89+
:returns: An initialized AsyncMultiRangeDownloader instance for reading.
90+
"""
91+
mrd = cls(client, bucket_name, object_name, generation_number, read_handle)
92+
await mrd.open()
93+
return mrd
94+
95+
def __init__(
96+
self,
97+
client: AsyncGrpcClient.grpc_client,
98+
bucket_name: str,
99+
object_name: str,
100+
generation_number: Optional[int] = None,
101+
read_handle: Optional[bytes] = None,
102+
) -> None:
103+
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
104+
use it directly. Instead it's adviced to use the classmethod `create_mrd`.
105+
106+
:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
107+
:param client: The asynchronous client to use for making API requests.
108+
109+
:type bucket_name: str
110+
:param bucket_name: The name of the bucket containing the object.
111+
112+
:type object_name: str
113+
:param object_name: The name of the object to be read.
114+
115+
:type generation_number: int
116+
:param generation_number: (Optional) If present, selects a specific revision of
117+
this object.
118+
119+
:type read_handle: bytes
120+
:param read_handle: (Optional) An existing read handle.
121+
"""
122+
self.client = client
123+
self.bucket_name = bucket_name
124+
self.object_name = object_name
125+
self.generation_number = generation_number
126+
self.read_handle = read_handle
127+
self.read_obj_str: _AsyncReadObjectStream = None
128+
129+
async def open(self) -> None:
130+
"""Opens the bidi-gRPC connection to read from the object.
131+
132+
This method initializes and opens an `_AsyncReadObjectStream` (bidi-gRPC stream) to
133+
for downloading ranges of data from GCS ``Object``.
134+
135+
"Opening" constitutes fetching object metadata such as generation number
136+
and read handle and sets them as attributes if not already set.
137+
"""
138+
self.read_obj_str = _AsyncReadObjectStream(
139+
client=self.client,
140+
bucket_name=self.bucket_name,
141+
object_name=self.object_name,
142+
generation_number=self.generation_number,
143+
read_handle=self.read_handle,
144+
)
145+
await self.read_obj_str.open()
146+
if self.generation_number is None:
147+
self.generation_number = self.read_obj_str.generation_number
148+
self.read_handle = self.read_obj_str.read_handle
149+
return
150+
151+
async def download_ranges(self, read_ranges: List[Tuple[int, int, BytesIO]]) -> Any:
152+
"""Downloads multiple byte ranges from the object into the buffers
153+
provided by user.
154+
155+
:type read_ranges: List[Tuple[int, int, "BytesIO"]]
156+
:param read_ranges: A list of tuples, where each tuple represents a
157+
byte range (start_byte, end_byte, buffer) to download. Buffer has to
158+
be provided by the user, and user has to make sure appropriate
159+
memory is available in the application to avoid out-of-memory crash.
160+
161+
162+
Raises:
163+
NotImplementedError: This method is not yet implemented.
164+
"""
165+
raise NotImplementedError("TODO")
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
from unittest import mock
17+
from unittest.mock import AsyncMock
18+
19+
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
20+
AsyncMultiRangeDownloader,
21+
)
22+
from io import BytesIO
23+
24+
25+
_TEST_BUCKET_NAME = "test-bucket"
26+
_TEST_OBJECT_NAME = "test-object"
27+
_TEST_GENERATION_NUMBER = 123456789
28+
_TEST_READ_HANDLE = b"test-handle"
29+
30+
31+
@mock.patch(
32+
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
33+
)
34+
@mock.patch(
35+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
36+
)
37+
@pytest.mark.asyncio
38+
async def test_create_mrd(mock_async_grpc_client, async_read_object_stream):
39+
# Arrange
40+
mock_stream_instance = async_read_object_stream.return_value
41+
mock_stream_instance.open = AsyncMock()
42+
mock_stream_instance.generation_number = _TEST_GENERATION_NUMBER
43+
mock_stream_instance.read_handle = _TEST_READ_HANDLE
44+
45+
# act
46+
mrd = await AsyncMultiRangeDownloader.create_mrd(
47+
mock_async_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
48+
)
49+
50+
# Assert
51+
async_read_object_stream.assert_called_once_with(
52+
client=mock_async_grpc_client,
53+
bucket_name=_TEST_BUCKET_NAME,
54+
object_name=_TEST_OBJECT_NAME,
55+
generation_number=None,
56+
read_handle=None,
57+
)
58+
mock_stream_instance.open.assert_called_once()
59+
60+
assert mrd.client == mock_async_grpc_client
61+
assert mrd.bucket_name == _TEST_BUCKET_NAME
62+
assert mrd.object_name == _TEST_OBJECT_NAME
63+
assert mrd.generation_number == _TEST_GENERATION_NUMBER
64+
assert mrd.read_handle == _TEST_READ_HANDLE
65+
assert mrd.read_obj_str is mock_stream_instance
66+
67+
68+
@mock.patch(
69+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
70+
)
71+
@pytest.mark.asyncio
72+
async def test_download_ranges(mock_async_grpc_client):
73+
"""Test that download_ranges() raises NotImplementedError."""
74+
mrd = AsyncMultiRangeDownloader(
75+
mock_async_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
76+
)
77+
78+
with pytest.raises(NotImplementedError):
79+
await mrd.download_ranges([(0, 100, BytesIO())])

0 commit comments

Comments
 (0)