Skip to content

Commit c5d4375

Browse files
HADOOP-19658_3.4. ABFS:Create and rename idempotency for FNS Blob (#7914) (#7929)
Contributed by Anmol Asrani
1 parent 32fd9d1 commit c5d4375

File tree

10 files changed

+504
-132
lines changed

10 files changed

+504
-132
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,10 @@ public class AbfsConfiguration{
457457
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
458458
private boolean enableClientTransactionId;
459459

460+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY,
461+
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY)
462+
private boolean enableCreateIdempotency;
463+
460464
private String clientProvidedEncryptionKey;
461465
private String clientProvidedEncryptionKeySHA;
462466

@@ -1005,6 +1009,12 @@ public String getAzureAtomicRenameDirs() {
10051009
}
10061010

10071011
public boolean isConditionalCreateOverwriteEnabled() {
1012+
// If either the configured FS service type or the ingress service type is BLOB,
1013+
// conditional create-overwrite is not used.
1014+
if (getIsCreateIdempotencyEnabled() && (getFsConfiguredServiceType() == AbfsServiceType.BLOB
1015+
|| getIngressServiceType() == AbfsServiceType.BLOB)) {
1016+
return false;
1017+
}
10081018
return this.enableConditionalCreateOverwrite;
10091019
}
10101020

@@ -1136,6 +1146,10 @@ public boolean getIsClientTransactionIdEnabled() {
11361146
return enableClientTransactionId;
11371147
}
11381148

1149+
public boolean getIsCreateIdempotencyEnabled() {
1150+
return enableCreateIdempotency;
1151+
}
1152+
11391153
/**
11401154
* Enum config to allow user to pick format of x-ms-client-request-id header
11411155
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,8 @@ public static String containerProperty(String property, String fsName, String ac
394394
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
395395
/**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
396396
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
397+
/**Flag to enable/disable create idempotency during create operation: {@value}*/
398+
public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = "fs.azure.enable.create.blob.idempotency";
397399

398400
private ConfigurationKeys() {}
399401
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,5 +232,7 @@ public final class FileSystemConfigurations {
232232

233233
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
234234

235+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;
236+
235237
private FileSystemConfigurations() {}
236238
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path,
509509
final TracingContext tracingContext) throws AzureBlobFileSystemException {
510510
AbfsRestOperation op;
511511
if (isFileCreation) {
512-
// Create a file with the specified parameters
513-
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
514-
contextEncryptionAdapter, tracingContext);
512+
if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) {
513+
AbfsRestOperation statusOp = null;
514+
try {
515+
// Check if the file already exists by calling GetPathStatus
516+
statusOp = getPathStatus(path, tracingContext, null, false);
517+
} catch (AbfsRestOperationException ex) {
518+
// If the path does not exist, continue with file creation
519+
// For other errors, rethrow the exception
520+
if (ex.getStatusCode() != HTTP_NOT_FOUND) {
521+
throw ex;
522+
}
523+
}
524+
// If the file exists and overwrite is not allowed, throw conflict
525+
if (statusOp != null && statusOp.hasResult() && !overwrite) {
526+
throw new AbfsRestOperationException(
527+
HTTP_CONFLICT,
528+
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
529+
PATH_EXISTS,
530+
null);
531+
} else {
532+
// Proceed with file creation (force overwrite = true)
533+
op = createFile(path, true, permissions, isAppendBlob, eTag,
534+
contextEncryptionAdapter, tracingContext);
535+
}
536+
} else {
537+
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
538+
contextEncryptionAdapter, tracingContext);
539+
}
515540
} else {
516541
// Create a directory with the specified parameters
517542
op = createDirectory(path, permissions, isAppendBlob, eTag,
@@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path,
584609
if (eTag != null && !eTag.isEmpty()) {
585610
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
586611
}
587-
588612
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
589613
final AbfsRestOperation op = getAbfsRestOperation(
590614
AbfsRestOperationType.PutBlob,

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21+
import java.util.Collections;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.stream.Collectors;
2325

2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
@@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
5456
if (key == null) {
5557
key = "HTTP Response";
5658
}
57-
String values = StringUtils.join(";", entry.getValue());
59+
List<String> valuesList = entry.getValue();
60+
if (valuesList == null) {
61+
valuesList = Collections.emptyList();
62+
} else {
63+
valuesList = valuesList.stream()
64+
.map(v -> v == null ? "" : v) // replace null with empty string
65+
.collect(Collectors.toList());
66+
}
67+
String values = StringUtils.join(";", valuesList);
5868
if (key.contains("Cookie")) {
5969
values = "*cookie info*";
6070
}

0 commit comments

Comments
 (0)