@@ -110,56 +110,34 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
110110 }
111111
/**
 * Writes `items` to the DynamoDB table as a single batch-put request.
 *
 * Each `InternalRow` is converted into a DynamoDB `Item`: the primary key is
 * populated from `columnSchema.keys()` (hash-only or hash+range), and every
 * non-null remaining column becomes an attribute. Consumed capacity is
 * requested so the response can feed the rate limiter; unprocessed items are
 * retried via `handleBatchWriteResponse`.
 *
 * @param columnSchema key/attribute layout of the rows being written
 * @param items        rows to put (caller is responsible for batch sizing)
 * @param client       DynamoDB Document API client
 * @param rateLimiter  limiter charged with the capacity the write consumed
 */
override def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
                     (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
  val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)

  // Convert every Spark row into a DynamoDB Item.
  val dynamoItems = items.map { row =>
    val item = new Item()

    // Primary key: Left = hash-only schema, Right = hash + range schema.
    columnSchema.keys() match {
      case Left((hashKey, hashKeyIndex, hashKeyType)) =>
        item.withPrimaryKey(hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType))
      case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
        item.withPrimaryKey(
          hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType),
          rangeKey, JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType))
    }

    // Non-key columns: null values are skipped, leaving the attribute absent.
    columnSchema.attributes().foreach {
      case (name, index, dataType) if !row.isNullAt(index) =>
        item.`with`(name, JavaConverter.convertRowValue(row, index, dataType))
      case _ => // null column — do not write the attribute
    }

    item
  }

  batchWriteItemSpec.withTableWriteItems(
    new TableWriteItems(tableName).withItemsToPut(dynamoItems: _*))

  val response = client.batchWriteItem(batchWriteItemSpec)
  handleBatchWriteResponse(client, rateLimiter)(response)
}
@@ -193,6 +171,35 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
193171 .foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1 ))
194172 }
195173
/**
 * Deletes `items` from the DynamoDB table as a single batch-delete request.
 *
 * Only the primary key of each row is used: for a hash-only schema one key
 * value per row is registered; for a hash+range schema the SDK expects hash
 * and range values interleaved in a single varargs list. Consumed capacity is
 * requested so the response can feed the rate limiter; unprocessed items are
 * retried via `handleBatchWriteResponse`.
 *
 * @param columnSchema key layout of the rows being deleted
 * @param items        rows whose keys identify the records to delete
 * @param client       DynamoDB Document API client
 * @param rateLimiter  limiter charged with the capacity the delete consumed
 */
override def deleteItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
                        (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
  val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
  val tableWriteItems = new TableWriteItems(tableName)

  // Register the key of every row for deletion; the call shape depends on
  // whether the table uses a hash-only or a composite (hash+range) key.
  val keyedWriteItems = columnSchema.keys() match {
    case Left((hashKey, hashKeyIndex, hashKeyType)) =>
      val hashKeyValues = items.map { row =>
        JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef]
      }
      tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeyValues: _*)

    case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
      // withHashAndRangeKeysToDelete takes hash/range values alternating in one list.
      val interleavedKeys = items.flatMap { row =>
        Seq(
          JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef],
          JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType).asInstanceOf[AnyRef])
      }
      tableWriteItems.withHashAndRangeKeysToDelete(hashKey, rangeKey, interleavedKeys: _*)
  }

  batchWriteItemSpec.withTableWriteItems(keyedWriteItems)

  val response = client.batchWriteItem(batchWriteItemSpec)
  handleBatchWriteResponse(client, rateLimiter)(response)
}
202+
196203 @ tailrec
197204 private def handleBatchWriteResponse (client : DynamoDB , rateLimiter : RateLimiter )
198205 (response : BatchWriteItemOutcome ): Unit = {
0 commit comments