Skip to content

Commit 225639b

Browse files
fix merge
2 parents ffc6bff + 32fd9d1 commit 225639b

16 files changed

+1283
-1163
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,10 @@ public class AbfsConfiguration{
403403
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
404404
private boolean isChecksumValidationEnabled;
405405

406+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
407+
FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION)
408+
private boolean isFullBlobChecksumValidationEnabled;
409+
406410
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
407411
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
408412
private boolean isPaginatedDeleteEnabled;
@@ -1629,6 +1633,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
16291633
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
16301634
}
16311635

1636+
public boolean isFullBlobChecksumValidationEnabled() {
1637+
return isFullBlobChecksumValidationEnabled;
1638+
}
1639+
16321640
public long getBlobCopyProgressPollWaitMillis() {
16331641
return blobCopyProgressPollWaitMillis;
16341642
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ public final class ConfigurationKeys {
320320
/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
321321
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";
322322

323+
/** Add extra layer of verification of the integrity of the full blob request content during transport: {@value}. */
324+
public static final String FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION = "fs.azure.enable.full.blob.checksum.validation";
325+
323326
public static String accountProperty(String property, String account) {
324327
return property + DOT + account;
325328
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public final class FileSystemConfigurations {
139139
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
140140
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
141141
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;
142+
public static final boolean DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION = false;
142143

143144
/**
144145
* Limit of queued block upload operations before writes

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,9 +1100,10 @@ public AbfsRestOperation flush(byte[] buffer,
11001100
if (leaseId != null) {
11011101
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
11021102
}
1103-
if (blobMd5 != null) {
1104-
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
1105-
}
1103+
String md5Value = (isFullBlobChecksumValidationEnabled() && blobMd5 != null)
1104+
? blobMd5
1105+
: computeMD5Hash(buffer, 0, buffer.length);
1106+
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Value));
11061107
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
11071108
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
11081109
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
@@ -1127,7 +1128,12 @@ public AbfsRestOperation flush(byte[] buffer,
11271128
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
11281129
contextEncryptionAdapter);
11291130
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
1130-
if (blobMd5 != null && !blobMd5.equals(metadataMd5)) {
1131+
/*
1132+
* Validate the response by comparing the server's MD5 metadata against either:
1133+
* 1. The full blob content MD5 (if full blob checksum validation is enabled), or
1134+
* 2. The full block ID list buffer MD5 (fallback if blob checksum validation is disabled)
1135+
*/
1136+
if (md5Value != null && !md5Value.equals(metadataMd5)) {
11311137
throw ex;
11321138
}
11331139
return op;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1419,14 +1419,25 @@ protected boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeader
14191419
/**
14201420
* Conditions check for allowing checksum support for write operation.
14211421
* Server will support this if client sends the MD5 Hash as a request header.
1422-
* For azure stoage service documentation and more details refer to
1422+
* For azure storage service documentation and more details refer to
14231423
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
14241424
* @return true if checksum validation enabled.
14251425
*/
14261426
protected boolean isChecksumValidationEnabled() {
14271427
return getAbfsConfiguration().getIsChecksumValidationEnabled();
14281428
}
14291429

1430+
/**
1431+
* Conditions check for allowing checksum support for write operation.
1432+
* Server will support this if client sends the MD5 Hash as a request header.
1433+
* For azure storage service documentation and more details refer to
1434+
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
1435+
* @return true if full blob checksum validation enabled.
1436+
*/
1437+
protected boolean isFullBlobChecksumValidationEnabled() {
1438+
return getAbfsConfiguration().isFullBlobChecksumValidationEnabled();
1439+
}
1440+
14301441
/**
14311442
* Compute MD5Hash of the given byte array starting from given offset up to given length.
14321443
* @param data byte array from which data is fetched to compute MD5 Hash.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,10 +867,9 @@ public AbfsRestOperation flush(final String path,
867867
if (leaseId != null) {
868868
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
869869
}
870-
if (isChecksumValidationEnabled() && blobMd5 != null) {
870+
if (isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
871871
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
872872
}
873-
874873
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
875874
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
876875
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
223223
md5 = MessageDigest.getInstance(MD5);
224224
fullBlobContentMd5 = MessageDigest.getInstance(MD5);
225225
} catch (NoSuchAlgorithmException e) {
226-
if (client.isChecksumValidationEnabled()) {
226+
if (isChecksumValidationEnabled()) {
227227
throw new IOException("MD5 algorithm not available", e);
228228
}
229229
}
@@ -464,10 +464,13 @@ public synchronized void write(final byte[] data, final int off, final int lengt
464464
AbfsBlock block = createBlockIfNeeded(position);
465465
int written = bufferData(block, data, off, length);
466466
// Update the incremental MD5 hash with the written data.
467-
getMessageDigest().update(data, off, written);
468-
467+
if (isChecksumValidationEnabled()) {
468+
getMessageDigest().update(data, off, written);
469+
}
469470
// Update the full blob MD5 hash with the written data.
470-
getFullBlobContentMd5().update(data, off, written);
471+
if (isFullBlobChecksumValidationEnabled()) {
472+
getFullBlobContentMd5().update(data, off, written);
473+
}
471474
int remainingCapacity = block.remainingCapacity();
472475

473476
if (written < length) {
@@ -544,7 +547,7 @@ private void uploadBlockAsync(AbfsBlock blockToUpload,
544547
outputStreamStatistics.bytesToUpload(bytesLength);
545548
outputStreamStatistics.writeCurrentBuffer();
546549
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
547-
String md5Hash = getMd5();
550+
String md5Hash = getClient().isChecksumValidationEnabled() ? getMd5() : null;
548551
final Future<Void> job =
549552
executorService.submit(() -> {
550553
AbfsPerfTracker tracker =
@@ -1222,6 +1225,20 @@ public MessageDigest getFullBlobContentMd5() {
12221225
return fullBlobContentMd5;
12231226
}
12241227

1228+
/**
1229+
* @return true if checksum validation is enabled.
1230+
*/
1231+
public boolean isChecksumValidationEnabled() {
1232+
return getClient().isChecksumValidationEnabled();
1233+
}
1234+
1235+
/**
1236+
* @return true if full blob checksum validation is enabled.
1237+
*/
1238+
public boolean isFullBlobChecksumValidationEnabled() {
1239+
return getClient().isFullBlobChecksumValidationEnabled();
1240+
}
1241+
12251242
/**
12261243
* Returns the Base64-encoded MD5 checksum based on the current digest state.
12271244
* This finalizes the digest calculation. Returns null if the digest is empty.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
9595
setBlockCount(getBlockCount() + 1);
9696
AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position, getBlockIdLength(), getBlockCount());
9797
activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
98-
getAbfsOutputStream().getMessageDigest().reset();
98+
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
99+
getAbfsOutputStream().getMessageDigest().reset();
100+
}
99101
setActiveBlock(activeBlock);
100102
}
101103
return getActiveBlock();

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
180180
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
181181
tracingContextFlush.setPosition(String.valueOf(offset));
182182
LOG.trace("Flushing data at offset {} for path {}", offset, getAbfsOutputStream().getPath());
183-
String fullBlobMd5 = computeFullBlobMd5();
183+
String fullBlobMd5 = null;
184+
if (getClient().isFullBlobChecksumValidationEnabled()) {
185+
fullBlobMd5 = computeFullBlobMd5();
186+
}
184187
op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
185188
getAbfsOutputStream().getPath(),
186189
isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
@@ -194,7 +197,9 @@ isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
194197
LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex);
195198
throw ex;
196199
} finally {
197-
getAbfsOutputStream().getFullBlobContentMd5().reset();
200+
if (getClient().isFullBlobChecksumValidationEnabled()) {
201+
getAbfsOutputStream().getFullBlobContentMd5().reset();
202+
}
198203
}
199204
return op;
200205
}
@@ -221,7 +226,7 @@ protected AbfsRestOperation remoteAppendBlobWrite(String path,
221226
AppendRequestParameters reqParams,
222227
TracingContext tracingContext) throws IOException {
223228
// Perform the remote append operation using the blob client.
224-
AbfsRestOperation op = null;
229+
AbfsRestOperation op;
225230
try {
226231
op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), tracingContext);
227232
} catch (AbfsRestOperationException ex) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
6262
if (getActiveBlock() == null) {
6363
setBlockCount(getBlockCount() + 1);
6464
AbfsBlock activeBlock = new AbfsBlock(getAbfsOutputStream(), position);
65-
getAbfsOutputStream().getMessageDigest().reset();
65+
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
66+
getAbfsOutputStream().getMessageDigest().reset();
67+
}
6668
setActiveBlock(activeBlock);
6769
}
6870
return getActiveBlock();

0 commit comments

Comments
 (0)