Skip to content

Commit e1d1570

Browse files
Merge conflicts
2 parents 4ff809f + adda712 commit e1d1570

File tree

9 files changed

+167
-0
lines changed

9 files changed

+167
-0
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,15 @@ public class AbfsConfiguration{
638638
DefaultValue = DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT)
639639
private int tailLatencyMaxRetryCount;
640640

641+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY,
642+
DefaultValue = DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY)
643+
private boolean enablePrefetchRequestPriority;
644+
645+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE,
646+
MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE,
647+
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
648+
private int prefetchRequestPriorityValue;
649+
641650
private String clientProvidedEncryptionKey;
642651
private String clientProvidedEncryptionKeySHA;
643652

@@ -1355,6 +1364,14 @@ public boolean getIsCreateIdempotencyEnabled() {
13551364
return enableCreateIdempotency;
13561365
}
13571366

1367+
public boolean isEnablePrefetchRequestPriority() {
1368+
return enablePrefetchRequestPriority;
1369+
}
1370+
1371+
public String getPrefetchRequestPriorityValue() {
1372+
return Integer.toString(prefetchRequestPriorityValue);
1373+
}
1374+
13581375
/**
13591376
* Enum config to allow user to pick format of x-ms-client-request-id header
13601377
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,18 @@ public final class ConfigurationKeys {
282282
*/
283283
public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = "fs.azure.enable.readahead.v2.dynamic.scaling";
284284

285+
/**
286+
* Enable or disable request priority for prefetch requests
287+
* Value: {@value}.
288+
*/
289+
public static final String FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = "fs.azure.enable.prefetch.request.priority";
290+
291+
/**
292+
* Request priority value for prefetch requests
293+
* Value: {@value}.
294+
*/
295+
public static final String FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = "fs.azure.prefetch.request.priority.value";
296+
285297
/**
286298
* Minimum number of prefetch threads in the thread pool for readahead V2.
287299
* {@value }

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,13 @@ public final class FileSystemConfigurations {
393393

394394
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;
395395

396+
public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true;
397+
398+
// The default traffic request priority is 3 (from service side)
399+
// The lowest priority a request can get is 7
400+
public static final int DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE = 7;
401+
public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3;
402+
396403
public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_TRACKER = false;
397404
public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_REQUEST_TIMEOUT = false;
398405
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE = 99;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class HttpHeaderConfigurations {
6969
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
7070
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
7171
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
72+
public static final String X_MS_REQUEST_PRIORITY = "x-ms-request-priority";
7273

7374
/**
7475
* Http Request Header for denoting the lease id of source in copy operation.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,6 +1335,9 @@ public AbfsRestOperation read(final String path,
13351335
requestHeaders.add(rangeHeader);
13361336
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
13371337

1338+
// Add request priority header for prefetch reads
1339+
addRequestPriorityForPrefetch(requestHeaders, tracingContext);
1340+
13381341
// Add request header to fetch MD5 Hash of data returned by server.
13391342
if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
13401343
requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
6363
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
6464
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
65+
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
6566
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
6667
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
6768
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -139,6 +140,7 @@
139140
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET;
140141
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
141142
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
143+
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
142144
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
143145
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
144146
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
@@ -1406,6 +1408,21 @@ protected void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
14061408
}
14071409
}
14081410

1411+
/**
1412+
* Add request priority header for prefetch read requests if enabled.
1413+
*
1414+
* @param requestHeaders to be updated with request priority header
1415+
* @param tracingContext tracing context to check read type
1416+
*/
1417+
protected void addRequestPriorityForPrefetch(List<AbfsHttpHeader> requestHeaders,
1418+
TracingContext tracingContext) {
1419+
if (getAbfsConfiguration().isEnablePrefetchRequestPriority()
1420+
&& ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) {
1421+
requestHeaders.add(new AbfsHttpHeader(X_MS_REQUEST_PRIORITY,
1422+
getAbfsConfiguration().getPrefetchRequestPriorityValue()));
1423+
}
1424+
}
1425+
14091426
/**
14101427
* To verify the checksum information received from server for the data read.
14111428
* @param buffer stores the data received from server.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,6 +1072,10 @@ public AbfsRestOperation read(final String path,
10721072
}
10731073

10741074
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
1075+
1076+
// Add request priority header for prefetch reads
1077+
addRequestPriorityForPrefetch(requestHeaders, tracingContext);
1078+
10751079
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
10761080
String sasTokenForReuse = appendSASTokenToQuery(path,
10771081
SASTokenProvider.READ_OPERATION,

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,15 @@ public void setReadType(ReadType readType) {
392392
}
393393
}
394394

395+
/**
396+
* Returns the read type for the current operation.
397+
*
398+
* @return the read type for the request.
399+
*/
400+
public ReadType getReadType() {
401+
return readType;
402+
}
403+
395404
/**
396405
* Sets the metric results string used for tracing or logging.
397406
* @param metricResults the formatted metric data to store.

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.IOException;
2222
import java.net.URI;
2323
import java.net.URISyntaxException;
24+
import java.net.URL;
25+
import java.util.ArrayList;
2426
import java.util.Arrays;
2527
import java.util.List;
2628
import java.util.Optional;
@@ -58,6 +60,8 @@
5860
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
5961
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
6062
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
63+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY;
64+
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY;
6165
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ;
6266
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ;
6367
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ;
@@ -66,8 +70,12 @@
6670
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
6771
import static org.mockito.ArgumentMatchers.any;
6872
import static org.mockito.ArgumentMatchers.anyBoolean;
73+
import static org.mockito.ArgumentMatchers.anyInt;
74+
import static org.mockito.ArgumentMatchers.anyList;
6975
import static org.mockito.ArgumentMatchers.anyString;
7076
import static org.mockito.ArgumentMatchers.nullable;
77+
import static org.mockito.Mockito.doAnswer;
78+
import static org.mockito.Mockito.doNothing;
7179
import static org.mockito.Mockito.doReturn;
7280
import static org.mockito.Mockito.doThrow;
7381
import static org.mockito.Mockito.mock;
@@ -105,6 +113,7 @@ public class TestAbfsInputStream extends
105113
private static final int OPERATION_INDEX = 6;
106114
private static final int READTYPE_INDEX = 11;
107115

116+
108117
@AfterEach
109118
@Override
110119
public void teardown() throws Exception {
@@ -899,6 +908,94 @@ private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs,
899908
assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, readType);
900909
}
901910

911+
/*
912+
* Test to verify that both conditions of prefetch read and respective config
913+
* enabled needs to be true for the priority header to be added
914+
*/
915+
@Test
916+
public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
917+
throws Exception {
918+
Configuration configuration1 = new Configuration(getRawConfiguration());
919+
configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true");
920+
921+
Configuration configuration2 = new Configuration(getRawConfiguration());
922+
//use the default value for the config: false
923+
configuration2.unset(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY);
924+
925+
TracingContext tracingContext1 = mock(TracingContext.class);
926+
when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ);
927+
928+
//Prefetch Read with config enabled
929+
executePrefetchReadTest(tracingContext1, configuration1, true);
930+
//Prefetch Read with config disabled
931+
executePrefetchReadTest(tracingContext1, configuration2, false);
932+
933+
when(tracingContext1.getReadType()).thenReturn(DIRECT_READ);
934+
935+
//Non-prefetch read with config disabled
936+
executePrefetchReadTest(tracingContext1, configuration2, false);
937+
//Non-prefetch read with config enabled
938+
executePrefetchReadTest(tracingContext1, configuration1, false);
939+
}
940+
941+
/*
942+
* Helper method to execute read and verify if priority header is added or not as expected
943+
*/
944+
private void executePrefetchReadTest(TracingContext tracingContext,
945+
Configuration rawConfig,
946+
boolean shouldHaveHeader) throws Exception {
947+
try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance(
948+
rawConfig)) {
949+
AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore());
950+
951+
AbfsClient abfsClient = Mockito.spy(store.getClient());
952+
Mockito.doReturn(abfsClient).when(store).getClient();
953+
954+
List<AbfsHttpHeader> headersList = new ArrayList<>();
955+
956+
doAnswer(invocation -> {
957+
AbfsRestOperation realOp
958+
= (AbfsRestOperation) invocation.callRealMethod();
959+
AbfsRestOperation spiedOp = spy(realOp);
960+
961+
headersList.addAll(spiedOp.getRequestHeaders());
962+
963+
doNothing().when(spiedOp).execute(any(TracingContext.class));
964+
return spiedOp;
965+
})
966+
.when(abfsClient)
967+
.getAbfsRestOperation(
968+
any(AbfsRestOperationType.class),
969+
anyString(),
970+
any(URL.class),
971+
anyList(),
972+
any(byte[].class),
973+
anyInt(),
974+
anyInt(),
975+
nullable(String.class)
976+
);
977+
978+
abfsClient.read(
979+
"dummy-path", 0L, new byte[1], 0, 1,
980+
"etag", "leaseId", null, tracingContext);
981+
982+
AbfsConfiguration abfsConfig = store.getAbfsConfiguration();
983+
if (shouldHaveHeader) {
984+
assertThat(headersList)
985+
.anySatisfy(header -> {
986+
assertThat(header.getName()).isEqualTo(
987+
X_MS_REQUEST_PRIORITY);
988+
assertThat(header.getValue()).isEqualTo(
989+
abfsConfig.getPrefetchRequestPriorityValue());
990+
});
991+
} else {
992+
assertThat(headersList)
993+
.noneSatisfy(header -> assertThat(header.getName()).isEqualTo(
994+
X_MS_REQUEST_PRIORITY));
995+
}
996+
}
997+
}
998+
902999
private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Exception {
9031000
Path testPath = new Path("testFile");
9041001
byte[] fileContent = getRandomBytesArray(fileSize);

0 commit comments

Comments
 (0)