Skip to content

Conversation

@kessplas
Copy link
Contributor

@kessplas kessplas commented May 1, 2025

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Check any applicable:

  • Were any files moved? Moving files changes their URL, which breaks all hyperlinks to the files.
@kessplas kessplas marked this pull request as ready for review May 1, 2025 18:15
@kessplas kessplas requested a review from a team as a code owner May 1, 2025 18:15
@kessplas kessplas changed the title [DRAFT] Kessplas/last on next fix: fix CipherSubscriber to only call onNext once per request May 1, 2025
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
/*
Check if stream has read all expected content.
Once all content has been read, call onComplete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Once all content has been read, call onComplete.
Once all content has been read, call finalBytes.
Comment on lines 88 to 91
// All content has been read; complete the stream.
// (Signalling onComplete from here is Reactive Streams-spec compliant;
// this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
finalBytes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// All content has been read; complete the stream.
// (Signalling onComplete from here is Reactive Streams-spec compliant;
// this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
finalBytes();
// All content has been read; compute finalBytes
finalBytes();
public void onComplete() {
// In rare cases, e.g. when the last part of a low-level MPU has 0 length,
// onComplete will be called before onNext is called once.
if (contentRead.get() < contentLength) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this calculation include the tag length?
It feels weird that this formula is different than the other ones

Suggested change
if (contentRead.get() < contentLength) {
if (contentRead.get() + tagLength < contentLength) {
Comment on lines +128 to +130
if (contentRead.get() + tagLength <= contentLength) {
finalBytes();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be <?

Suggested change
if (contentRead.get() + tagLength <= contentLength) {
finalBytes();
}
if (contentRead.get() + tagLength < contentLength) {
finalBytes();
}

Since if they're equal, then we know the finalBytes already happened? Maybe?
Either way, if finalBytes already happened, then finalBytesCalled would guard against this, so this is fine.

@kessplas kessplas merged commit 646b735 into main May 2, 2025
31 of 35 checks passed
@kessplas kessplas deleted the kessplas/last-on-next branch May 2, 2025 17:35
josecorella pushed a commit that referenced this pull request May 5, 2025
## [3.3.3](v3.3.2...v3.3.3) (2025-05-05) ### Fixes * fix CipherSubscriber to only call onNext once per request ([#456](#456)) ([646b735](646b735))
if (contentRead.get() + tagLength >= contentLength) {
// All content has been read, so complete to get the final bytes
this.onComplete();
finalBytes();
Copy link

@davidconnard davidconnard May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kessplas

Is the lack of calling onComplete() after finalBytes() deliberate? There seems to be a change in behaviour here. edit: this doesn't seem to be a problem, I've downloaded the src directly and confirmed that re-adding it makes no difference.

FWIW, I'm looking into a failure in our software that has been introduced with v3.3.3 of this SDK, and it seems to relate to our Kotlin Flows not completing normally and instead "cancelling", which leads to our completion handlers not firing properly. My current working theory is that the missing onComplete() here is leading to the Kotlin cancellation, when the upload Flow has completed sending all ByteBuffers and shutdown, but without the subscriber having properly called onComplete(). edit: this is not the case, and I'm still confused

Any idea what might be going on here? Are you sure that this change was safe?

Copy link
Contributor Author

@kessplas kessplas May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @davidconnard ,

This change made the CipherSubscriber more spec compliant as onNext() MUST be only called once for one onNext() signal. The spec is not abundantly clear about the validity of signaling onComplete() from within onNext() but intuitively, the onComplete() signal should come from the publisher once all data is published and flow through the subscribers accordingly; calling onComplete() from within onNext() subverts this expectation.

We don't specifically support Kotlin, as we don't test against the Kotlin SDK. That said, I would recommend opening an issue for this, as it's easier for us to track than comments on a specific PR. Thanks!

Copy link

@davidconnard davidconnard May 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kessplas for the response ... Yesterday, I didn't yet have enough information to raise an issue. I've done some further digging, and I've uncovered where the change in behaviour is coming from.

In the AWS SDK, in the netty-nio-client, in the NettyRequestExecutor.StreamedRequest class, in the onNext() method, there is a check for shouldContinuePublishing(), which checks if the content-length has been reached. When it has been reached, the subscription is cancelled, and an onComplete() call is made. See https://github.com/aws/aws-sdk-java-v2/blob/7009f86260a3d77f811c1dde31679d1297c1cc01/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java#L453-L456

It is this subscription cancellation that is causing my test failure. Encountering this subscription cancellation behaviour is new (as far as my test is concerned). Before this change, the final content-length would only be reached (ie. final bytes sent) as a side-effect of the onComplete() call, and the subscription cancellation would have no actual effect (ie. would not fire on onError() for the publisher). With your changes, the final content-length is reached in the terminal onNext() call, and the cancellation (triggered by the AWS SDK) fires at that point (prior to publisher completion), leading to an onError() call, which triggers different completion behaviour in the Kotlin Flow.

So, this isn't a problem with your change, and apologies for that! Previously, the CipherPublisher was hiding this early cancellation behaviour of the AWS SDK from our flow, and your change has simply exposed our code to this behaviour in the underlying AWS SDK. If I remove the AWS S3 Encryption SDK from the picture, and apply our Flow transformer (with its completion handler) over the raw AWS S3 SDK, then I see the exact same behaviour - our completion handler is invoked (with an error) after cancellation of the subscription/flow).

The solution for us appears to be to restructure our flow / publisher logic. For reference, when our requests to S3 are using the AWS S3 encryption SDK (which is not all the time), we are wrapping the byte stream publisher in another publisher, which performs additional verification of the plaintext (original bytes) checksum prior to sending to S3. To date, we had used onNext() calls to accumulate the plaintext checksum, and we performed the verification in the onComplete() call. While that worked, on reflection, it does not seem to be the correct way to verify this, and it only worked because of the CipherSubscriber previously delayed sending the final bytes to S3 (and therefore, delayed the cancellation) until the onComplete() call (rather than having it fire from the terminal onNext() call).

The right solution seems to be to change our code to perform the verification in the terminal onNext() call (by tracking the byte count, like the AWS SDK is doing). This will verify the plaintext checksum before sending the final bytes to the wrapped subscriber (ie. CipherSubscriber) in the chained onNext() call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants