.zarr.zip on s3 #1613
Replies: 7 comments 6 replies
-
This is great @jeffpeck10x! Would you be willing to add this to the tutorial? I wonder whether the built-in `ZipStore` has any use for reading from S3. Or is fsspec's `ZipFileSystem` always required here?
-
An additional option would be the approach used by @caviere (see https://github.com/zarr-developers/outreachy_2022_testing_zipstore/blob/main/real%20%20world%20data/main.py#L54). Regardless, 👍 for your version and/or this one ending up in the tutorial, in a place that would have helped you find them! Thanks.
-
Made a small PR to update the docs: #1615
-
With Zarr v3, I used the following code to create a read-only Zarr store from a zipped Zarr in S3:

```python
import zarr
import s3fs


class S3ZipStore(zarr.storage.ZipStore):
    def __init__(self, path: s3fs.S3File) -> None:
        super().__init__(path="", mode="r")
        self.path = path


s3 = s3fs.S3FileSystem(anon=True, endpoint_url=S3_ENDPOINT, asynchronous=False)
file = s3.open(f"s3://{S3_BUCKET}/{ZIP_PATH}")
zarr_store = S3ZipStore(file)
```

It relies on the fact that `ZipStore` only checks that …

Edit: A load time benchmark shows that a `ZipStore` created from a local zip file suffers a similar performance hit compared to `LocalStore`.

Random Sentinel 2 patch time series load time benchmark (5100 m x 5100 m, 1 year):

| Store | Mean load time (s) |
| --- | --- |
| S3 Zarr | 7.53 |
| S3 zipped Zarr | 23.9 |
| NVMe Zarr | 1.12 |
| NVMe zipped Zarr | 3.21 |
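As a usage follow-up to the `S3ZipStore` snippet above, a minimal sketch for opening the resulting store — assuming the zip contains a Zarr group at its root, with `"some_array"` as a placeholder member name:

```python
# Hypothetical usage sketch: open the zipped Zarr read-only through the S3ZipStore above.
root = zarr.open_group(store=zarr_store, mode="r")
arr = root["some_array"]  # "some_array" is a placeholder member name
data = arr[:]             # loads the whole array into memory through the zip store
```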
-
Here's a rudimentary async fsspec read-only file system for uncompressed ("store" compression) zip files that minimizes the number of reads (initially 2 reads, then 1 read per file). With this async file system, reading a zipped Zarr v3 from S3 or local storage is no longer slower than reading an unzipped Zarr. I think it could be made even faster by combining reads for files that are consecutive in the zip, but the FsspecStore doesn't receive any …

The code is also in our benchmark repo.

Random Sentinel 2 patch time series load time benchmark (5100 m x 5100 m, 1 year):

| Store | Mean load time (s) |
| --- | --- |
| S3 Zarr | 7.72 |
| S3 zipped Zarr (async) | 7.55 |
| NVMe Zarr | 1.11 |
| NVMe zipped Zarr (async) | 1.33 |

Usage:

```python
# Async S3 zipped Zarr
import s3fs

s3 = s3fs.S3FileSystem(anon=True, endpoint_url=S3_ENDPOINT, asynchronous=True)
zipfs = ReadOnlyZipFileSystem(s3, f"{S3_BUCKET}/{ZIP_PATH}")
zarr_store = zarr.storage.FsspecStore(fs=zipfs, read_only=True, path="")

# Async local zipped Zarr
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

local_fs = LocalFileSystem()
async_local_fs = AsyncFileSystemWrapper(local_fs)
zipfs = ReadOnlyZipFileSystem(async_local_fs, ZIP_PATH)
zarr_store = zarr.storage.FsspecStore(fs=zipfs, read_only=True, path="")
```

Code:

```python
from fsspec.asyn import AsyncFileSystem
import asyncio
import posixpath
from typing import List, Optional
import struct


class ReadOnlyZipFileSystem(AsyncFileSystem):
    """An async read-only file system for uncompressed zip files using fsspec.

    Mounts an uncompressed zip file as a read-only file system. Supports ZIP64.
    Only supports ZIP_STORED (uncompressed). Multi-disk zip files are not
    supported. Written for reading a zipped Zarr from S3 storage and therefore
    only includes methods used by Zarr.

    Reads 64KB from the end of the zip file to capture the ZIP64 end of central
    directory (EOCD), the ZIP64 EOCD locator, and the standard EOCD, using
    _cat_file with negative start offset and therefore not needing to query the
    file size first. Reads the CD using _cat_file with positive start offset.
    Parses the CD for the file names and the header offsets.

    Requires that the CD contains an entry of each directory (except for the
    autogenerated root dir) before the entries of their files and
    subdirectories. Requires that the CD entries appear in the same order as
    the file headers and the files. Assumes that there are no gaps between
    file headers and files. This way the file headers need not be read,
    because the file offset will be the offset of the next file header (or CD
    for the last file) minus the file size. File datetimes are not available.

    While not currently supported, support for compressed zip files could be
    implemented by reading the file headers (ending the read at the next
    header or CD) and by decompression.
    """

    protocol = "zipfs"
    MAX_ZIP_TAIL_READ = 64 * 1024

    def __init__(self, fs: AsyncFileSystem, path: str, **kwargs):
        """Initialize the ReadOnlyZipFileSystem.

        Args:
            fs: The underlying AsyncFileSystem containing the zip file.
            path: Path to the zip file in the underlying file system.
            **kwargs: Additional arguments passed to AsyncFileSystem.
        """
        super().__init__(**kwargs)
        self.asynchronous = True
        self.fs = fs
        self.path = path
        self._files = None
        self._lock = asyncio.Lock()

    async def _initialize(self):
        """Initialize self._files by reading and parsing the central directory.

        All other methods that require self._files first await this method.
        They never modify self._files. Locking is used to ensure that the
        initialization is thread-safe. The other methods need not lock
        explicitly.
        """
        async with self._lock:
            if self._files is not None:
                return

            # Read tail of file (up to MAX_ZIP_TAIL_READ) from the end
            data = await self.fs._cat_file(self.path, start=-self.MAX_ZIP_TAIL_READ, end=None)
            if len(data) < 22:  # Minimum size for standard EOCD
                raise ValueError(f"EOCD doesn't fit in {self.path}: {len(data)} bytes")

            # EOCD variables and their lengths (order matters)
            # These dicts use exact wording from https://pkwaredownloads.blob.core.windows.net/pkware-general/Documentation/APPNOTE-6.3.9.TXT
            eocd_var_lengths = {
                'end of central dir signature': 4,  # Signature (0x06054b50)
                'number of this disk': 2,  # Disk number
                'number of the disk with the start of the central directory': 2,  # Disk number of the central directory
                'total number of entries in the central directory on this disk': 2,  # Number of entries on this disk
                'total number of entries in the central directory': 2,  # Total number of entries
                'size of the central directory': 4,  # Size of the central directory
                'offset of start of central directory with respect to the starting disk number': 4,  # Offset of the start of the central directory
                '.ZIP file comment length': 2  # Length of the comment
            }
            # ZIP64 EOCD variables and their lengths (order matters)
            zip64_eocd_var_lengths = {
                'zip64 end of central dir signature': 4,
                'size of zip64 end of central directory record': 8,
                'version made by': 2,
                'version needed to extract': 2,
                'number of this disk': 4,
                'number of the disk with the start of the central directory': 4,
                'total number of entries in the central directory on this disk': 8,
                'total number of entries in the central directory': 8,
                'size of the central directory': 8,
                'offset of start of central directory with respect to the starting disk number': 8
            }
            # Map lengths to struct formats
            var_length_to_format = {
                2: 'H',  # Unsigned short (2 bytes)
                4: 'L',  # Unsigned long (4 bytes)
                8: 'Q'   # Unsigned long long (8 bytes)
            }

            # Parse EOCD
            eocd = {}
            is_zip64 = False
            eocd_pos = data[:-20+4].rfind(b'\x50\x4b\x05\x06')  # 20 bytes for EOCD, 4 bytes for EOCD signature
            if eocd_pos == -1:
                raise ValueError(f"No EOCD in the last {self.MAX_ZIP_TAIL_READ} bytes of {self.path}")
            pos = eocd_pos
            for var, length in eocd_var_lengths.items():
                eocd[var] = struct.unpack_from(f'<{var_length_to_format[length]}', data, pos)[0]
                # Check for ZIP64 values 0xFFFF or 0xFFFFFFFF
                if eocd[var] == 2**(length*8) - 1:
                    eocd[var] = None
                    is_zip64 = True
                pos += length

            if is_zip64:
                if len(data) - 22 < 56 + 20:  # 56 bytes for ZIP64 EOCD + 20 bytes for locator
                    raise ValueError(f"ZIP64 EOCD and ZIP64 EOCD locator do not fit in {self.path}")
                # Find ZIP64 EOCD
                zip64_eocd_pos = data[:eocd_pos-56+4].rfind(b'\x50\x4b\x06\x06')  # 20 bytes for EOCD, 56 bytes for ZIP64 EOCD, 4 bytes for ZIP64 EOCD signature
                if zip64_eocd_pos == -1:
                    raise ValueError(f"No ZIP64 EOCD in the last {self.MAX_ZIP_TAIL_READ} bytes of {self.path}")
                pos = zip64_eocd_pos
                for var, length in zip64_eocd_var_lengths.items():
                    eocd[var] = struct.unpack_from(f'<{var_length_to_format[length]}', data, pos)[0]
                    pos += length

            # Require single-disk zip
            if eocd['number of this disk'] != 0 \
                    or eocd['number of the disk with the start of the central directory'] != 0 \
                    or eocd['total number of entries in the central directory on this disk'] != eocd['total number of entries in the central directory']:
                raise ValueError(f"Unsupported multi-disk central directory in {self.path}")

            # Convenience variables
            cd_size = eocd['size of the central directory']
            cd_offset = eocd['offset of start of central directory with respect to the starting disk number']
            cd_entries = eocd['total number of entries in the central directory']

            # Read and parse central directory
            if cd_size == 0:
                # No central directory, empty zip file
                return
            cd_data = await self.fs._cat_file(self.path, start=cd_offset, end=cd_offset + cd_size)
            if len(cd_data) != cd_size:
                raise ValueError(f"Failed to read central directory: expected {cd_size} bytes, got {len(cd_data)}")

            # Central directory file header variables and their lengths
            cd_file_header_var_lengths = {
                'central file header signature': 4,
                'version made by': 2,
                'version needed to extract': 2,
                'general purpose bit flag': 2,
                'compression method': 2,
                'last mod file time': 2,
                'last mod file date': 2,
                'crc-32': 4,
                'compressed size': 4,
                'uncompressed size': 4,
                'file name length': 2,
                'extra field length': 2,
                'file comment length': 2,
                'disk number start': 2,
                'internal file attributes': 2,
                'external file attributes': 4,
                'relative offset of local header': 4
            }
            # Central or local file header ZIP64 extended information extra field variables and their lengths
            # These share names with the standard fields but have different lengths
            zip64_file_header_var_lengths = {
                'uncompressed size': 8,
                'compressed size': 8,
                'relative offset of local header': 8,
                'disk number start': 4
            }

            # Autocreate root dir
            self._files = {'': {'children': []}}

            # Parse central directory entries
            pos = 0
            previous_file = None
            for file_index in range(cd_entries):
                if pos + 46 > len(cd_data):  # 46 bytes for the central directory file header
                    raise ValueError(f"Truncated central directory entry in {self.path}")
                cd_file_header = {}
                for var, length in cd_file_header_var_lengths.items():
                    cd_file_header[var] = struct.unpack_from(f'<{var_length_to_format[length]}', cd_data, pos)[0]
                    # Check for ZIP64 values 0xFFFF or 0xFFFFFFFF
                    if cd_file_header[var] == 2**(length*8) - 1:
                        cd_file_header[var] = None
                    pos += length
                if cd_file_header['central file header signature'] != 0x02014b50:
                    raise ValueError(f"Invalid central directory header signature in {self.path}")
                if cd_file_header['compression method'] != 0 or cd_file_header['compressed size'] != cd_file_header['uncompressed size']:
                    raise ValueError(f"File in {self.path} is not stored (uncompressed)")
                utf8 = cd_file_header['general purpose bit flag'] & 0x800 != 0  # Bit 11

                # Convenience variables
                fname_len = cd_file_header['file name length']
                extra_len = cd_file_header['extra field length']
                comment_len = cd_file_header['file comment length']

                # Read filename
                if pos + fname_len > len(cd_data):
                    raise ValueError(f"Truncated filename in {self.path}")
                if utf8:
                    fname = cd_data[pos:pos+fname_len].decode('utf-8')
                else:
                    fname = cd_data[pos:pos+fname_len].decode('ascii')
                pos += fname_len

                # Parse extra field
                extra_end = pos + extra_len
                if extra_end > len(cd_data):
                    raise ValueError(f"Truncated extra field in {self.path}")
                while pos < extra_end:
                    if pos + 4 > extra_end:
                        raise ValueError(f"Truncated extra field in {self.path}")
                    tag, size = struct.unpack_from('<HH', cd_data, pos)
                    pos += 4
                    if pos + size > extra_end:
                        raise ValueError(f"Truncated extra field in {self.path}")
                    backup_pos = pos
                    if tag == 0x0001:
                        # ZIP64 extended information extra field
                        for var, length in zip64_file_header_var_lengths.items():
                            # Only parse variables that were marked as ZIP64
                            if cd_file_header[var] is None:
                                cd_file_header[var] = struct.unpack_from(f'<{var_length_to_format[length]}', cd_data, pos)[0]
                                pos += length
                    else:
                        if tag == 0x5455:
                            pass  # Extended Timestamp, not handled
                        elif tag == 0x7875:
                            pass  # Info-ZIP New Unix Extra Field, skip it
                        else:
                            pass  # Unknown extra field, skip it
                        pos += size
                    if backup_pos + size != pos:
                        raise ValueError(f"Invalid extra field size in {self.path}")
                    pos = backup_pos + size

                # Convenience variables
                size = cd_file_header['compressed size']
                offset = cd_file_header['relative offset of local header']

                # Skip comment
                pos += comment_len

                # Detect directory
                is_dir = fname.endswith('/')
                if is_dir:
                    # Remove trailing slash
                    fname = fname[:-1]

                # Store file info
                if is_dir:
                    # Dirs can be recognized by key 'children' existing
                    file = {'children': []}
                else:
                    # It's a file
                    file = {'size': size, 'offset': offset}
                self._files[fname] = file

                # Add file as the parent directory's child
                if fname != '':
                    parent = posixpath.dirname(fname)
                    if parent not in self._files:
                        raise NotImplementedError(f"Autocreation of parent folder {parent} not implemented, in {self.path}")
                    self._files[parent]['children'].append(fname)

                # Calculate offset of previous file
                if previous_file is not None and 'children' not in previous_file:
                    # We require the files to be stored in the order of the central directory entries,
                    # although sorting would be an option
                    if offset <= previous_file['offset']:
                        raise ValueError(f"Non-ascending order of local header offsets in {self.path}")
                    # Previous file's offset = current file's offset - previous file's size
                    # This should work unless there is empty space in the zip file, which is unlikely.
                    previous_file['offset'] = offset - previous_file['size']
                previous_file = file

            # Calculate offset of previous file
            if previous_file is not None and 'children' not in previous_file:
                if cd_offset <= previous_file['offset']:
                    raise ValueError(f"Non-ascending order of local header offsets in {self.path}")
                # Last file ends where central directory begins
                previous_file['offset'] = cd_offset - previous_file['size']

    async def _ls(self, path: str, detail: bool = True, **kwargs) -> List:
        """List files and directories in the given path.

        If the path points to a file, list just the file.
        """
        # Always await self._initialize() in functions needing self._files
        await self._initialize()

        # Internally we don't use a root slash, so strip it. Also strip any trailing slash.
        path = posixpath.normpath(path).lstrip('/').rstrip('/')

        # Helper function to get file name or details
        def get_file_listing(file, fname, detail):
            if detail:
                if 'children' in file:
                    return {'name': f'/{fname}', 'type': 'directory', 'size': 0, 'created': None, 'islink': False}
                else:
                    return {'name': f'/{fname}', 'type': 'file', 'size': file['size'], 'created': None, 'islink': False}
            else:
                return f'/{fname}'

        # List children
        results = []
        if path not in self._files:
            raise FileNotFoundError(f"Path {path} not found")
        file = self._files[path]
        if 'children' in file:
            # Path points to a dir
            for child in file['children']:
                results.append(get_file_listing(self._files[child], child, detail))
        else:
            # Path points to a file
            results = [get_file_listing(file, path, detail)]
        return results

    async def _cat_file(self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs) -> bytes:
        """Read the contents of a file in the zip."""
        # Always await self._initialize() in functions needing self._files
        await self._initialize()

        # Internally we don't use a root slash, so strip it. Also strip any trailing slash.
        path = posixpath.normpath(path).lstrip('/').rstrip('/')

        # Check if the file is available
        if path not in self._files:
            raise FileNotFoundError(f"File {path} not found")
        elif 'children' in self._files[path]:
            raise FileNotFoundError(f"{path} is a directory")

        # Get offset and size of the file in the zip file
        info = self._files[path]
        offset = info['offset']
        size = info['size']

        # Set start to beginning of file if not specified
        start = start or 0
        # Convert negative start (relative to end of file) to positive start
        if start < 0:
            start = max(0, size + start)  # Clamp too large negative start to the beginning of file
        # Set end to end of file if not specified
        end = end or size
        # Convert negative end (relative to end of file) to positive end
        if end < 0:
            end = max(0, size + end)  # Clamp too large negative end to the beginning of file
        # For start beyond the end of the file or the end, return empty data
        if start >= size or end <= start:
            return b''

        # Calculate zip file read start and read size
        read_start = offset + start
        read_size = min(end, size) - start  # Clamp too large end at size

        # Read data
        data = await self.fs._cat_file(self.path, start=read_start, end=read_start+read_size)
        return data
```
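As a quick sanity check, a minimal sketch along these lines — assuming a recent fsspec that provides `AsyncFileSystemWrapper`, and using a throwaway archive name `demo.zip` with placeholder member names — could look like:

```python
# Hypothetical sanity check for ReadOnlyZipFileSystem; demo.zip and member names are placeholders.
import asyncio
import zipfile

from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

# Build a tiny uncompressed (ZIP_STORED) archive with members at the zip root.
with zipfile.ZipFile("demo.zip", "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("zarr.json", b'{"zarr_format": 3, "node_type": "group"}')
    zf.writestr("c0", bytes(range(256)))


async def main():
    zipfs = ReadOnlyZipFileSystem(AsyncFileSystemWrapper(LocalFileSystem()), "demo.zip")
    print(await zipfs._ls("/", detail=False))                 # ['/zarr.json', '/c0']
    assert await zipfs._cat_file("c0") == bytes(range(256))   # full member read
    # Ranged read, like the partial reads Zarr issues for chunks
    assert await zipfs._cat_file("c0", start=10, end=20) == bytes(range(10, 20))


asyncio.run(main())
```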
-
Hi, my two cents on this issue. I ended up with the following code:

```python
from fsspec.implementations.http import HTTPFileSystem
from xarray import open_datatree
from zarr.storage import ZipStore


class HttpZipStore(ZipStore):
    def __init__(self, path) -> None:
        super().__init__(path="", mode="r")
        self.path = path


def _load_zip_zarr(**kwargs):
    fs = HTTPFileSystem(asynchronous=False, block_size=10000)
    zipfile = fs.open(LOCAL_ZARR_ZIP)
    store = HttpZipStore(zipfile)
    return open_datatree(store, engine="zarr", **kwargs)
```

I think this is an important use case to support: Zarr stores are hard to handle and move around if not zipped, and it's easy to drop a zipped Zarr on some cloud service or a Zenodo repository. As a user, I would prefer not to have to deal with a custom store setup:

```python
open_datatree("https://myserver/dataset.zarr.zip")
```

Or at least, there should be some example / code snippet for this use case in the documentation. Thanks for your work and this great project!
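As a sketch of how that wished-for one-liner could be approximated today, a small helper along these lines (the function name is hypothetical; it reuses the `HttpZipStore` defined above and the same arbitrary `block_size`):

```python
# Hypothetical convenience wrapper (not an existing xarray/zarr API):
# hides the custom-store setup behind a single call that takes the zip URL.
from fsspec.implementations.http import HTTPFileSystem
from xarray import open_datatree


def open_zipped_zarr_datatree(url, block_size=10000, **kwargs):
    fs = HTTPFileSystem(asynchronous=False, block_size=block_size)
    store = HttpZipStore(fs.open(url))  # HttpZipStore as defined above
    return open_datatree(store, engine="zarr", **kwargs)


dt = open_zipped_zarr_datatree("https://myserver/dataset.zarr.zip")
```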
-
I am curious how many byte range requests reading a zip file over HTTP or S3 requires versus a Zarr array with a single shard. From above, it seems like making large requests and making some assumptions can speed up loading from a zip file. How often are those assumptions true, and what do the common and worst-case scenarios look like?
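As a rough back-of-the-envelope, based on the read pattern described in the async file system comment above (2 reads up front to locate and fetch the central directory, then 1 ranged read per member), and assuming, for the sharded case, one read for the shard index plus one uncoalesced ranged read per chunk (which may not match what zarr-python actually issues):

```python
# Back-of-the-envelope request counts; the sharded-Zarr assumptions are mine, not from the thread.
def zip_requests(n_chunks):
    # From the comment above: 2 reads for EOCD + central directory, then 1 read per member.
    return 2 + n_chunks


def sharded_zarr_requests(n_chunks):
    # Assumption: 1 read for the shard index, then 1 ranged read per chunk, no coalescing.
    return 1 + n_chunks


for n in (1, 16, 256):
    print(n, zip_requests(n), sharded_zarr_requests(n))
```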
-
I searched and could not find an example of accessing a `.zarr.zip` from an s3 endpoint without having to first download it entirely. The provided `zarr.storage.ZipStore` only works on a local path (right?). I experimented and found that this works:
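A minimal sketch of this kind of approach, using fsspec's `ZipFileSystem` layered over `s3fs` (the bucket, key, zarr-python 2.x-style usage, and the assumption that the group metadata sits at the zip root are all placeholders/assumptions here):

```python
# Hypothetical sketch: read a zipped Zarr from S3 without downloading it,
# by layering fsspec's ZipFileSystem over an open s3fs file object.
import s3fs
import zarr
from fsspec.implementations.zip import ZipFileSystem

s3 = s3fs.S3FileSystem(anon=True)
f = s3.open("s3://my-bucket/data.zarr.zip")  # remote file object with seek/read
zfs = ZipFileSystem(f)                       # ZipFileSystem accepts an open file-like object
store = zfs.get_mapper("")                   # mapping over the zip contents (group assumed at zip root)
root = zarr.open_group(store=store, mode="r")
```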
I am wondering if this is alright. Is there anything that could be improved with this approach?
My use-case is read-only. I understand that this approach would not be able to handle updates without rewriting the entire `.zarr.zip`. And more generally, if someone else is searching for a solution like this, I hope this helps!