
Commit 0a0ea97

xupefei authored and vkorukanti committed
[Spark][UPDATE with DV] Let UPDATE command write DVs
This is the first PR in [[Feature Request] Support UPDATE command with Deletion Vectors](#1923). It introduces an `UPDATE_USE_PERSISTENT_DELETION_VECTORS` config to enable or disable writing DVs for the UPDATE command. In short, the old versions of the updated rows are marked as `deleted` by a DV on their original file, while the new versions of those rows are written to a new file. When CDF is enabled, the updated rows and the CDC data (`preimage` and `postimage`) are also written out.

How was this patch tested: new, preliminary tests.

Does this introduce user-facing changes: yes. When `UPDATE_USE_PERSISTENT_DELETION_VECTORS` is set to true, the `UPDATE` command no longer rewrites whole files; it writes only the rows being updated.

Closes #1942

Signed-off-by: Paddy Xu <xupaddy@gmail.com>
GitOrigin-RevId: 3ad7c251bb064420d17cd1e685265e61845096a7
1 parent bbf19c3 commit 0a0ea97
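
A minimal sketch, not part of this commit, of how the behavior described above might be exercised from `spark-shell`. It assumes the new config is registered in `DeltaSQLConf` under the `UPDATE_USE_PERSISTENT_DELETION_VECTORS` name given in the description, and that deletion vectors are enabled on the table; the table name, schema, and data are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder()
  .appName("update-with-dv-sketch")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Enable DV writes for UPDATE (the config introduced by this PR; its exact SQLConf entry
// is an assumption here).
spark.conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key, "true")

// An illustrative Delta table with the deletion-vectors table feature enabled.
spark.sql("""
  CREATE TABLE dv_update_demo (id BIGINT, value BIGINT) USING delta
  TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
""")
spark.sql("INSERT INTO dv_update_demo SELECT id, id AS value FROM range(100)")

// Old versions of the matched rows are marked deleted via a DV on their original file;
// only the updated rows are written to a new file.
spark.sql("UPDATE dv_update_demo SET value = value + 1 WHERE id % 10 = 0")
```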

File tree

10 files changed: 403 additions, 86 deletions
Lines changed: 19 additions & 11 deletions
```diff
@@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
@@ -47,11 +47,13 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}
 
 
 /**
- * Contains utility classes and method to delete rows in a table using the Deletion Vectors.
+ * Contains utility classes and method for performing DML operations with Deletion Vectors.
  */
-object DeleteWithDeletionVectorsHelper extends DeltaCommand {
+object DMLWithDeletionVectorsHelper extends DeltaCommand {
+  val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")
+
   /**
-   * Creates a DataFrame that can be used to scan for rows matching DELETE condition in given
+   * Creates a DataFrame that can be used to scan for rows matching the condition in the given
    * files. Generally the given file list is a pruned file list using the stats based pruning.
    */
   def createTargetDfForScanningForMatches(
```
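
The new `SUPPORTED_DML_COMMANDS` constant lets the shared helper fail fast when it is called from a command it was not written for. A standalone sketch of that guard, using the same `require` pattern as the hunk that follows; the surrounding object name here is illustrative, not a class from this commit.

```scala
object OpNameGuardSketch {
  val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")

  def findTouchedFiles(opName: String): Unit = {
    require(
      SUPPORTED_DML_COMMANDS.contains(opName),
      s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(", ")}, " +
        s"but got '$opName'.")
    // The real helper would now scan the candidate files for matching rows; opName is also
    // used to label the recorded operation, e.g. s"$opName.findTouchedFiles".
    println(s"$opName.findTouchedFiles")
  }

  def main(args: Array[String]): Unit = {
    findTouchedFiles("UPDATE")   // accepted
    findTouchedFiles("DELETE")   // accepted
    // findTouchedFiles("MERGE") // would throw IllegalArgumentException
  }
}
```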
```diff
@@ -114,8 +116,14 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
       deltaLog: DeltaLog,
       targetDf: DataFrame,
       fileIndex: TahoeFileIndex,
-      condition: Expression): Seq[TouchedFileWithDV] = {
-    recordDeltaOperation(deltaLog, opType = "DELETE.findTouchedFiles") {
+      condition: Expression,
+      opName: String): Seq[TouchedFileWithDV] = {
+    require(
+      SUPPORTED_DML_COMMANDS.contains(opName),
+      s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(", ")}, " +
+        s"but got '$opName'.")
+
+    recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") {
       val candidateFiles = fileIndex match {
         case f: TahoeBatchFileIndex => f.addFiles
         case _ => throw new IllegalArgumentException("Unexpected file index found!")
@@ -165,7 +173,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
       spark: SparkSession,
       touchedFiles: Seq[TouchedFileWithDV],
       snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = {
-    val numDeletedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
+    val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
     val numRemovedFiles: Long = touchedFiles.count(_.isFullyReplaced())
 
     val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced())
@@ -192,7 +200,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
     }
     numDeletionVectorsRemoved += fullyRemoved.count(_.deletionVector != null)
     val metricMap = Map(
-      "numDeletedRows" -> numDeletedRows,
+      "numModifiedRows" -> numModifiedRows,
       "numRemovedFiles" -> numRemovedFiles,
       "numDeletionVectorsAdded" -> numDeletionVectorsAdded,
       "numDeletionVectorsRemoved" -> numDeletionVectorsRemoved,
@@ -485,8 +493,8 @@ object DeletionVectorData {
 }
 
 /** Final output for each file containing the file path, DeletionVectorDescriptor and how many
- * rows are marked as deleted in this file as part of the this DELETE (doesn't include already
- * rows marked as deleted)
+ * rows are marked as deleted in this file as part of the this operation (doesn't include rows that
+ * are already marked as deleted).
  *
  * @param filePath Absolute path of the data file this DV result is generated for.
 * @param deletionVector Deletion vector generated containing the newly deleted row indices from
```
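
The doc comment above describes a per-file result carrying the file path, the new `DeletionVectorDescriptor`, and the count of rows newly marked deleted by this operation. A rough sketch of that shape, for orientation only; the actual class name and field names in the Delta sources may differ.

```scala
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor

// Hypothetical shape of the per-file DV result described above; not the actual Delta class.
case class FileDvResult(
    filePath: String,                         // absolute path of the data file
    deletionVector: DeletionVectorDescriptor, // descriptor of the newly written DV
    matchedRowCount: Long)                    // rows newly marked deleted by this operation
```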
```diff
@@ -643,7 +651,7 @@ object DeletionVectorWriter extends DeltaLogging {
   }
 
   /**
-   * Prepares a mapper function that can be used by DELETE command to store the Deletion Vectors
+   * Prepares a mapper function that can be used by DML commands to store the Deletion Vectors
    * that are in described in [[DeletionVectorData]] and return their descriptors
    * [[DeletionVectorResult]].
    */
```

spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala

Lines changed: 10 additions & 6 deletions
```diff
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.commands
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.sql.delta.metric.IncrementMetric
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction}
@@ -253,7 +255,7 @@
       val fileIndex = new TahoeBatchFileIndex(
         sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
       if (shouldWriteDVs) {
-        val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
+        val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
           sparkSession,
           target,
           fileIndex)
```
```diff
@@ -262,21 +264,22 @@
         // with deletion vectors.
         val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
 
-        val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
+        val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles(
           sparkSession,
           txn,
           mustReadDeletionVectors,
           deltaLog,
           targetDf,
           fileIndex,
-          cond)
+          cond,
+          opName = "DELETE")
 
         if (touchedFiles.nonEmpty) {
-          val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
+          val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
             sparkSession,
             touchedFiles,
             txn.snapshot)
-          metrics("numDeletedRows").set(metricMap("numDeletedRows"))
+          metrics("numDeletedRows").set(metricMap("numModifiedRows"))
           numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")
           numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved")
           numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated")
```
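
The shared helper now reports a generic `numModifiedRows`, and each command maps it onto its own metric, as the `numDeletedRows` line above does for DELETE. A standalone sketch of that hand-off; the `numUpdatedRows` name for the UPDATE side is an assumption, not taken from this diff.

```scala
object MetricMappingSketch {
  def main(args: Array[String]): Unit = {
    // Shape of the map returned by DMLWithDeletionVectorsHelper.processUnmodifiedData.
    val metricMap: Map[String, Long] = Map(
      "numModifiedRows" -> 42L,
      "numRemovedFiles" -> 1L)

    // DELETE maps the generic count onto its own metric (as in the diff above) ...
    val numDeletedRows = metricMap("numModifiedRows")
    // ... and UPDATE would presumably do the analogous thing (metric name is an assumption).
    val numUpdatedRows = metricMap("numModifiedRows")

    println(s"numDeletedRows=$numDeletedRows, numUpdatedRows=$numUpdatedRows")
  }
}
```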
```diff
@@ -342,7 +345,8 @@
         }
         numAddedChangeFiles = changeFiles.size
         changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum
-        rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
+        rewriteTimeMs =
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - scanTimeMs
         numDeletedRows = Some(metrics("numDeletedRows").value)
         numCopiedRows =
           Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value)
```
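
The manual nanosecond-to-millisecond division is swapped for `TimeUnit.NANOSECONDS.toMillis`. A small standalone check that the two formulas agree for non-negative durations:

```scala
import java.util.concurrent.TimeUnit

object RewriteTimeMsSketch {
  def main(args: Array[String]): Unit = {
    val startTime = System.nanoTime()
    Thread.sleep(5)                     // stand-in for the rewrite work
    val elapsedNs = System.nanoTime() - startTime

    val manualMs   = elapsedNs / 1000 / 1000                  // old formula
    val timeUnitMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs) // new formula

    assert(manualMs == timeUnitMs)      // truncating division agrees for non-negative values
    println(s"rewrite took ~$timeUnitMs ms")
  }
}
```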
