
Commit 7d06c96

FEAT: Streaming support in fetchone, fetchmany, fetchall for varbinarymax data type. (#232)
### Work Item / Issue Reference

> [AB#33395](https://sqlclientdrivers.visualstudio.com/c6d89619-62de-46a0-8b46-70b92a84d85e/_workitems/edit/33395)

---

### Summary

This pull request significantly improves support for streaming and fetching large binary (VARBINARY(MAX)) and large text columns in the MSSQL Python driver. The main changes are robust chunked retrieval of large objects (LOBs), correct handling of edge cases (such as empty or NULL values), and expanded test coverage for these scenarios.

**LOB Streaming and Fetching Enhancements:**

* Added a new helper function `FetchLobColumnData` to efficiently stream and assemble large binary/text columns (LOBs) from the database, handling chunking, NULL/empty values, and correct type conversion for both binary and (wide/narrow) string columns. A hedged sketch of this chunked-read pattern appears below.
* Updated the logic in `SQLGetData_wrap`, `FetchBatchData`, `FetchMany_wrap`, and `FetchAll_wrap` to detect LOB columns and fetch them through the new streaming path, including a proper fallback to row-by-row fetching when LOBs are present. [[1]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1L2062-R2184) [[2]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1L2542-R2631) [[3]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1R2760-R2788) [[4]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1L2699-R2803) [[5]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1R2882-R2910) [[6]](diffhunk://#diff-dde2297345718ec449a14e7dff91b7bb2342b008ecc071f562233646d71144a1L2792-R2925)
* Modified the batch fetch function signatures and logic to propagate LOB column information and ensure correct handling during bulk fetches.

**Testing Improvements:**

* Replaced the old large-binary test with a new, comprehensive test, `test_varbinarymax_insert_fetch`, that verifies insertion and retrieval of empty, small, and large VARBINARY(MAX) values (including edge cases around the 8000-byte threshold) using `fetchone`, `fetchall`, and `fetchmany`.

These changes ensure the driver can reliably handle large binary and text columns in all fetch scenarios, improving correctness and robustness for users working with LOB data.
1 parent 61ed764 commit 7d06c96
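The `FetchLobColumnData` helper referenced in the summary is not part of the hunks shown below. As context, here is a minimal sketch of the standard ODBC chunked-read pattern such a helper typically implements: `SQLGetData` may be called repeatedly for one column, returning `SQL_SUCCESS_WITH_INFO` while more data remains and `SQL_SUCCESS` on the final chunk. The name `FetchLobChunked`, the 8 KB chunk size, and the `std::vector` return type are illustrative assumptions, not the driver's actual API.

```cpp
#include <sql.h>
#include <sqlext.h>
#include <vector>

// Illustrative sketch only: drain one LOB column by calling SQLGetData in a
// loop. Each call fills the buffer and returns SQL_SUCCESS_WITH_INFO while
// more data remains; the final chunk returns SQL_SUCCESS.
static std::vector<char> FetchLobChunked(SQLHSTMT hStmt, SQLUSMALLINT col,
                                         bool* isNull) {
    constexpr SQLLEN kChunkSize = 8192;  // assumed chunk size
    std::vector<char> result;
    std::vector<char> chunk(kChunkSize);
    *isNull = false;
    while (true) {
        SQLLEN indicator = 0;
        SQLRETURN rc = SQLGetData(hStmt, col, SQL_C_BINARY, chunk.data(),
                                  kChunkSize, &indicator);
        if (rc == SQL_NO_DATA) break;      // column fully consumed
        if (!SQL_SUCCEEDED(rc)) break;     // real code should raise diagnostics
        if (indicator == SQL_NULL_DATA) {  // column value is NULL
            *isNull = true;
            break;
        }
        // For SQL_C_BINARY, `indicator` reports the bytes remaining before
        // this call, or SQL_NO_TOTAL if the driver cannot tell; either way
        // this call wrote min(indicator, kChunkSize) bytes into the buffer.
        SQLLEN written = (indicator == SQL_NO_TOTAL || indicator > kChunkSize)
                             ? kChunkSize
                             : indicator;
        result.insert(result.end(), chunk.begin(), chunk.begin() + written);
        if (rc == SQL_SUCCESS) break;      // that was the last chunk
    }
    return result;
}
```

The driver's actual helper additionally distinguishes binary from narrow/wide character data (note the trailing boolean arguments in the `FetchLobColumnData` calls in the diff below) and converts the assembled buffer to the appropriate Python object.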

File tree

2 files changed: +121 additions, −55 deletions


mssql_python/pybind/ddbc_bindings.cpp

Lines changed: 29 additions & 34 deletions
```diff
@@ -2155,45 +2155,40 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p
         case SQL_BINARY:
         case SQL_VARBINARY:
         case SQL_LONGVARBINARY: {
-            // TODO: revisit
-            HandleZeroColumnSizeAtFetch(columnSize);
-            std::unique_ptr<SQLCHAR[]> dataBuffer(new SQLCHAR[columnSize]);
-            SQLLEN dataLen;
-            ret = SQLGetData_ptr(hStmt, i, SQL_C_BINARY, dataBuffer.get(), columnSize, &dataLen);
+            // Use streaming for large VARBINARY (columnSize unknown or > 8000)
+            if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > 8000) {
+                LOG("Streaming LOB for column {} (VARBINARY)", i);
+                row.append(FetchLobColumnData(hStmt, i, SQL_C_BINARY, false, true));
+            } else {
+                // Small VARBINARY, fetch directly
+                std::vector<SQLCHAR> dataBuffer(columnSize);
+                SQLLEN dataLen;
+                ret = SQLGetData_ptr(hStmt, i, SQL_C_BINARY, dataBuffer.data(), columnSize, &dataLen);
 
-            if (SQL_SUCCEEDED(ret)) {
-                // TODO: Refactor these if's across other switches to avoid code duplication
-                if (dataLen > 0) {
-                    if (static_cast<size_t>(dataLen) <= columnSize) {
-                        row.append(py::bytes(reinterpret_cast<const char*>(
-                            dataBuffer.get()), dataLen));
-                    } else {
-                        // In this case, buffer size is smaller, and data to be retrieved is longer
-                        // TODO: Revisit
+                if (SQL_SUCCEEDED(ret)) {
+                    if (dataLen > 0) {
+                        if (static_cast<size_t>(dataLen) <= columnSize) {
+                            row.append(py::bytes(reinterpret_cast<const char*>(dataBuffer.data()), dataLen));
+                        } else {
+                            LOG("VARBINARY column {} data truncated, using streaming LOB", i);
+                            row.append(FetchLobColumnData(hStmt, i, SQL_C_BINARY, false, true));
+                        }
+                    } else if (dataLen == SQL_NULL_DATA) {
+                        row.append(py::none());
+                    } else if (dataLen == 0) {
+                        row.append(py::bytes(""));
+                    } else {
                         std::ostringstream oss;
-                        oss << "Buffer length for fetch (" << columnSize << ") is smaller, & data "
-                            << "to be retrieved is longer (" << dataLen << "). ColumnID - "
-                            << i << ", datatype - " << dataType;
+                        oss << "Unexpected negative length (" << dataLen << ") returned by SQLGetData. ColumnID="
+                            << i << ", dataType=" << dataType << ", bufferSize=" << columnSize;
+                        LOG("Error: {}", oss.str());
                         ThrowStdException(oss.str());
                     }
-                } else if (dataLen == SQL_NULL_DATA) {
-                    row.append(py::none());
-                } else if (dataLen == 0) {
-                    // Empty bytes
-                    row.append(py::bytes(""));
-                } else if (dataLen < 0) {
-                    // This is unexpected
-                    LOG("SQLGetData returned an unexpected negative data length. "
-                        "Raising exception. Column ID - {}, Data Type - {}, Data Length - {}",
-                        i, dataType, dataLen);
-                    ThrowStdException("SQLGetData returned an unexpected negative data length");
+                } else {
+                    LOG("Error retrieving VARBINARY data for column {}. SQLGetData rc = {}", i, ret);
+                    row.append(py::none());
                 }
-            } else {
-                LOG("Error retrieving data for column - {}, data type - {}, SQLGetData return "
-                    "code - {}. Returning NULL value instead",
-                    i, dataType, ret);
-                row.append(py::none());
-            }
+            }
             break;
         }
         case SQL_TINYINT: {
```
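The LOB detection in `FetchBatchData`, `FetchMany_wrap`, and `FetchAll_wrap` mentioned in the summary is elided from this view (see the diffhunk links above). As a hedged illustration of the detection side, here is a sketch built on the standard `SQLDescribeCol` metadata call; the function name `ResultSetHasLobColumns` and the exact type/size checks are assumptions that merely mirror the 8000-byte threshold used in the hunk above.

```cpp
#include <sql.h>
#include <sqlext.h>

// Hypothetical helper: scan result-set metadata and report whether any
// column needs the streamed, row-by-row LOB path instead of bulk fetching.
static bool ResultSetHasLobColumns(SQLHSTMT hStmt, SQLSMALLINT numCols) {
    for (SQLUSMALLINT col = 1; col <= numCols; ++col) {
        SQLCHAR name[256];
        SQLSMALLINT nameLen = 0, dataType = 0, decimalDigits = 0, nullable = 0;
        SQLULEN columnSize = 0;
        if (!SQL_SUCCEEDED(SQLDescribeCol(hStmt, col, name, sizeof(name),
                                          &nameLen, &dataType, &columnSize,
                                          &decimalDigits, &nullable))) {
            return true;  // be conservative if metadata is unavailable
        }
        switch (dataType) {
            // "Long" types are LOBs by definition.
            case SQL_LONGVARBINARY:
            case SQL_LONGVARCHAR:
            case SQL_WLONGVARCHAR:
                return true;
            // MAX types often report size 0 (unknown) or a size beyond the
            // 8000-byte inline limit used elsewhere in this file.
            case SQL_VARBINARY:
            case SQL_VARCHAR:
            case SQL_WVARCHAR:
                if (columnSize == 0 || columnSize > 8000) return true;
                break;
            default:
                break;
        }
    }
    return false;
}
```

A batch-fetch routine can run such a check once after statement execution and, if it reports a LOB, fall back to `SQLFetch` plus per-column `SQLGetData` so that large columns can be drained in order, which is the fallback behavior the commit message describes.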

tests/test_004_cursor.py

Lines changed: 92 additions & 21 deletions
```diff
@@ -6113,34 +6113,71 @@ def test_binary_data_over_8000_bytes(cursor, db_connection):
         drop_table_if_exists(cursor, "#pytest_small_binary")
         db_connection.commit()
 
-def test_binary_data_large(cursor, db_connection):
-    """Test insertion of binary data larger than 8000 bytes with streaming support."""
+def test_varbinarymax_insert_fetch(cursor, db_connection):
+    """Test for VARBINARY(MAX) insert and fetch (streaming support) using execute per row"""
     try:
-        drop_table_if_exists(cursor, "#pytest_large_binary")
+        # Create test table
+        drop_table_if_exists(cursor, "#pytest_varbinarymax")
         cursor.execute("""
-            CREATE TABLE #pytest_large_binary (
-                id INT PRIMARY KEY,
-                large_binary VARBINARY(MAX)
+            CREATE TABLE #pytest_varbinarymax (
+                id INT,
+                binary_data VARBINARY(MAX)
             )
         """)
-
-        # Large binary data > 8000 bytes
-        large_data = b'A' * 10000  # 10 KB
-        cursor.execute("INSERT INTO #pytest_large_binary (id, large_binary) VALUES (?, ?)", (1, large_data))
+
+        # Prepare test data
+        test_data = [
+            (2, b''),            # Empty bytes
+            (3, b'1234567890'),  # Small binary
+            (4, b'A' * 9000),    # Large binary > 8000 (streaming)
+            (5, b'B' * 20000),   # Large binary > 8000 (streaming)
+            (6, b'C' * 8000),    # Edge case: exactly 8000 bytes
+            (7, b'D' * 8001),    # Edge case: just over 8000 bytes
+        ]
+
+        # Insert each row using execute
+        for row_id, binary in test_data:
+            cursor.execute("INSERT INTO #pytest_varbinarymax VALUES (?, ?)", (row_id, binary))
         db_connection.commit()
-        print("Inserted large binary data (>8000 bytes) successfully.")
-
-        # commented out for now
-        # cursor.execute("SELECT large_binary FROM #pytest_large_binary WHERE id=1")
-        # result = cursor.fetchone()
-        # assert result[0] == large_data, f"Large binary data mismatch, got {len(result[0])} bytes"
-
-        # print("Large binary data (>8000 bytes) inserted and verified successfully.")
-
+
+        # ---------- FETCHONE TEST (multi-column) ----------
+        cursor.execute("SELECT id, binary_data FROM #pytest_varbinarymax ORDER BY id")
+        rows = []
+        while True:
+            row = cursor.fetchone()
+            if row is None:
+                break
+            rows.append(row)
+
+        assert len(rows) == len(test_data), f"Expected {len(test_data)} rows, got {len(rows)}"
+
+        # Validate each row
+        for i, (expected_id, expected_data) in enumerate(test_data):
+            fetched_id, fetched_data = rows[i]
+            assert fetched_id == expected_id, f"Row {i+1} ID mismatch: expected {expected_id}, got {fetched_id}"
+            assert isinstance(fetched_data, bytes), f"Row {i+1} expected bytes, got {type(fetched_data)}"
+            assert fetched_data == expected_data, f"Row {i+1} data mismatch"
+
+        # ---------- FETCHALL TEST ----------
+        cursor.execute("SELECT id, binary_data FROM #pytest_varbinarymax ORDER BY id")
+        all_rows = cursor.fetchall()
+        assert len(all_rows) == len(test_data)
+
+        # ---------- FETCHMANY TEST ----------
+        cursor.execute("SELECT id, binary_data FROM #pytest_varbinarymax ORDER BY id")
+        batch_size = 2
+        batches = []
+        while True:
+            batch = cursor.fetchmany(batch_size)
+            if not batch:
+                break
+            batches.extend(batch)
+        assert len(batches) == len(test_data)
+
     except Exception as e:
-        pytest.fail(f"Large binary data insertion test failed: {e}")
+        pytest.fail(f"VARBINARY(MAX) insert/fetch test failed: {e}")
     finally:
-        drop_table_if_exists(cursor, "#pytest_large_binary")
+        drop_table_if_exists(cursor, "#pytest_varbinarymax")
         db_connection.commit()
 
 
@@ -6303,6 +6340,40 @@ def test_binary_mostly_small_one_large(cursor, db_connection):
     drop_table_if_exists(cursor, "#pytest_mixed_size_binary")
     db_connection.commit()
 
+def test_varbinarymax_insert_fetch_null(cursor, db_connection):
+    """Test insertion and retrieval of NULL value in VARBINARY(MAX) column."""
+    try:
+        drop_table_if_exists(cursor, "#pytest_varbinarymax_null")
+        cursor.execute("""
+            CREATE TABLE #pytest_varbinarymax_null (
+                id INT,
+                binary_data VARBINARY(MAX)
+            )
+        """)
+
+        # Insert a row with NULL for binary_data
+        cursor.execute(
+            "INSERT INTO #pytest_varbinarymax_null VALUES (?, CAST(NULL AS VARBINARY(MAX)))",
+            (1,)
+        )
+        db_connection.commit()
+
+        # Fetch the row
+        cursor.execute("SELECT id, binary_data FROM #pytest_varbinarymax_null")
+        row = cursor.fetchone()
+
+        assert row is not None, "No row fetched"
+        fetched_id, fetched_data = row
+        assert fetched_id == 1, "ID mismatch"
+        assert fetched_data is None, "Expected NULL for binary_data"
+
+    except Exception as e:
+        pytest.fail(f"VARBINARY(MAX) NULL insert/fetch test failed: {e}")
+
+    finally:
+        drop_table_if_exists(cursor, "#pytest_varbinarymax_null")
+        db_connection.commit()
+
 def test_only_null_and_empty_binary(cursor, db_connection):
     """Test table with only NULL and empty binary values to ensure fallback doesn't produce size=0"""
     try:
```