@@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration
3636import org .apache .hadoop .fs .Path
3737
3838import org .apache .spark .paths .SparkPath
39- import org .apache .spark .sql .{ Column , DataFrame , Dataset , Encoder , SparkSession }
39+ import org .apache .spark .sql ._
4040import org .apache .spark .sql .catalyst .expressions .{AttributeReference , Expression , FileSourceMetadataAttribute , GenericInternalRow }
4141import org .apache .spark .sql .catalyst .plans .logical .{LogicalPlan , Project }
4242import org .apache .spark .sql .execution .datasources .{FileFormat , HadoopFsRelation , LogicalRelation }
@@ -47,11 +47,13 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}
4747
4848
4949/**
50- * Contains utility classes and method to delete rows in a table using the Deletion Vectors.
50+ * Contains utility classes and method for performing DML operations with Deletion Vectors.
5151 */
52- object DeleteWithDeletionVectorsHelper extends DeltaCommand {
52+ object DMLWithDeletionVectorsHelper extends DeltaCommand {
53+ val SUPPORTED_DML_COMMANDS : Seq [String ] = Seq (" DELETE" , " UPDATE" )
54+
5355 /**
54- * Creates a DataFrame that can be used to scan for rows matching DELETE condition in given
56+ * Creates a DataFrame that can be used to scan for rows matching the condition in the given
5557 * files. Generally the given file list is a pruned file list using the stats based pruning.
5658 */
5759 def createTargetDfForScanningForMatches (
@@ -114,8 +116,14 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
114116 deltaLog : DeltaLog ,
115117 targetDf : DataFrame ,
116118 fileIndex : TahoeFileIndex ,
117- condition : Expression ): Seq [TouchedFileWithDV ] = {
118- recordDeltaOperation(deltaLog, opType = " DELETE.findTouchedFiles" ) {
119+ condition : Expression ,
120+ opName : String ): Seq [TouchedFileWithDV ] = {
121+ require(
122+ SUPPORTED_DML_COMMANDS .contains(opName),
123+ s " Expecting opName to be one of ${SUPPORTED_DML_COMMANDS .mkString(" , " )}, " +
124+ s " but got ' $opName'. " )
125+
126+ recordDeltaOperation(deltaLog, opType = s " $opName.findTouchedFiles " ) {
119127 val candidateFiles = fileIndex match {
120128 case f : TahoeBatchFileIndex => f.addFiles
121129 case _ => throw new IllegalArgumentException (" Unexpected file index found!" )
@@ -165,7 +173,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
165173 spark : SparkSession ,
166174 touchedFiles : Seq [TouchedFileWithDV ],
167175 snapshot : Snapshot ): (Seq [FileAction ], Map [String , Long ]) = {
168- val numDeletedRows : Long = touchedFiles.map(_.numberOfModifiedRows).sum
176+ val numModifiedRows : Long = touchedFiles.map(_.numberOfModifiedRows).sum
169177 val numRemovedFiles : Long = touchedFiles.count(_.isFullyReplaced())
170178
171179 val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced())
@@ -192,7 +200,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
192200 }
193201 numDeletionVectorsRemoved += fullyRemoved.count(_.deletionVector != null )
194202 val metricMap = Map (
195- " numDeletedRows " -> numDeletedRows ,
203+ " numModifiedRows " -> numModifiedRows ,
196204 " numRemovedFiles" -> numRemovedFiles,
197205 " numDeletionVectorsAdded" -> numDeletionVectorsAdded,
198206 " numDeletionVectorsRemoved" -> numDeletionVectorsRemoved,
@@ -485,8 +493,8 @@ object DeletionVectorData {
485493}
486494
487495/** Final output for each file containing the file path, DeletionVectorDescriptor and how many
488- * rows are marked as deleted in this file as part of the this DELETE (doesn't include already
489- * rows marked as deleted)
496+ * rows are marked as deleted in this file as part of the this operation (doesn't include rows that
497+ * are already marked as deleted).
490498 *
491499 * @param filePath Absolute path of the data file this DV result is generated for.
492500 * @param deletionVector Deletion vector generated containing the newly deleted row indices from
@@ -643,7 +651,7 @@ object DeletionVectorWriter extends DeltaLogging {
643651 }
644652
645653 /**
646- * Prepares a mapper function that can be used by DELETE command to store the Deletion Vectors
654+ * Prepares a mapper function that can be used by DML commands to store the Deletion Vectors
647655 * that are in described in [[DeletionVectorData ]] and return their descriptors
648656 * [[DeletionVectorResult ]].
649657 */
0 commit comments