Skip to content

Conversation

disq
Copy link
Member

@disq disq commented Oct 2, 2024

Actual clean up by @murarustefaan but I've done some minor updates to it and updated tests.

Still need to test it E2E. Seems to work, handled the panic: arrow/array: number of columns/fields mismatch [recovered] error immediately (I tested with an old version of the S3 plugin, had to manually bump Arrow to v17)

@github-actions github-actions bot added the fix label Oct 2, 2024
t.Fatalf("expected 0 open tables, got %d", l)
}

if l := testClient.MessageLen(messageTypeInsert); l != 3 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added two inserts above, but increased this from 3 to 4... This was incorrect to expect 3 because the flushing flow has been different for some time, and you don't need the third message to make it flush.

case err := <-outputCh:
if err != nil {
s.errCh <- fmt.Errorf("handler failed on %s: %w", tableName, err)
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return statement is my actual contribution apart from renaming variables and updating tests, oh and better handling of error type panics.

@github-actions github-actions bot added fix and removed fix labels Oct 2, 2024
@disq disq marked this pull request as ready for review October 2, 2024 11:03
@disq disq requested review from a team and marianogappa October 2, 2024 11:03
@github-actions github-actions bot added the fix label Oct 2, 2024
for {
select {
case msg := <-msgs:
if msg == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't you want to msg, ok := instead here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that would be clearer, fixed in 512d6c0

w.lastMsgType = msgType
if err := w.startWorker(ctx, errCh, msg); err != nil {

case err := <-errCh:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous logic wasn't shutting down on error; is this correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous logic would do a w.logger.Err(err).Msg("error from StreamingBatchWriter") and continue working.

@marianogappa marianogappa requested a review from erezrokah October 2, 2024 11:49
@marianogappa
Copy link
Contributor

@erezrokah adding you as well; this is a little tricky with not that much experience with the writer

@disq
Copy link
Member Author

disq commented Oct 2, 2024

Merge at will, I'm not merging right now to prevent any potential noise. (and the unreleased SDK already has some interesting changes)

@kodiakhq kodiakhq bot merged commit d852119 into main Oct 2, 2024
7 checks passed
@kodiakhq kodiakhq bot deleted the fix/streamingbatchwriter-error-handling branch October 2, 2024 16:49
kodiakhq bot pushed a commit that referenced this pull request Oct 2, 2024
🤖 I have created a release *beep* *boop* --- ## [4.64.1](v4.64.0...v4.64.1) (2024-10-02) ### Bug Fixes * Error handling in StreamingBatchWriter ([#1913](#1913)) ([d852119](d852119)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
@erezrokah
Copy link
Member

Looks like this broke something cloudquery/cloudquery#19312

kodiakhq bot pushed a commit that referenced this pull request Jun 9, 2025
original PR: #1913 Reverts the revertion, plus more improvements: - Client's handlers are now allowed to return an immediate error without draining the channel. The channel will be automatically drained for the error condition, and subsequent writes for that table won't get sent to the client. - Some potential race conditions are fixed (`ensureOpened()` inline-func now got refactored into `s.send()` and handles the sending as well. The spawned goroutine doesn't refer to `inputCh` directly so that it can be replaced in `closeFlush()`. - Shutdown logic is handled better: even if `client.Write()` returns an error after `msgs` is closed, it's still logged and returned.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

4 participants