Important
This feature is in Public Preview in the following regions: westus, westus2, eastus, eastus2, centralus, southcentralus, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, southeastasia, uksouth.
This page contains code examples that show you how to access your Lakebase database instance through Azure Databricks notebooks and run queries using Python and Scala.
The examples cover different connection strategies to suit different use cases:
- Single connection: Used for simple scripts where a single database connection is opened, used, and closed.
- Connection pool: Used for high-concurrency workloads, where a pool of reusable connections is maintained.
- Rotating M2M OAuth token: Uses short-lived, automatically refreshed OAuth tokens for authentication.
The following examples programmatically generate secure credentials. Avoid putting credentials directly in a notebook. Databricks recommends using one of the following secure methods:
- Store Postgres passwords in Azure Databricks secrets.
- Generate OAuth tokens using M2M OAuth.
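As a minimal sketch of the first option, the helper below reads a Postgres password from a secret scope and builds keyword arguments in the shape that `psycopg2.connect` accepts. The function name `connection_params_from_secret` and the scope and key placeholders are hypothetical. `dbutils` is the utility object available in Databricks notebooks; it is passed in explicitly so the helper does not depend on notebook globals.

```python
def connection_params_from_secret(dbutils, host, user, scope, key):
    """Build connection keyword arguments with the password read from a secret scope.

    Hypothetical helper: `scope` and `key` must name a secret you have created.
    """
    # dbutils.secrets.get returns the secret value as a string
    password = dbutils.secrets.get(scope=scope, key=key)
    return {
        "host": host,
        "dbname": "databricks_postgres",
        "user": user,
        "password": password,
        "sslmode": "require",
    }


# Possible usage in a notebook (placeholders are assumptions):
# params = connection_params_from_secret(
#     dbutils, instance.read_write_dns, "<YOUR USER>", "<YOUR SCOPE>", "<YOUR KEY>"
# )
# conn = psycopg2.connect(**params)
```

Because the password never appears as a literal in the notebook, the notebook source can be shared without exposing the credential.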
Before you begin
Ensure you meet the following requirements before accessing your database instance:
- You have a corresponding Postgres role to log in to the database instance. See Manage Postgres roles.
- Your Postgres role is granted the necessary permissions to access the database, schema, or table.
- You can authenticate to the database instance. If you must manually obtain an OAuth token for your database instance, see Authenticate to a database instance.
Python
You can use the Azure Databricks Python SDK to obtain an OAuth token for a given database instance.
Connect to your database instance from an Azure Databricks notebook using one of the following Python libraries:
- psycopg2
- psycopg3
- SQLAlchemy
psycopg2
The code examples demonstrate a single connection and the use of a connection pool. For more on how to obtain the database instance and credentials programmatically, see how to obtain an OAuth token using the Python SDK.
Single connection
import uuid

import psycopg2
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host=instance.read_write_dns,
    dbname="databricks_postgres",
    user="<YOUR USER>",
    password=cred.token,
    sslmode="require",
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()
Connection pool
import uuid

import psycopg2
from psycopg2 import pool
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,   # Minimum number of connections in the pool
    maxconn=10,  # Maximum number of connections in the pool
    user="<YOUR USER>",
    password=cred.token,
    host=instance.read_write_dns,
    port="5432",
    database="databricks_postgres",
)
if connection_pool:
    print("Connection pool created successfully")

def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")

def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)
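The borrow-and-return pattern in `executeWithPgConnection` can also be expressed as a context manager. The following is an illustrative sketch, not part of the original example: `pooled_connection` is a hypothetical helper name, and it relies only on the `getconn()` and `putconn()` pool methods used above.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from `pool` and always return it, even if the body raises."""
    connection = pool.getconn()
    try:
        yield connection
    finally:
        # Hand the connection back to the pool so other callers can reuse it
        pool.putconn(connection)


# Possible usage with the pool created above:
# with pooled_connection(connection_pool) as connection:
#     printVersion(connection)
```

The `finally` clause plays the same role as in the function-based version: the connection goes back to the pool whether or not the query succeeds.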
psycopg3
The code example demonstrates the use of a connection pool with a rotating M2M OAuth token. It calls generate_database_credential() each time the pool opens a new connection. For more on how to obtain the database instance and credentials programmatically, see how to obtain an OAuth token using the Python SDK.
%pip install "psycopg[binary,pool]"
import uuid

import psycopg
from psycopg_pool import ConnectionPool
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

class CustomConnection(psycopg.Connection):
    @classmethod
    def connect(cls, conninfo="", **kwargs):
        # Generate a fresh OAuth token and pass it as the password
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs["password"] = cred.token
        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True,
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)
SQLAlchemy
The code examples demonstrate a single connection and the use of a connection pool with a rotating M2M OAuth token. For more on how to obtain the database instance and credentials programmatically, see how to obtain an OAuth token using the Python SDK.
Single connection
%pip install sqlalchemy==1.4 psycopg2-binary
import uuid

from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")
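The single-connection example interpolates the user name and token directly into the connection URL. If either value contains characters that are reserved in URLs, such as `@` or `/`, it may be safer to percent-encode them first. The sketch below uses `urllib.parse.quote_plus` from the Python standard library; `build_postgres_url` is a hypothetical helper name, and the default database name is taken from the examples on this page.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port=5432, database="databricks_postgres"):
    """Return a SQLAlchemy-style PostgreSQL URL with the user and password percent-encoded."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?sslmode=require"
    )


# Possible usage with the values from the example above:
# connection_pool = create_engine(build_postgres_url(user, cred.token, host))
```

Encoding only the user name and password leaves the host, port, and database segments exactly as in the original URL.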
Connection pool & rotating M2M OAuth
%pip install sqlalchemy==1.4 psycopg2-binary
import time
import uuid

from sqlalchemy import create_engine, text, event
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# SQLAlchemy setup plus a function that refreshes the OAuth token
# used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")

postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh
    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()
    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")
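The time-based refresh inside the `do_connect` listener can be factored into a small reusable cache, which avoids module-level globals. The following is a sketch under stated assumptions: `TokenCache`, `fetch_token`, and `clock` are hypothetical names, and the 900-second default mirrors the 15-minute interval used in the example.

```python
import time


class TokenCache:
    """Cache a token and fetch a new one once it is older than `ttl_seconds`."""

    def __init__(self, fetch_token, ttl_seconds=900, clock=time.monotonic):
        self._fetch_token = fetch_token  # callable that returns a fresh token string
        self._ttl_seconds = ttl_seconds
        self._clock = clock              # injectable so the cache can be tested
        self._token = None
        self._fetched_at = 0.0

    def get(self):
        now = self._clock()
        # Refresh on first use, or when the cached token has outlived the TTL
        if self._token is None or now - self._fetched_at > self._ttl_seconds:
            self._token = self._fetch_token()
            self._fetched_at = now
        return self._token


# Possible usage with the SDK call from the example above:
# cache = TokenCache(lambda: w.database.generate_database_credential(
#     request_id=str(uuid.uuid4()), instance_names=[instance_name]).token)
# Inside the do_connect listener: cparams["password"] = cache.get()
```

Using a monotonic clock rather than wall-clock time keeps the refresh interval stable if the system clock is adjusted.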
Scala
The code examples show how to programmatically obtain the database instance and credentials, and how to connect to a database instance using a single connection or a connection pool.
Step 1: Use the Azure Databricks Java SDK to obtain an OAuth token
For details on how to obtain the database instance and credentials programmatically, see how to obtain an OAuth token using the Java SDK.
Step 2: Connect to a database instance
Single connection
import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

// `instance` and `cred` are the database instance and credential obtained in Step 1
val user = "<YOUR USER>"
// Use the instance's read-write DNS name as the host, matching
// instance.read_write_dns in the Python examples
val host = instance.getReadWriteDns()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = s"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
  val version = resultSet.getString(1)
  println(s"PostgreSQL version: $version")
}

// Close the connection when done
connection.close()
Connection pool
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
// `instance` and `cred` are the database instance and credential obtained in Step 1.
// The host is interpolated into the URL; it uses the read-write DNS name,
// matching instance.read_write_dns in the Python examples.
val config = new HikariConfig()
config.setJdbcUrl(s"jdbc:postgresql://${instance.getReadWriteDns()}:5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()
    // Create a statement
    val statement = connection.createStatement()
    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")
    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection, which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()