Support concurrent multipart uploads in Azure #128449
Conversation
Hi @tlrx, I've created a changelog YAML for you.
Looks great, left a few comments.
.getBlobAsyncClient(blobName)
.getBlockBlobAsyncClient();

Flux.fromIterable(multiparts)
👍 neat
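The idea behind the `Flux.fromIterable(multiparts)` pipeline, staging every part concurrently and committing the block list only once all parts are staged, can be sketched with plain JDK concurrency. This is a minimal illustration and not the code from the PR: `CompletableFuture` stands in for Reactor, an in-memory map stands in for Azure's uncommitted blocks, and the names `ConcurrentMultipartSketch`, `Part`, `split` and `upload` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConcurrentMultipartSketch {

    /** Hypothetical description of one part: a block id and the payload slice it covers. */
    static final class Part {
        final String blockId;
        final int offset;
        final int length;

        Part(String blockId, int offset, int length) {
            this.blockId = blockId;
            this.offset = offset;
            this.length = length;
        }
    }

    /** Splits totalLength bytes into consecutive parts of at most partSize bytes. */
    static List<Part> split(int totalLength, int partSize) {
        List<Part> parts = new ArrayList<>();
        for (int offset = 0, i = 0; offset < totalLength; offset += partSize, i++) {
            int length = Math.min(partSize, totalLength - offset);
            parts.add(new Part(String.format("block-%05d", i), offset, length));
        }
        return parts;
    }

    /** Stages all parts concurrently, then "commits" them in their original order. */
    static byte[] upload(byte[] payload, int partSize, int concurrency) {
        List<Part> parts = split(payload.length, partSize);
        // Assumption: this map is an in-memory stand-in for the store's staged (uncommitted) blocks.
        Map<String, byte[]> staged = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Part part : parts) {
                futures.add(CompletableFuture.runAsync(() -> {
                    byte[] slice = Arrays.copyOfRange(payload, part.offset, part.offset + part.length);
                    staged.put(part.blockId, slice);
                }, executor));
            }
            // Wait for every part before committing; a failed part makes join() throw.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdown();
        }
        // Commit step: assemble the blocks in list order, regardless of staging order.
        ByteArrayOutputStream committed = new ByteArrayOutputStream();
        for (Part part : parts) {
            byte[] block = staged.get(part.blockId);
            committed.write(block, 0, block.length);
        }
        return committed.toByteArray();
    }
}
```

Calling `ConcurrentMultipartSketch.upload(payload, 128, 4)` returns the original bytes whatever order the parts finish in, because the commit step follows the ordered part list rather than completion order.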
.block();
}
} catch (final BlobStorageException e) {
if (failIfAlreadyExists
This can be a follow-up for sure, but I guess we may need some cleanup to deal with failures, such as deleting the staged blocks? I notice that the original version does not seem to handle this either, so we can defer it for now.
Yes, there is more work to do for handling failures. I'll create a task for this if we decide to go this route.
private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
assert delegate.markSupported() : "An InputStream with mark support was expected";
// We need to introduce a read barrier in order to provide visibility for the underlying
Let us leave this for now, but it seems strange to need the synchronized part at all, since we expect the stream to be used serially anyway, and serial use across threads already requires the Reactor code to establish a happens-before relationship.
I commented in #128449 (comment)
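For readers following the thread, the kind of wrapper under discussion can be sketched as below: every access to the delegate goes through the same monitor, so writes made by one thread become visible to the next thread that touches the stream. This is an illustrative sketch and not the PR's `toSynchronizedInputStream`; the class and helper names are hypothetical, and the `blobName` and `multipart` parameters of the real method are omitted.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class SynchronizedStreamSketch {

    /** Wraps a mark-capable stream so that reads, mark and reset share one monitor. */
    static InputStream synchronizedStream(InputStream delegate) {
        if (!delegate.markSupported()) {
            throw new IllegalArgumentException("An InputStream with mark support was expected");
        }
        return new FilterInputStream(delegate) {
            // Entering and leaving the monitor gives a happens-before edge between
            // consecutive callers, even when they run on different threads.
            @Override
            public synchronized int read() throws IOException {
                return super.read();
            }

            @Override
            public synchronized int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, len);
            }

            @Override
            public synchronized void mark(int readlimit) {
                super.mark(readlimit);
            }

            @Override
            public synchronized void reset() throws IOException {
                super.reset();
            }
        };
    }

    /** Hypothetical helper: reads the remaining bytes, rethrowing IOException unchecked. */
    static byte[] readAll(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical helper: rewinds to the last mark, then reads the remaining bytes. */
    static byte[] resetAndReadAll(InputStream in) {
        try {
            in.reset();
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The monitor only provides visibility and mutual exclusion for individual calls. It does not make truly concurrent readers safe, since interleaved reads would still consume each other's bytes, which is consistent with the reviewer's point that the stream is expected to be used serially.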
stream.mark(Integer.MAX_VALUE);
final var bytesRead = new AtomicLong(0L);
return Flux.defer(() -> {
// Code in this Flux.defer() can be concurrently executed by multiple threads
Can it? I think that would break everything. I think this is an existing assumption and a copied code comment, but if you agree that it should not happen, perhaps it is worth adding a question mark or TODO around it?
This was the case when I worked on this last week: I could see different repository_azure threads resetting the same input stream instance concurrently, so I assumed the existing comment was also applicable.
I updated the comment so that we can revisit this later, now that I have fixed the bugs I introduced.
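The mark-then-defer pattern in the snippet above can be illustrated without Reactor. In the sketch below a `Supplier` plays the role of `Flux.defer`: the stream is marked once up front, and each invocation (think of each re-subscription, for example on retry) resets the stream and the byte counter before reading again. The body of the real `Flux.defer` lambda is not shown in this conversation, so the reset-and-count logic here is an assumption, and `ReplayableStreamSketch` and `deferredRead` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class ReplayableStreamSketch {

    /**
     * Marks the stream once and returns a supplier that re-reads up to length bytes
     * from the mark on every call. Assumption: callers invoke the supplier serially.
     */
    static Supplier<byte[]> deferredRead(InputStream stream, int length, AtomicLong bytesRead) {
        if (!stream.markSupported()) {
            throw new IllegalArgumentException("An InputStream with mark support was expected");
        }
        stream.mark(Integer.MAX_VALUE);
        return () -> {
            try {
                // Each invocation starts over: rewind to the mark and restart the counter.
                stream.reset();
                bytesRead.set(0L);
                byte[] buffer = stream.readNBytes(length);
                bytesRead.addAndGet(buffer.length);
                return buffer;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```

If two threads invoked the supplier at the same time, one thread's `reset()` could rewind the stream in the middle of the other's read, which is the failure mode the reviewer is worried about when asking whether the deferred code can really run concurrently.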
LGTM.
We should tackle tests too, but I am good with merging this since it is not really used and we can start benchmarking the effect more easily then.
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
Thanks Henning!
Enhances existing integration test to account for #128449. Relates ES-11815