from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, StringType, LongType

# Your Pinecone API key, index name, and source tag (replace the placeholders)
api_key = "PINECONE_API_KEY"
index_name = "PINECONE_INDEX_NAME"
source_tag = "PINECONE_SOURCE_TAG"

# Schema every upserted record must follow: a dense vector with optional
# namespace, JSON-string metadata, and optional sparse values
COMMON_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("namespace", StringType(), True),
    StructField("values", ArrayType(FloatType(), False), False),
    StructField("metadata", StringType(), True),
    StructField("sparse_values", StructType([
        StructField("indices", ArrayType(LongType(), False), False),
        StructField("values", ArrayType(FloatType(), False), False)
    ]), True)
])

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("StreamUpsertExample") \
    .config("spark.sql.shuffle.partitions", 3) \
    .master("local") \
    .getOrCreate()

# Read the stream of JSON files from the input directory, applying the schema
lines = spark.readStream \
    .option("multiLine", True) \
    .option("mode", "PERMISSIVE") \
    .schema(COMMON_SCHEMA) \
    .json("path/to/input/directory/")

# Write the stream to Pinecone using the defined options
upsert = lines.writeStream \
    .format("io.pinecone.spark.pinecone.Pinecone") \
    .option("pinecone.apiKey", api_key) \
    .option("pinecone.indexName", index_name) \
    .option("pinecone.sourceTag", source_tag) \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .outputMode("append") \
    .start()

# Block until the streaming query terminates
upsert.awaitTermination()
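
# For reference, each streamed JSON file must contain a record matching
# COMMON_SCHEMA. The sketch below (run separately, before starting the stream)
# writes one illustrative record into the watched directory. The id, namespace,
# vector values, and metadata contents are assumed placeholders for a
# 3-dimensional index; "sparse_values" may be omitted for dense-only data.
import json

sample_record = {
    "id": "v1",
    "namespace": "example-namespace",
    "values": [1.0, 2.0, 3.0],
    "metadata": json.dumps({"genre": "drama"}),  # metadata is a JSON-encoded string
    "sparse_values": {"indices": [0, 2], "values": [0.5, 0.75]},
}

# One JSON document per file works with the multiLine=True reader option above
with open("path/to/input/directory/sample.json", "w") as f:
    json.dump(sample_record, f)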