FastMSSQL is an async Python library for Microsoft SQL Server (MSSQL), built in Rust. Unlike pyodbc or pymssql, it uses a native SQL Server client—no ODBC required—simplifying installation on Windows, macOS, and Linux. Great for data ingestion, bulk inserts, and large-scale query workloads.
- High performance: optimized for very high RPS and low overhead
- Rust core: memory‑safe and reliable, tuned Tokio runtime
- No ODBC: native SQL Server client, no external drivers needed
- Connection pooling: bb8‑based, smart defaults (default max_size=10, min_idle=2)
- Async first: clean async/await API with `async with` context managers
- Strong typing: fast conversions for common SQL Server types
- Thread‑safe: safe to use in concurrent apps
- Cross‑platform: Windows, macOS, Linux
- Batch operations: high-performance bulk inserts and batch query execution
Core methods for individual operations:
- `query()`: SELECT statements that return rows
- `execute()`: INSERT/UPDATE/DELETE/DDL statements that return the affected row count

```python
# Use query() for SELECT statements
result = await conn.query("SELECT * FROM users WHERE age > @P1", [25])
rows = result.rows()

# Use execute() for data modification
affected = await conn.execute("INSERT INTO users (name) VALUES (@P1)", ["John"])
```

Install with pip:

```
pip install fastmssql
```

Requirements:

- Python 3.9 to 3.14
- Microsoft SQL Server (any recent version)

A minimal example:

```python
import asyncio
from fastmssql import Connection

async def main():
    conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
    async with Connection(conn_str) as conn:
        # SELECT: use query() -> rows()
        result = await conn.query("SELECT @@VERSION as version")
        for row in result.rows():
            print(row['version'])

        # Pool statistics (tuple: connected, connections, idle, max_size, min_idle)
        connected, connections, idle, max_size, min_idle = await conn.pool_stats()
        print(f"Pool: connected={connected}, size={connections}/{max_size}, idle={idle}, min_idle={min_idle}")

asyncio.run(main())
```

When you do not use the `async with` context manager, FastMSSQL initializes the connection lazily: if you call `query()` or `execute()` on a new `Connection`, the underlying pool is created if it does not already exist.
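
The lazy behavior described above follows a common pattern: create the expensive resource on first use and reuse it afterwards. The sketch below is a toy model of that pattern only. `LazyResource` and its attributes are hypothetical names invented for illustration, and nothing here reflects FastMSSQL's actual internals.

```python
import asyncio

class LazyResource:
    """Toy stand-in for a connection whose pool is created on first use."""

    def __init__(self):
        self._pool = None
        self._lock = asyncio.Lock()   # guards against concurrent first calls
        self.pool_creations = 0       # counter used only for the demo

    async def _ensure_pool(self):
        async with self._lock:
            if self._pool is None:
                await asyncio.sleep(0)    # stands in for real connection setup
                self._pool = object()
                self.pool_creations += 1
        return self._pool

    async def query(self, sql):
        await self._ensure_pool()         # pool is created here if missing
        return f"ran: {sql}"

async def demo():
    res = LazyResource()
    before = res.pool_creations
    # Two concurrent first calls still create the pool only once.
    await asyncio.gather(res.query("SELECT 1"), res.query("SELECT 2"))
    return before, res.pool_creations

print(asyncio.run(demo()))  # (0, 1)
```

The lock matters because two coroutines could otherwise both observe an empty pool and each try to create one.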
For more control, you can explicitly connect and disconnect:

```python
import asyncio
from fastmssql import Connection

async def main():
    conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
    conn = Connection(conn_str)

    # Explicitly connect
    await conn.connect()
    assert await conn.is_connected()

    # Run queries
    result = await conn.query("SELECT 42 as answer")
    print(result.rows()[0]["answer"])  # -> 42

    # Explicitly disconnect
    await conn.disconnect()
    assert not await conn.is_connected()

asyncio.run(main())
```

You can connect either with a connection string or individual parameters.

- Connection string

```python
import asyncio
from fastmssql import Connection

async def main():
    conn_str = "Server=localhost;Database=master;User Id=myuser;Password=mypass"
    async with Connection(connection_string=conn_str) as conn:
        rows = (await conn.query("SELECT DB_NAME() as db")).rows()
        print(rows[0]['db'])

asyncio.run(main())
```

- Individual parameters

```python
import asyncio
from fastmssql import Connection

async def main():
    async with Connection(
        server="localhost",
        database="master",
        username="myuser",
        password="mypassword"
    ) as conn:
        rows = (await conn.query("SELECT SUSER_SID() as sid")).rows()
        print(rows[0]['sid'])

asyncio.run(main())
```

Note: Windows authentication (Trusted Connection) is currently not supported. Use SQL authentication (username/password).

Basic usage:

```python
import asyncio
from fastmssql import Connection

async def main():
    async with Connection("Server=.;Database=MyDB;User Id=sa;Password=StrongPwd;") as conn:
        # SELECT (returns rows)
        users = (await conn.query(
            "SELECT id, name, email FROM users WHERE active = 1"
        )).rows()
        for u in users:
            print(f"User {u['id']}: {u['name']} ({u['email']})")

        # INSERT / UPDATE / DELETE (returns affected row count)
        inserted = await conn.execute(
            "INSERT INTO users (name, email) VALUES (@P1, @P2)",
            ["Jane", "jane@example.com"],
        )
        print(f"Inserted {inserted} row(s)")

        updated = await conn.execute(
            "UPDATE users SET last_login = GETDATE() WHERE id = @P1",
            [123],
        )
        print(f"Updated {updated} row(s)")

asyncio.run(main())
```

Parameters use positional placeholders: `@P1`, `@P2`, ... Provide the values as a list in the same order.
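
Because the placeholders are numbered by position, the placeholder list for an INSERT can be generated from the column list. The helper below is a minimal sketch, not part of FastMSSQL; `build_insert` is a hypothetical name. It interpolates the table and column names directly into the SQL, so it assumes those identifiers come from trusted code and never from user input.

```python
def build_insert(table, columns):
    """Return an INSERT statement using @P1..@Pn positional placeholders."""
    if not columns:
        raise ValueError("at least one column is required")
    # One placeholder per column, numbered from 1 in column order.
    placeholders = ", ".join(f"@P{i}" for i in range(1, len(columns) + 1))
    column_list = ", ".join(columns)
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"

sql = build_insert("users", ["name", "email"])
print(sql)  # INSERT INTO users (name, email) VALUES (@P1, @P2)
```

The resulting string can be passed to `conn.execute(sql, ["Jane", "jane@example.com"])`, with the values listed in the same order as the columns.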
For high-throughput scenarios, use batch methods to reduce network round-trips:

```python
import asyncio
from fastmssql import Connection

async def main_fetching():
    # Replace with your actual connection string
    async with Connection("Server=.;Database=MyDB;User Id=sa;Password=StrongPwd;") as conn:
        # --- 1. Prepare Data for Demonstration ---
        columns = ["name", "email", "age"]
        data_rows = [
            ["Alice Johnson", "alice@example.com", 28],
            ["Bob Smith", "bob@example.com", 32],
            ["Carol Davis", "carol@example.com", 25],
            ["David Lee", "david@example.com", 35],
            ["Eva Green", "eva@example.com", 29]
        ]
        await conn.bulk_insert("users", columns, data_rows)

        # --- 2. Execute Query and Retrieve the Result Object ---
        print("\n--- Result Object Fetching (fetchone, fetchmany, fetchall) ---")
        # The Result object is returned after the awaitable query executes.
        result = await conn.query("SELECT name, age FROM users ORDER BY age DESC")

        # fetchone(): Retrieves the next single row synchronously.
        oldest_user = result.fetchone()
        if oldest_user:
            print(f"1. fetchone: Oldest user is {oldest_user['name']} (Age: {oldest_user['age']})")

        # fetchmany(2): Retrieves the next set of rows synchronously.
        next_two_users = result.fetchmany(2)
        print(f"2. fetchmany: Retrieved {len(next_two_users)} users: {[r['name'] for r in next_two_users]}.")

        # fetchall(): Retrieves all remaining rows synchronously.
        remaining_users = result.fetchall()
        print(f"3. fetchall: Retrieved all {len(remaining_users)} remaining users: {[r['name'] for r in remaining_users]}.")

        # Exhaustion Check: Subsequent calls return None/[]
        print(f"4. Exhaustion Check (fetchone): {result.fetchone()}")
        print(f"5. Exhaustion Check (fetchmany): {result.fetchmany(1)}")

        # --- 3. Batch Commands for multiple operations ---
        print("\n--- Batch Commands (execute_batch) ---")
        commands = [
            ("UPDATE users SET last_login = GETDATE() WHERE name = @P1", ["Alice Johnson"]),
            ("INSERT INTO user_logs (action, user_name) VALUES (@P1, @P2)", ["login", "Alice Johnson"])
        ]
        affected_counts = await conn.execute_batch(commands)
        print(f"Updated {affected_counts[0]} users, inserted {affected_counts[1]} logs")

asyncio.run(main_fetching())
```

Tune the pool to fit your workload. Constructor signature:

```python
from fastmssql import PoolConfig

# PoolConfig(max_size=10, min_idle=2, max_lifetime_secs=None, idle_timeout_secs=None, connection_timeout_secs=30)
config = PoolConfig(
    max_size=20,              # max connections in pool
    min_idle=5,               # keep at least this many idle
    max_lifetime_secs=3600,   # recycle connections after 1h
    idle_timeout_secs=600,    # close idle connections after 10m
    connection_timeout_secs=30
)
```

Presets:

```python
high = PoolConfig.high_throughput()          # ~ max_size=50, min_idle=15
low = PoolConfig.low_resource()              # ~ max_size=3, min_idle=1
dev = PoolConfig.development()               # ~ max_size=5, min_idle=1
maxp = PoolConfig.maximum_performance()      # ~ max_size=100, min_idle=30
ultra = PoolConfig.ultra_high_concurrency()  # ~ max_size=200, min_idle=50
```

Apply to a connection:

```python
async with Connection(conn_str, pool_config=high) as conn:
    rows = (await conn.query("SELECT 1 AS ok")).rows()
```

Default pool (if omitted): `max_size=10`, `min_idle=2`.
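
One way to choose among the presets is to pick the smallest one whose `max_size` covers the number of queries you expect to run at the same time. The sketch below encodes that rule of thumb. The sizing rule is an assumption made for illustration, not a recommendation from the library; `pick_preset` and `PRESET_SIZES` are hypothetical names, and the sizes are the approximate values quoted in the preset comments above.

```python
# Approximate (max_size, min_idle) per preset, taken from the preset comments.
PRESET_SIZES = {
    "low_resource": (3, 1),
    "development": (5, 1),
    "high_throughput": (50, 15),
    "maximum_performance": (100, 30),
    "ultra_high_concurrency": (200, 50),
}

def pick_preset(expected_concurrency):
    """Return the name of the smallest preset whose max_size covers the load."""
    by_size = sorted(PRESET_SIZES.items(), key=lambda item: item[1][0])
    for name, (max_size, _min_idle) in by_size:
        if max_size >= expected_concurrency:
            return name
    return by_size[-1][0]  # nothing is large enough: fall back to the largest

print(pick_preset(4))    # development
print(pick_preset(40))   # high_throughput
print(pick_preset(500))  # ultra_high_concurrency
```

Assuming the presets are exposed as the class methods shown earlier, `getattr(PoolConfig, pick_preset(40))()` would build the matching configuration.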

Configure encryption with `SslConfig`:

```python
from fastmssql import SslConfig, EncryptionLevel, Connection

ssl = SslConfig(
    encryption_level=EncryptionLevel.REQUIRED,  # or "Required"
    trust_server_certificate=False,
)

async with Connection(conn_str, ssl_config=ssl) as conn:
    ...
```

Helpers:

- `SslConfig.development()`: encrypt, trust all certificates (dev only)
- `SslConfig.with_ca_certificate(path)`: use a custom CA
- `SslConfig.login_only()` / `SslConfig.disabled()`: legacy modes

For maximum throughput in highly concurrent scenarios, use multiple Connection instances (each with its own pool) and batch your work:

```python
import asyncio
from fastmssql import Connection, PoolConfig

async def worker(conn_str, cfg):
    async with Connection(conn_str, pool_config=cfg) as conn:
        for _ in range(1000):
            _ = (await conn.query("SELECT 1 as v")).rows()

async def main():
    conn_str = "Server=.;Database=master;User Id=sa;Password=StrongPwd;"
    cfg = PoolConfig.high_throughput()
    await asyncio.gather(*[asyncio.create_task(worker(conn_str, cfg)) for _ in range(32)])

asyncio.run(main())
```

- Examples: `examples/comprehensive_example.py`
- Benchmarks: `benchmarks/` (MIT licensed)

- Import/build: ensure the Rust toolchain and `maturin` are installed if building from source
- Connection: verify the connection string; Windows authentication is not supported
- Timeouts: increase the pool size or tune `connection_timeout_secs`
- Parameters: use `@P1, @P2, ...` and pass a list of values

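
When checking a connection string, a quick local sanity check can catch a missing key before any network call is made. The sketch below is a naive parser written for illustration, not part of FastMSSQL. `parse_connection_string`, `missing_keys`, and `REQUIRED_KEYS` are hypothetical names. It only knows the four keys used in this README's examples, and it does not handle quoted values, semicolons inside passwords, or key aliases.

```python
# Keys used by the connection strings in this README (compared case-insensitively).
REQUIRED_KEYS = ("server", "database", "user id", "password")

def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict with lower-cased keys."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment.strip():
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"segment without '=': {segment!r}")
        parts[key.strip().lower()] = value.strip()
    return parts

def missing_keys(conn_str):
    """Return the required keys that the connection string does not define."""
    parts = parse_connection_string(conn_str)
    return [key for key in REQUIRED_KEYS if key not in parts]

print(missing_keys("Server=.;Database=MyDB;User Id=sa;Password=StrongPwd;"))  # []
print(missing_keys("Server=localhost;Database=master"))  # ['user id', 'password']
```

An empty list means all four keys are present. It says nothing about whether the server is reachable or the credentials are valid.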
Contributions are welcome. Please open an issue or PR.
FastMSSQL is dual‑licensed:
- GPL‑3.0 (for open source projects)
- Commercial (for proprietary use). Contact: riverb514@gmail.com
See the LICENSE file for details.
`examples/` and `benchmarks/` are under the MIT License. See the files in `licenses/`.
Built on excellent open source projects: Tiberius, PyO3, pyo3‑asyncio, bb8, tokio, serde, pytest, maturin, and more. See licenses/NOTICE.txt for the full list. The full texts of Apache‑2.0 and MIT are in licenses/.
Thanks to the maintainers of Tiberius, PyO3, pyo3‑asyncio, Tokio, pytest, maturin, and the broader open source community.