Skip to content
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ All warnings for upcoming changes in pandas will have the base class :class:`pan

Other enhancements
^^^^^^^^^^^^^^^^^^
- :func:`DataFrame.to_sql` now accepts a ``hints`` parameter to pass database-specific query hints for optimizing insert performance. The hints are specified as a dictionary mapping dialect names to hint strings (e.g., ``{'oracle': '/*+ APPEND PARALLEL(4) */', 'mysql': 'DELAYED'}``). Users are responsible for providing correctly formatted hint strings for their target database (:issue:`61370`)
- :func:`pandas.merge` propagates the ``attrs`` attribute to the result if all
inputs have identical ``attrs``, as has so far already been the case for
:func:`pandas.concat`.
Expand Down
17 changes: 17 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2798,6 +2798,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
) -> int | None:
"""
Write records stored in a DataFrame to a SQL database.
Expand Down Expand Up @@ -2861,6 +2862,21 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
hints : dict[str, str], optional
Dictionary of SQL hints to optimize insertion performance, keyed by
database dialect name (e.g., 'oracle', 'mysql', 'postgresql', 'mssql').
Each value should be a complete hint string formatted exactly as required
by the target database. The user is responsible for providing correctly
formatted hint strings.

Examples: ``{'oracle': '/*+ APPEND PARALLEL(4) */', 'mysql': 'DELAYED'}``

.. note::
- Hints are database-specific and ignored for unsupported dialects.
- SQLite raises a ``UserWarning`` (hints not supported).
- ADBC connections raise ``NotImplementedError``.

.. versionadded:: 3.0.0

Returns
-------
Expand Down Expand Up @@ -3044,6 +3060,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
hints=hints,
)

@final
Expand Down
108 changes: 96 additions & 12 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
datetime,
time,
)
from functools import partial
import re
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -235,6 +234,18 @@ def _wrap_result_adbc(
return df


def _process_sql_hints(hints: dict[str, str] | None, dialect_name: str) -> str | None:
if hints is None:
return None

dialect_name = dialect_name.lower()
for key, value in hints.items():
if key.lower() == dialect_name:
return value

return None


# -----------------------------------------------------------------------------
# -- Read and write to DataFrames

Expand Down Expand Up @@ -753,6 +764,7 @@ def to_sql(
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
engine: str = "auto",
hints: dict[str, str] | None = None,
**engine_kwargs,
) -> int | None:
"""
Expand Down Expand Up @@ -813,6 +825,23 @@ def to_sql(

.. versionadded:: 1.3.0

hints : dict[str, str], optional
SQL hints to optimize insertion performance, keyed by database dialect name.
Each hint should be a complete string formatted exactly as required by the
target database. The user is responsible for constructing dialect-specific
syntax.

Examples: ``{'oracle': '/*+ APPEND PARALLEL(4) */'}``
``{'mysql': 'DELAYED'}``
``{'mssql': 'WITH (TABLOCK)'}``

.. note::
- Hints are database-specific and will be ignored for unsupported dialects
- SQLite will raise a UserWarning (hints not supported)
- ADBC connections will raise NotImplementedError

.. versionadded:: 3.0.0

**engine_kwargs
Any additional kwargs are passed to the engine.

Expand Down Expand Up @@ -855,6 +884,7 @@ def to_sql(
dtype=dtype,
method=method,
engine=engine,
hints=hints,
**engine_kwargs,
)

Expand Down Expand Up @@ -1004,7 +1034,13 @@ def create(self) -> None:
else:
self._execute_create()

def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
def _execute_insert(
self,
conn,
keys: list[str],
data_iter,
hint_str: str | None = None,
) -> int:
"""
Execute SQL statement inserting data

Expand All @@ -1016,11 +1052,23 @@ def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
data_iter : generator of list
Each item contains a list of values to be inserted
"""
data = [dict(zip(keys, row, strict=True)) for row in data_iter]
result = self.pd_sql.execute(self.table.insert(), data)
data = [dict(zip(keys, row, strict=False)) for row in data_iter]

if hint_str:
stmt = self.table.insert().prefix_with(hint_str)
else:
stmt = self.table.insert()

result = self.pd_sql.execute(stmt, data)
return result.rowcount

def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
def _execute_insert_multi(
self,
conn,
keys: list[str],
data_iter,
hint_str: str | None = None,
) -> int:
"""
Alternative to _execute_insert for DBs support multi-value INSERT.

Expand All @@ -1029,11 +1077,15 @@ def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
but performance degrades quickly with increase of columns.

"""

from sqlalchemy import insert

data = [dict(zip(keys, row, strict=True)) for row in data_iter]
stmt = insert(self.table).values(data)
data = [dict(zip(keys, row, strict=False)) for row in data_iter]

if hint_str:
stmt = insert(self.table).values(data).prefix_with(hint_str)
else:
stmt = insert(self.table).values(data)

result = self.pd_sql.execute(stmt)
return result.rowcount

Expand Down Expand Up @@ -1090,14 +1142,20 @@ def insert(
self,
chunksize: int | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
) -> int | None:
# set insert method
if method is None:
exec_insert = self._execute_insert
elif method == "multi":
exec_insert = self._execute_insert_multi
elif callable(method):
exec_insert = partial(method, self)

def callable_wrapper(conn, keys, data_iter, hint_str=None):
return method(self, conn, keys, data_iter)

exec_insert = callable_wrapper
else:
raise ValueError(f"Invalid parameter `method`: {method}")

Expand All @@ -1114,6 +1172,9 @@ def insert(
raise ValueError("chunksize argument should be non-zero")

chunks = (nrows // chunksize) + 1

hint_str = _process_sql_hints(hints, dialect_name) if dialect_name else None

total_inserted = None
with self.pd_sql.run_transaction() as conn:
for i in range(chunks):
Expand All @@ -1125,7 +1186,7 @@ def insert(
chunk_iter = zip(
*(arr[start_i:end_i] for arr in data_list), strict=True
)
num_inserted = exec_insert(conn, keys, chunk_iter)
num_inserted = exec_insert(conn, keys, chunk_iter, hint_str)
# GH 46891
if num_inserted is not None:
if total_inserted is None:
Expand Down Expand Up @@ -1509,6 +1570,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1545,6 +1607,8 @@ def insert_records(
schema=None,
chunksize: int | None = None,
method=None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
**engine_kwargs,
) -> int | None:
"""
Expand All @@ -1569,6 +1633,8 @@ def insert_records(
schema=None,
chunksize: int | None = None,
method=None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
**engine_kwargs,
) -> int | None:
from sqlalchemy import exc
Expand Down Expand Up @@ -1980,6 +2046,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2053,6 +2120,8 @@ def to_sql(
schema=schema,
chunksize=chunksize,
method=method,
hints=hints,
dialect_name=self.con.dialect.name,
**engine_kwargs,
)

Expand Down Expand Up @@ -2344,6 +2413,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2394,6 +2464,8 @@ def to_sql(
raise NotImplementedError(
"engine != 'auto' not implemented for ADBC drivers"
)
if hints:
raise NotImplementedError("'hints' is not implemented for ADBC drivers")

if schema:
table_name = f"{schema}.{name}"
Expand Down Expand Up @@ -2575,7 +2647,9 @@ def insert_statement(self, *, num_rows: int) -> str:
)
return insert_statement

def _execute_insert(self, conn, keys, data_iter) -> int:
def _execute_insert(
self, conn, keys: list[str], data_iter, hint_str: str | None = None
) -> int:
from sqlite3 import Error

data_list = list(data_iter)
Expand All @@ -2585,7 +2659,9 @@ def _execute_insert(self, conn, keys, data_iter) -> int:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
def _execute_insert_multi(
self, conn, keys: list[str], data_iter, hint_str: str | None = None
) -> int:
data_list = list(data_iter)
flattened_data = [x for row in data_list for x in row]
conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
Expand Down Expand Up @@ -2821,6 +2897,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2863,6 +2940,13 @@ def to_sql(
Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
"""
if hints:
warnings.warn(
"SQL hints are not supported for SQLite and will be ignored.",
UserWarning,
stacklevel=find_stack_level(),
)

if dtype:
if not is_dict_like(dtype):
# error: Value expression in dictionary comprehension has incompatible
Expand Down
Loading
Loading