|  | 
| 20 | 20 | import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD; | 
| 21 | 21 | import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER; | 
| 22 | 22 | import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER; | 
|  | 23 | +import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH; | 
| 23 | 24 | import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_TRAILER; | 
|  | 25 | +import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils.computeAndMoveContentLength; | 
| 24 | 26 | 
 | 
| 25 | 27 | import java.io.InputStream; | 
|  | 28 | +import java.nio.ByteBuffer; | 
| 26 | 29 | import java.nio.charset.StandardCharsets; | 
| 27 | 30 | import java.util.ArrayList; | 
| 28 | 31 | import java.util.Collections; | 
| 29 | 32 | import java.util.List; | 
|  | 33 | +import java.util.Optional; | 
|  | 34 | +import java.util.concurrent.CompletableFuture; | 
|  | 35 | +import org.reactivestreams.Publisher; | 
| 30 | 36 | import software.amazon.awssdk.annotations.SdkInternalApi; | 
| 31 | 37 | import software.amazon.awssdk.checksums.SdkChecksum; | 
| 32 | 38 | import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm; | 
|  | 
| 35 | 41 | import software.amazon.awssdk.http.SdkHttpRequest; | 
| 36 | 42 | import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope; | 
| 37 | 43 | import software.amazon.awssdk.http.auth.aws.internal.signer.NoOpPayloadChecksumStore; | 
|  | 44 | +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.AsyncChunkEncodedPayload; | 
| 38 | 45 | import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider; | 
| 39 | 46 | import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream; | 
|  | 47 | +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPayload; | 
|  | 48 | +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher; | 
| 40 | 49 | import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider; | 
| 41 | 50 | import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream; | 
| 42 | 51 | import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider; | 
| @@ -112,10 +121,66 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni | 
| 112 | 121 |  return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build); | 
| 113 | 122 |  } | 
| 114 | 123 | 
 | 
|  | 124 | + /** | 
|  | 125 | + * Given a payload and result of request signing, sign the payload via the SigV4 process. | 
|  | 126 | + */ | 
|  | 127 | + @Override | 
|  | 128 | + public Publisher<ByteBuffer> signAsync(Publisher<ByteBuffer> payload, V4aRequestSigningResult requestSigningResult) { | 
|  | 129 | + ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder() | 
|  | 130 | + .publisher(payload) | 
|  | 131 | + .chunkSize(chunkSize) | 
|  | 132 | + .addEmptyTrailingChunk(true); | 
|  | 133 | + AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload(chunkedStreamBuilder); | 
|  | 134 | + | 
|  | 135 | + signCommon(chunkedPayload, requestSigningResult); | 
|  | 136 | + | 
|  | 137 | + return chunkedStreamBuilder.build(); | 
|  | 138 | + } | 
|  | 139 | + | 
|  | 140 | + private ChunkedEncodedPayload signCommon(ChunkedEncodedPayload payload, V4aRequestSigningResult requestSigningResult) { | 
|  | 141 | + SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); | 
|  | 142 | + | 
|  | 143 | + payload.decodedContentLength(request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) | 
|  | 144 | + .map(Long::parseLong) | 
|  | 145 | + .orElseThrow(() -> { | 
|  | 146 | + String msg = String.format("Expected header '%s' to be present", | 
|  | 147 | + X_AMZ_DECODED_CONTENT_LENGTH); | 
|  | 148 | + return new RuntimeException(msg); | 
|  | 149 | + })); | 
|  | 150 | + | 
|  | 151 | + preExistingTrailers.forEach(trailer -> payload.addTrailer(() -> trailer)); | 
|  | 152 | + | 
|  | 153 | + switch (requestSigningResult.getSigningConfig().getSignedBodyValue()) { | 
|  | 154 | + case STREAMING_ECDSA_SIGNED_PAYLOAD: { | 
|  | 155 | + RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), | 
|  | 156 | + requestSigningResult.getSigningConfig()); | 
|  | 157 | + payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | 
|  | 158 | + break; | 
|  | 159 | + } | 
|  | 160 | + case STREAMING_UNSIGNED_PAYLOAD_TRAILER: | 
|  | 161 | + setupChecksumTrailerIfNeeded(payload); | 
|  | 162 | + break; | 
|  | 163 | + case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { | 
|  | 164 | + RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), | 
|  | 165 | + requestSigningResult.getSigningConfig()); | 
|  | 166 | + payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); | 
|  | 167 | + setupChecksumTrailerIfNeeded(payload); | 
|  | 168 | + payload.addTrailer( | 
|  | 169 | + new SigV4aTrailerProvider(payload.trailers(), rollingSigner, credentialScope) | 
|  | 170 | + ); | 
|  | 171 | + break; | 
|  | 172 | + } | 
|  | 173 | + default: | 
|  | 174 | + throw new UnsupportedOperationException(); | 
|  | 175 | + } | 
|  | 176 | + | 
|  | 177 | + return payload; | 
|  | 178 | + } | 
|  | 179 | + | 
| 115 | 180 |  @Override | 
| 116 | 181 |  public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) { | 
| 117 | 182 |  long encodedContentLength = 0; | 
| 118 |  | - long contentLength = SignerUtils.computeAndMoveContentLength(request, payload); | 
|  | 183 | + long contentLength = computeAndMoveContentLength(request, payload); | 
| 119 | 184 |  setupPreExistingTrailers(request); | 
| 120 | 185 | 
 | 
| 121 | 186 |  // pre-existing trailers | 
| @@ -157,6 +222,72 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider | 
| 157 | 222 |  // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it | 
| 158 | 223 |  } | 
| 159 | 224 | 
 | 
|  | 225 | + @Override | 
|  | 226 | + public CompletableFuture<Pair<SdkHttpRequest.Builder, Optional<Publisher<ByteBuffer>>>> beforeSigningAsync( | 
|  | 227 | + SdkHttpRequest.Builder request, Publisher<ByteBuffer> payload, String checksum) { | 
|  | 228 | + | 
|  | 229 | + return SignerUtils.moveContentLength(request, payload) | 
|  | 230 | + .thenApply(p -> { | 
|  | 231 | + SdkHttpRequest.Builder requestBuilder = p.left(); | 
|  | 232 | + setupPreExistingTrailers(requestBuilder); | 
|  | 233 | + | 
|  | 234 | + long decodedContentLength = | 
|  | 235 | + requestBuilder.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) | 
|  | 236 | + .map(Long::parseLong) | 
|  | 237 | + // should not happen, this header is added by | 
|  | 238 | + // moveContentLength | 
|  | 239 | + .orElseThrow(() -> new RuntimeException( | 
|  | 240 | + X_AMZ_DECODED_CONTENT_LENGTH + " header not present")); | 
|  | 241 | + | 
|  | 242 | + long encodedContentLength = calculateEncodedContentLength(request, decodedContentLength, checksum); | 
|  | 243 | + | 
|  | 244 | + if (checksumAlgorithm != null) { | 
|  | 245 | + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); | 
|  | 246 | + request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); | 
|  | 247 | + } | 
|  | 248 | + request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); | 
|  | 249 | + | 
|  | 250 | + return Pair.of(requestBuilder, p.right()); | 
|  | 251 | + }); | 
|  | 252 | + } | 
|  | 253 | + | 
|  | 254 | + private long calculateEncodedContentLength(SdkHttpRequest.Builder requestBuilder, long decodedContentLength, | 
|  | 255 | + String checksum) { | 
|  | 256 | + long encodedContentLength = 0; | 
|  | 257 | + | 
|  | 258 | + encodedContentLength += calculateExistingTrailersLength(); | 
|  | 259 | + | 
|  | 260 | + switch (checksum) { | 
|  | 261 | + case STREAMING_ECDSA_SIGNED_PAYLOAD: { | 
|  | 262 | + long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes> | 
|  | 263 | + encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength); | 
|  | 264 | + break; | 
|  | 265 | + } | 
|  | 266 | + case STREAMING_UNSIGNED_PAYLOAD_TRAILER: | 
|  | 267 | + if (checksumAlgorithm != null) { | 
|  | 268 | + encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); | 
|  | 269 | + } | 
|  | 270 | + encodedContentLength += calculateChunksLength(decodedContentLength, 0); | 
|  | 271 | + break; | 
|  | 272 | + case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { | 
|  | 273 | + long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes> | 
|  | 274 | + encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength); | 
|  | 275 | + if (checksumAlgorithm != null) { | 
|  | 276 | + encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); | 
|  | 277 | + } | 
|  | 278 | + encodedContentLength += 170; // x-amz-trailer-signature:<sigv4a-ecsda hex signature, 144 bytes>\r\n | 
|  | 279 | + break; | 
|  | 280 | + } | 
|  | 281 | + default: | 
|  | 282 | + throw new UnsupportedOperationException(); | 
|  | 283 | + } | 
|  | 284 | + | 
|  | 285 | + // terminating \r\n | 
|  | 286 | + encodedContentLength += 2; | 
|  | 287 | + | 
|  | 288 | + return encodedContentLength; | 
|  | 289 | + } | 
|  | 290 | + | 
| 160 | 291 |  /** | 
| 161 | 292 |  * Set up a map of pre-existing trailer (headers) for the given request to be used when chunk-encoding the payload. | 
| 162 | 293 |  * <p> | 
| @@ -270,6 +401,30 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil | 
| 270 | 401 |  builder.inputStream(checksumInputStream).addTrailer(checksumTrailer); | 
| 271 | 402 |  } | 
| 272 | 403 | 
 | 
|  | 404 | + private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) { | 
|  | 405 | + if (checksumAlgorithm == null) { | 
|  | 406 | + return; | 
|  | 407 | + } | 
|  | 408 | + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); | 
|  | 409 | + | 
|  | 410 | + String cachedChecksum = getCachedChecksum(); | 
|  | 411 | + | 
|  | 412 | + if (cachedChecksum != null) { | 
|  | 413 | + LOG.debug(() -> String.format("Cached payload checksum available for algorithm %s: %s. Using cached value", | 
|  | 414 | + checksumAlgorithm.algorithmId(), checksumHeaderName)); | 
|  | 415 | + payload.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum))); | 
|  | 416 | + return; | 
|  | 417 | + } | 
|  | 418 | + | 
|  | 419 | + SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm); | 
|  | 420 | + payload.checksumPayload(sdkChecksum); | 
|  | 421 | + | 
|  | 422 | + TrailerProvider checksumTrailer = | 
|  | 423 | + new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName, checksumAlgorithm, payloadChecksumStore); | 
|  | 424 | + | 
|  | 425 | + payload.addTrailer(checksumTrailer); | 
|  | 426 | + } | 
|  | 427 | + | 
| 273 | 428 |  private String getCachedChecksum() { | 
| 274 | 429 |  byte[] checksumBytes = payloadChecksumStore.getChecksumValue(checksumAlgorithm); | 
| 275 | 430 |  if (checksumBytes != null) { | 
|  | 
0 commit comments