Description
Bug report
Bug description:
I ran into a data corruption bug that seems to be triggered by interleaving reads/seeks from different files inside of an uncompressed zip file. As far as I can tell from the docs, this is allowed by zipfile. It works correctly in Python 3.7 and 3.9, but fails in 3.12.
I'm attaching a somewhat convoluted testcase (still working on a simpler one). It parses a dBase IV database by reading records from a .dbf file, and for each record, reading a corresponding record from a .dbt file.
When run using Python 3.9, you will see a bunch of data printed out. When run using Python 3.12, you will get an exception: ValueError: Invalid dBase IV block: b'PK\x03\x04\n\x00\x00\x00'. That block does not appear in the input file at all. (Though, when tested with a larger input, I got a block of bytes that appeared in the wrong file.)
For some context, here is a workaround I used in my project: I changed it to read the .dbf file first, then the .dbt.
Testcase:
#!/usr/bin/env python3
"""Reproducer for interleaved reads from two members of an uncompressed zip.

Parses a dBase IV database stored inside notams.zip: fixed-width records are
read from notams.dbf and, for each record, a memo block is read from
notams.dbt while both zip members are open simultaneously.
"""
import datetime
import pathlib
import struct
import zipfile
from dataclasses import dataclass
from typing import Any, BinaryIO, List, Tuple

# Input zip expected next to this script.
ZIP_PATH = pathlib.Path(__file__).parent / 'notams.zip'


@dataclass
class DbfHeader:
    """Fixed 32-byte header at the start of a .dbf file."""

    SIZE = 32
    VERSION = 3

    info: int                       # raw version/info byte
    last_update: datetime.date
    num_records: int
    header_bytes: int               # total header size incl. field descriptors
    record_bytes: int               # size of one data record

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 32-byte DBF header; raises ValueError on version mismatch."""
        info, year, month, day, num_records, header_bytes, record_bytes = \
            struct.unpack('<4BIHH20x', data)
        version = info & 0x3
        if version != cls.VERSION:
            raise ValueError(f"Unsupported DBF version: {version}")
        # Year is stored as an offset from 1900.
        return cls(info, datetime.date(year + 1900, month, day),
                   num_records, header_bytes, record_bytes)


@dataclass
class DbfField:
    """One 32-byte field descriptor from the .dbf header."""

    SIZE = 32

    name: str
    type: str                       # single-char type code: C, D, L, M, N
    length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 32-byte field descriptor."""
        name, typ, length = struct.unpack('<11sc4xB15x', data)
        return cls(name.rstrip(b'\x00').decode(), typ.decode(), length)


class DbfFile:
    """Sequential reader for .dbf header and records."""

    @classmethod
    def read_header(cls, fd: BinaryIO) -> Tuple[DbfHeader, List[DbfField]]:
        """Read the file header and the field-descriptor array.

        Raises ValueError if the 0x0D array terminator is missing.
        """
        header = DbfHeader.from_bytes(fd.read(DbfHeader.SIZE))
        # Header = 32-byte header + N*32-byte descriptors + 1 terminator byte.
        num_fields = (header.header_bytes - 33) // 32
        fields = [DbfField.from_bytes(fd.read(DbfField.SIZE))
                  for _ in range(num_fields)]
        if fd.read(1) != b'\x0D':
            raise ValueError("Missing array terminator")
        return header, fields

    @classmethod
    def read_record(cls, fd: BinaryIO, fields: List[DbfField]) -> List[Any]:
        """Read one record and decode each field per its type code.

        Raises ValueError on malformed logical fields or unknown type codes.
        """
        fd.read(1)  # skip the record's deleted-flag byte
        values = []
        for field in fields:
            data = fd.read(field.length).decode('latin-1').strip(' ')
            if field.type == 'C':           # character
                value = data
            elif field.type == 'D':         # date, YYYYMMDD or blank
                s = data.strip(' ')
                if s:
                    value = datetime.datetime.strptime(data, '%Y%m%d').date()
                else:
                    value = None
            elif field.type == 'L':         # logical
                if len(data) != 1:
                    raise ValueError(f"Incorrect length: {data!r}")
                if data in 'YyTt':
                    value = True
                elif data in 'NnFf':
                    value = False
                elif data == '?':
                    value = None
                else:
                    raise ValueError(f"Incorrect boolean: {data!r}")
            elif field.type in ('M', 'N'):  # memo index / numeric
                value = int(data) if data else None
            else:
                raise ValueError(f"Unsupported field: {field.type}")
            values.append(value)
        return values


@dataclass
class DbtHeader:
    """Fixed 512-byte header at the start of a .dbt memo file."""

    SIZE = 512

    next_free_block: int
    dbf_filename: str
    reserved: int
    block_length: int               # size of each memo block

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 512-byte DBT header."""
        next_free_block, dbf_filename, reserved, block_length = \
            struct.unpack('<I4x8sIH490x', data)
        return cls(next_free_block, dbf_filename.decode('latin-1'),
                   reserved, block_length)


class DbtFile:
    """Random-access reader for dBase IV .dbt memo blocks (uses seek)."""

    DBT3_BLOCK_SIZE = 512
    DBT4_BLOCK_START = b'\xFF\xFF\x08\x00'

    @classmethod
    def read_header(cls, fd: BinaryIO) -> DbtHeader:
        """Seek to the start of the file and read its header."""
        fd.seek(0)
        block = fd.read(DbtHeader.SIZE)
        return DbtHeader.from_bytes(block)

    @classmethod
    def read_record(cls, fd: BinaryIO, header: DbtHeader, idx: int) -> str:
        """Seek to memo block *idx* and return its text.

        Raises ValueError if the block does not start with the dBase IV
        magic bytes (this is where the corruption surfaces on 3.12).
        """
        fd.seek(header.block_length * idx)
        block_start = fd.read(8)
        if block_start[0:4] != cls.DBT4_BLOCK_START:
            raise ValueError(f"Invalid dBase IV block: {block_start}")
        length = int.from_bytes(block_start[4:8], 'little')
        data = fd.read(length - len(block_start))
        return data.decode('latin-1')


def main():
    """Interleave reads from the two open zip members to trigger the bug."""
    with zipfile.ZipFile(ZIP_PATH) as z:
        with z.open('notams.dbf') as dbf_in, z.open('notams.dbt') as dbt_in:
            dbf_header, dbf_fields = DbfFile.read_header(dbf_in)
            dbt_header = DbtFile.read_header(dbt_in)
            for _ in range(dbf_header.num_records):
                record = DbfFile.read_record(dbf_in, dbf_fields)
                print(record)
                # record[3] is assumed to be the memo-block index field
                memo = DbtFile.read_record(dbt_in, dbt_header, record[3])
                print(memo)


if __name__ == '__main__':
    main()
Input file:
notams.zip
CPython versions tested on:
3.9, 3.12
Operating systems tested on:
Linux
Linked PRs
Metadata
Metadata
Assignees
Labels
Projects
Status