|
10 | 10 | import boto3.session |
11 | 11 | import pytz |
12 | 12 | import smart_open |
13 | | -from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError |
| 13 | +from airbyte_cdk.models import FailureType |
| 14 | +from airbyte_cdk.sources.file_based.exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError |
14 | 15 | from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode |
15 | 16 | from airbyte_cdk.sources.file_based.remote_file import RemoteFile |
16 | 17 | from botocore.client import BaseClient |
17 | 18 | from botocore.client import Config as ClientConfig |
| 19 | +from botocore.exceptions import ClientError |
18 | 20 | from source_s3.v4.config import Config |
19 | 21 | from source_s3.v4.zip_reader import DecompressedStream, RemoteFileInsideArchive, ZipContentReader, ZipFileHandler |
20 | 22 |
|
@@ -68,25 +70,30 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo |
68 | 70 | total_n_keys = 0 |
69 | 71 |
|
70 | 72 | try: |
71 | | - if prefixes: |
72 | | - for prefix in prefixes: |
73 | | - for remote_file in self._page(s3, globs, self.config.bucket, prefix, seen, logger): |
74 | | - total_n_keys += 1 |
75 | | - yield remote_file |
76 | | - else: |
77 | | - for remote_file in self._page(s3, globs, self.config.bucket, None, seen, logger): |
| 73 | + for current_prefix in prefixes if prefixes else [None]: |
| 74 | + for remote_file in self._page(s3, globs, self.config.bucket, current_prefix, seen, logger): |
78 | 75 | total_n_keys += 1 |
79 | 76 | yield remote_file |
80 | 77 |
|
81 | 78 | logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).") |
| 79 | + except ClientError as exc: |
| 80 | + if exc.response["Error"]["Code"] == "NoSuchBucket": |
| 81 | + raise CustomFileBasedException( |
| 82 | + f"The bucket {self.config.bucket} does not exist.", failure_type=FailureType.config_error, exception=exc |
| 83 | + ) |
| 84 | + self._raise_error_listing_files(globs, exc) |
82 | 85 | except Exception as exc: |
83 | | - raise ErrorListingFiles( |
84 | | - FileBasedSourceError.ERROR_LISTING_FILES, |
85 | | - source="s3", |
86 | | - bucket=self.config.bucket, |
87 | | - globs=globs, |
88 | | - endpoint=self.config.endpoint, |
89 | | - ) from exc |
| 86 | + self._raise_error_listing_files(globs, exc) |
| 87 | + |
| 88 | + def _raise_error_listing_files(self, globs: List[str], exc: Optional[Exception] = None): |
| 89 | + """Helper method to raise the ErrorListingFiles exception.""" |
| 90 | + raise ErrorListingFiles( |
| 91 | + FileBasedSourceError.ERROR_LISTING_FILES, |
| 92 | + source="s3", |
| 93 | + bucket=self.config.bucket, |
| 94 | + globs=globs, |
| 95 | + endpoint=self.config.endpoint, |
| 96 | + ) from exc |
90 | 97 |
|
91 | 98 | def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase: |
92 | 99 | try: |
|
0 commit comments