|
21 | 21 |
|
22 | 22 | import com.amazonaws.AmazonClientException; |
23 | 23 | import com.amazonaws.services.s3.model.*; |
| 24 | +import com.amazonaws.util.Base64; |
| 25 | +import org.elasticsearch.common.logging.ESLogger; |
| 26 | +import org.elasticsearch.common.logging.Loggers; |
24 | 27 | import org.elasticsearch.common.unit.ByteSizeUnit; |
25 | 28 | import org.elasticsearch.common.unit.ByteSizeValue; |
26 | 29 |
|
27 | 30 | import java.io.ByteArrayInputStream; |
28 | 31 | import java.io.IOException; |
29 | 32 | import java.io.InputStream; |
| 33 | +import java.security.DigestInputStream; |
| 34 | +import java.security.MessageDigest; |
| 35 | +import java.security.NoSuchAlgorithmException; |
30 | 36 | import java.util.ArrayList; |
31 | 37 | import java.util.List; |
32 | 38 |
|
|
49 | 55 | public class DefaultS3OutputStream extends S3OutputStream { |
50 | 56 |
|
51 | 57 | private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); |
52 | | - |
| 58 | + private static final ESLogger logger = Loggers.getLogger("cloud.aws"); |
53 | 59 | /** |
54 | 60 | * Multipart Upload API data |
55 | 61 | */ |
@@ -120,7 +126,28 @@ protected void doUpload(S3BlobStore blobStore, String bucketName, String blobNam |
120 | 126 | md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); |
121 | 127 | } |
122 | 128 | md.setContentLength(length); |
123 | | - blobStore.client().putObject(bucketName, blobName, is, md); |
| 129 | + |
| 130 | + InputStream inputStream = is; |
| 131 | + |
| 132 | + // We try to compute a MD5 while reading it |
| 133 | + MessageDigest messageDigest; |
| 134 | + try { |
| 135 | + messageDigest = MessageDigest.getInstance("MD5"); |
| 136 | + inputStream = new DigestInputStream(is, messageDigest); |
| 137 | + } catch (NoSuchAlgorithmException impossible) { |
| 138 | + // Every implementation of the Java platform is required to support MD5 (see MessageDigest) |
| 139 | + throw new RuntimeException(impossible); |
| 140 | + } |
| 141 | + PutObjectResult putObjectResult = blobStore.client().putObject(bucketName, blobName, inputStream, md); |
| 142 | + |
| 143 | + String localMd5 = Base64.encodeAsString(messageDigest.digest()); |
| 144 | + String remoteMd5 = putObjectResult.getContentMd5(); |
| 145 | + if (!localMd5.equals(remoteMd5)) { |
| 146 | + logger.debug("MD5 local [{}], remote [{}] are not equal...", localMd5, remoteMd5); |
| 147 | + throw new AmazonS3Exception("MD5 local [" + localMd5 + |
| 148 | + "], remote [" + remoteMd5 + |
| 149 | + "] are not equal..."); |
| 150 | + } |
124 | 151 | } |
125 | 152 |
|
126 | 153 | private void initializeMultipart() { |
|
0 commit comments