Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions samples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@
import storage_set_bucket_default_kms_key
import storage_set_client_endpoint
import storage_set_metadata
import storage_transfer_manager
import storage_transfer_manager_download_all_blobs
import storage_transfer_manager_download_chunks_concurrently
import storage_transfer_manager_upload_directory
import storage_transfer_manager_upload_many_blobs
import storage_upload_file
import storage_upload_from_memory
import storage_upload_from_stream
Expand Down Expand Up @@ -686,7 +689,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
with open(os.path.join(uploads, name), "w") as f:
f.write(name)

storage_transfer_manager.upload_many_blobs_with_transfer_manager(
storage_transfer_manager_upload_many_blobs.upload_many_blobs_with_transfer_manager(
test_bucket.name,
BLOB_NAMES,
source_directory="{}/".format(uploads),
Expand All @@ -699,7 +702,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):

with tempfile.TemporaryDirectory() as downloads:
# Download the files.
storage_transfer_manager.download_all_blobs_with_transfer_manager(
storage_transfer_manager_download_all_blobs.download_all_blobs_with_transfer_manager(
test_bucket.name,
destination_directory=os.path.join(downloads, ""),
threads=2,
Expand Down Expand Up @@ -729,11 +732,34 @@ def test_transfer_manager_directory_upload(test_bucket, capsys):
with open(os.path.join(uploads, name), "w") as f:
f.write(name)

storage_transfer_manager.upload_directory_with_transfer_manager(
storage_transfer_manager_upload_directory.upload_directory_with_transfer_manager(
test_bucket.name, source_directory="{}/".format(uploads)
)
out, _ = capsys.readouterr()

assert "Found {}".format(len(BLOB_NAMES)) in out
for name in BLOB_NAMES:
assert "Uploaded {}".format(name) in out


def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
    """Round-trip a small file: upload it with the storage_upload_file
    snippet, download it with the chunked-concurrent-download snippet, and
    verify the snippet's console output.
    """
    BLOB_NAME = "test_file.txt"

    with tempfile.NamedTemporaryFile() as file:
        file.write(b"test")
        # NamedTemporaryFile buffers writes in memory; flush so that
        # upload_blob (which re-opens the file by name) sees the bytes on
        # disk instead of an empty file.
        file.flush()

        storage_upload_file.upload_blob(
            test_bucket.name, file.name, BLOB_NAME
        )

    with tempfile.TemporaryDirectory() as downloads:
        # Download the file.
        storage_transfer_manager_download_chunks_concurrently.download_chunks_concurrently(
            test_bucket.name,
            BLOB_NAME,
            os.path.join(downloads, BLOB_NAME),
            processes=8,
        )
        out, _ = capsys.readouterr()

        assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out
184 changes: 0 additions & 184 deletions samples/snippets/storage_transfer_manager.py

This file was deleted.

65 changes: 65 additions & 0 deletions samples/snippets/storage_transfer_manager_download_all_blobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_all_blobs_with_transfer_manager(
    bucket_name, destination_directory="", threads=4
):
    """Download every blob in a bucket, concurrently, using a thread pool.

    Each blob's local filename is formed by joining ``destination_directory``
    with the blob name. If you need full control over each output filename,
    use transfer_manager.download_many() instead.

    Directories implied by blob names that contain slashes are created
    automatically as needed.
    """

    # bucket_name: the ID of your GCS bucket, e.g. "your-bucket-name".
    #
    # destination_directory: local directory prepended (with os.path.join)
    # to each blob name to form the full download path. Relative and
    # absolute paths both work; an empty string means the current working
    # directory. NOTE: directory traversal ("../" etc.) is not sanitized,
    # so do not pass unsanitized end-user input here.
    #
    # threads: number of worker threads. Generally, many small files benefit
    # from more threads while large files do not; too many threads can slow
    # the operation down due to contention over the Python GIL.

    from google.cloud.storage import Client, transfer_manager

    client = Client()
    bucket = client.bucket(bucket_name)

    names = [b.name for b in bucket.list_blobs()]

    outcomes = transfer_manager.download_many_to_path(
        bucket, names, destination_directory=destination_directory, threads=threads
    )

    # Each outcome is either None (success) or the exception raised for the
    # corresponding blob, in the same order as `names`.
    for blob_name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            print("Failed to download {} due to exception: {}".format(blob_name, outcome))
        else:
            print("Downloaded {} to {}.".format(blob_name, destination_directory + blob_name))
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
    """Download a single file in chunks, concurrently."""

    # bucket_name: the ID of your GCS bucket, e.g. "your-bucket-name".
    #
    # blob_name: the blob to download, e.g. "target-file".
    #
    # filename: the destination filename or path on the local filesystem.
    #
    # processes: maximum number of worker processes used to download the
    # blob concurrently. Process-based workers consume more memory and CPU
    # but can be faster for large files; the optimal count depends heavily
    # on the workload — see the underlying method's docstring for details.

    from google.cloud.storage import Client, transfer_manager

    client = Client()
    blob = client.bucket(bucket_name).blob(blob_name)

    transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes)

    print("Downloaded {} to {}.".format(blob_name, filename))
Loading