Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.
We published a small article about the project, check it out here: https://www.audienceproject.com/blog/tech/sparkdynamodb-using-aws-dynamodb-data-source-apache-spark/
UPDATE 2019-11-25: We are releasing version 1.0.0 of the Spark+DynamoDB connector, which is based on the Spark Data Source V2 API. Out-of-the-box throughput calculations, parallelism and partition planning should now be more reliable. We have also pulled out the external dependency on Guava, which was causing a lot of compatibility issues.
- Distributed, parallel scan with lazy evaluation
- Throughput control by rate limiting on target fraction of provisioned table/index capacity
- Schema discovery to suit your needs
- Dynamic inference
- Static analysis of case class
- Column and filter pushdown
- Global secondary index support
- Write support
The library is available from Maven Central. Add the dependency in SBT as "com.audienceproject" %% "spark-dynamodb" % "latest"
Spark is used in the library as a "provided" dependency, which means Spark has to be installed separately on the container where the application is running, such as is the case on AWS EMR.
import com.audienceproject.spark.dynamodb.implicits._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() // Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference. val dynamoDf = spark.read.dynamodb("SomeTableName") // <-- DataFrame of Row objects with inferred schema. // Scan the table for the first 100 items (the order is arbitrary) and print them. dynamoDf.show(100) // write to some other table overwriting existing item with same keys dynamoDf.write.dynamodb("SomeOtherTable") // Case class representing the items in our table. import com.audienceproject.spark.dynamodb.attribute case class Vegetable (name: String, color: String, @attribute("weight_kg") weightKg: Double) // Load a Dataset[Vegetable]. Notice the @attribute annotation on the case class - we imagine the weight attribute is named with an underscore in DynamoDB. import org.apache.spark.sql.functions._ import spark.implicits._ val vegetableDs = spark.read.dynamodbAs[Vegetable]("VegeTable") val avgWeightByColor = vegetableDs.agg($"color", avg($"weightKg")) // The column is called 'weightKg' in the Dataset.# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference. dynamoDf = spark.read.option("tableName", "SomeTableName") \ .format("dynamodb") \ .load() # <-- DataFrame of Row objects with inferred schema. # Scan the table for the first 100 items (the order is arbitrary) and print them. dynamoDf.show(100) # write to some other table overwriting existing item with same keys dynamoDf.write.option("tableName", "SomeOtherTable") \ .format("dynamodb") \ .save()Note: When running from pyspark shell, you can add the library as:
pyspark --packages com.audienceproject:spark-dynamodb_<spark-scala-version>:<version>The following parameters can be set as options on the Spark reader and writer object before loading/saving.
regionsets the region where the dynamodb table. Default is environment specific.roleArnsets an IAM role to assume. This allows for access to a DynamoDB in a different account than the Spark cluster. Defaults to the standard role configuration.
The following parameters can be set as options on the Spark reader object before loading.
readPartitionsnumber of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks ofmaxPartitionBytesmaxPartitionBytesthe maximum size of a single input partition. Default 128 MBdefaultParallelismthe number of input partitions that can be read from DynamoDB simultaneously. Defaults tosparkContext.defaultParallelismtargetCapacityfraction of provisioned read capacity on the table (or index) to consume for reading. Default 1 (i.e. 100% capacity).stronglyConsistentReadswhether or not to use strongly consistent reads. Default false.bytesPerRCUnumber of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two whenstronglyConsistentReads=falsefilterPushdownwhether or not to use filter pushdown to DynamoDB on scan requests. Default true.throughputthe desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
The following parameters can be set as options on the Spark writer object before saving.
writeBatchSizenumber of items to send per call to DynamoDB BatchWriteItem. Default 25.targetCapacityfraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).updateif true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.throughputthe desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
The unit tests are dependent on the AWS DynamoDBLocal client, which in turn is dependent on sqlite4java. I had some problems running this on OSX, so I had to put the library directly in the /lib folder, as graciously explained in this Stack Overflow answer.
In order to run the tests, make sure to put the following as additional VM parameters:
-Djava.library.path=./lib/sqlite4java -Daws.dynamodb.endpoint=http://localhost:8000
Usage of parallel scan and rate limiter inspired by work in https://github.com/traviscrawford/spark-dynamodb