Use the spark-pinecone connector to efficiently create, ingest, and update vector embeddings at scale with Databricks and Pinecone.

Install the Spark-Pinecone connector

The following steps apply to the Databricks platform; for Databricks on AWS or GCP / Azure, refer to the Databricks integration page for the platform-specific library source.
  1. Install the Spark-Pinecone connector as a library.
  2. Configure the library as follows:
    1. Select File path/S3 as the Library Source.
    2. Enter the S3 URI for the Pinecone assembly JAR file:
      s3://pinecone-jars/1.1.0/spark-pinecone-uberjar.jar  
      Databricks platform users must use the Pinecone assembly JAR listed above to ensure that the proper dependencies are installed.
    3. Click Install. A scripted alternative using the Databricks Libraries API is sketched after these steps.
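If you prefer to script the installation rather than use the UI, the same JAR can be attached to a cluster through the Databricks Libraries API. The snippet below is a minimal sketch rather than part of the Pinecone or Databricks docs; the workspace URL, personal access token, and cluster ID are placeholders you must supply.

# Sketch: attach the Spark-Pinecone uber JAR to an existing cluster via the
# Databricks Libraries API (POST /api/2.0/libraries/install).
import requests

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder workspace URL
DATABRICKS_TOKEN = "DATABRICKS_PERSONAL_ACCESS_TOKEN"              # placeholder personal access token
CLUSTER_ID = "YOUR_CLUSTER_ID"                                     # placeholder cluster ID

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json={
        "cluster_id": CLUSTER_ID,
        "libraries": [
            {"jar": "s3://pinecone-jars/1.1.0/spark-pinecone-uberjar.jar"}
        ],
    },
)
response.raise_for_status()  # raises if the install request was rejected

The library installs asynchronously; you can confirm its status on the cluster's Libraries tab.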

Batch upsert

To batch upsert embeddings to Pinecone:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Read the file and apply the schema
df = spark.read \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("src/test/resources/sample.jsonl")

# Show if the read was successful
df.show()

# Write the DataFrame to Pinecone in batches
df.write \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .mode("append") \
    .save()
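Each record read from the source file must match COMMON_SCHEMA. As an illustration only (the actual sample.jsonl shipped with the connector may differ), a single input record could look like the following; note that metadata is a JSON-encoded string, and namespace and sparse_values are optional:

{
  "id": "v1",
  "namespace": "example-namespace",
  "values": [0.1, 0.2, 0.3],
  "metadata": "{\"genre\": \"drama\", \"year\": 2020}",
  "sparse_values": {
    "indices": [10, 45],
    "values": [0.5, 0.25]
  }
}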
For a guide on how to set up batch upserts, refer to the Databricks integration page.

Stream upsert

To stream upsert embeddings to Pinecone:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your API key and index name
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read the stream of JSON files, applying the schema from the input directory
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

upsert.awaitTermination()
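Once either job has run, a quick way to spot-check the results is to read the index statistics with the Pinecone Python SDK (installed separately with pip install pinecone). The snippet below is a sketch that reuses the same API key and index name placeholders as above:

# Sketch: verify the upserts by checking vector counts in the index.
from pinecone import Pinecone

pc = Pinecone(api_key="PINECONE_API_KEY")
index = pc.Index("PINECONE_INDEX_NAME")

# Shows the total vector count and the count per namespace.
print(index.describe_index_stats())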

Learn more