🎉 Checkpointing: Worker use destination (instead of source) for state #3290
Conversation
```java
messageTracker.accept(message);
destination.accept(message);
final Thread sourceThread = new Thread(() -> {
```
You should use a `newFixedThreadPool(2)` instead. It is generally error-prone to use threads directly, and a pool will make the join cleaner.
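The suggestion above could look something like this minimal sketch. All names here are illustrative (the real worker would submit its source-reading and destination-writing loops as the two tasks):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSketch {

  public static void main(String[] args) throws Exception {
    final AtomicInteger processed = new AtomicInteger();
    final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submit the "source" and "destination" loops as tasks instead of raw Threads.
    final Future<?> sourceTask = pool.submit(() -> processed.addAndGet(1));
    final Future<?> destinationTask = pool.submit(() -> processed.addAndGet(1));

    // get() replaces Thread.join() and also surfaces any exception thrown inside the task.
    sourceTask.get();
    destinationTask.get();

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(processed.get());
  }
}
```

A side benefit over raw `Thread.join()`: an exception thrown in either loop propagates out of `get()` as an `ExecutionException` instead of dying silently in the thread.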
```java
try {
  while (!cancelled.get() && !source.isFinished()) {
    final Optional<AirbyteMessage> messageOptional = source.attemptRead();
    if (messageOptional.isPresent()) {
```
there is a potential busy loop here. Shouldn't you have some sleep somewhere?
I think this is actually a bit misleading. it isn't really a busy loop. attemptRead blocks until the next line comes in. I think the only time that we return an empty optional is at the end of the stream. I will write a test to refresh my memory on this.
confirmed it blocks while waiting for input.
```java
final Mapper<AirbyteMessage> mapper,
final Destination<AirbyteMessage> destination,
final MessageTracker<AirbyteMessage> messageTracker) {
final MessageTracker<AirbyteMessage> sourceMessageTracker,
```
I think you should pass a TrackedSource and TrackedDestination that are decorators on top of source & destination.
That will make the code impossible to mess up in the future since you only accept once
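A minimal sketch of the decorator idea, under the assumption of simplified `Destination` and `TrackedDestination` types (not the actual Airbyte interfaces): the wrapper feeds the tracker on every `accept`, so callers cannot forget to track a message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TrackedDestinationSketch {

  // Simplified stand-in for the destination abstraction.
  interface Destination<T> {
    void accept(T message);
  }

  // Decorator: tracking and delivery are fused, so accept is the single place
  // where a message can be counted.
  static class TrackedDestination<T> implements Destination<T> {
    private final Destination<T> delegate;
    private final Consumer<T> tracker;

    TrackedDestination(final Destination<T> delegate, final Consumer<T> tracker) {
      this.delegate = delegate;
      this.tracker = tracker;
    }

    @Override
    public void accept(final T message) {
      tracker.accept(message); // tracking happens exactly once, here
      delegate.accept(message);
    }
  }

  public static void main(String[] args) {
    final List<String> tracked = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();
    final Destination<String> destination = new TrackedDestination<>(delivered::add, tracked::add);
    destination.accept("record-1");
    System.out.println(tracked.equals(delivered));
  }
}
```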
good idea.
I fiddled with this and was struggling to get something that was actually safer. The trade-off seems to be exposing the MessageTracker functionality in the Destination and Source. That seems pretty awkward and clutters up pretty important abstractions. I'm down to keep playing with this, but want to do it separately at this point.
```java
final AirbyteMessage message = mapper.mapMessage(messageOptional.get());

sourceMessageTracker.accept(message);
destination.accept(message);
```
Isn't this where you should be using a BlockingQueue, for the communication between the source and the destination?
this design would make it extremely clear where the backpressure happens (ie in the worker).
You would just need an additional thread
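A sketch of what a bounded `BlockingQueue` hand-off between the two loops could look like. All names are illustrative; the point is that the bounded `put()` is where backpressure would become explicit in the worker:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueSketch {

  public static void main(String[] args) throws Exception {
    // Bounded queue: put() blocks when the queue is full, which makes the
    // backpressure visible in the worker rather than hidden in destination.accept.
    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    final List<String> written = new CopyOnWriteArrayList<>();
    final ExecutorService pool = Executors.newFixedThreadPool(2);

    // "Source" loop: produces more records than the queue can hold at once.
    pool.submit(() -> {
      for (int i = 0; i < 100; i++) {
        queue.put("record-" + i);
      }
      queue.put("EOF"); // sentinel marking end of stream
      return null;
    });

    // "Destination" loop: drains the queue until the sentinel arrives.
    final Future<?> consumer = pool.submit(() -> {
      String msg;
      while (!(msg = queue.take()).equals("EOF")) {
        written.add(msg);
      }
      return null;
    });

    consumer.get();
    pool.shutdown();
    System.out.println(written.size());
  }
}
```

With a capacity of 10 and 100 records, the producer blocks repeatedly until the consumer catches up, which is exactly the backpressure behaviour being discussed.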
It also prepares the code for the mappers.
Agreed. The blocking queue wouldn't do anything for now, since destination.accept applies the back pressure for us. I was going to wait to add the queue until we actually needed it. I can just go ahead and do it now.
I don't have a strong opinion here! Whichever makes the most sense for now. It seems like the current approach is good enough.
I vote for leaving it out until we actually need it.
Review thread on airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java (resolved).
Force-pushed 15aabaa to 141ba6c.
sherifnada left a comment
Requesting changes just to answer a few questions, but overall looks great. Really excited to roll this out -- feels like a user asks about it every week now! 🎉
We should also update the Airbyte protocol docs (very reasonably in a separate PR if needed)
Review thread on airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java (resolved).
```java
output.withState(state);
});
} else if (syncInput.getState() != null) {
  LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
```
is this branch basically for backwards compatibility? If so, should we also check if the sync is successful?
Not really. Basically we don't want to "forget" state, so if we fail to checkpoint at all we just return the initial state because we know that's true.
I need to handle the backwards compatibility case separately. I think that's going to be in here with the logic you described. If we are successful, got no output from the destination but do have output from the source, we will fall back on that.
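The fallback rule described here could be sketched as follows (with an illustrative `State` record, not the actual Airbyte struct): prefer state captured from the destination; otherwise return the input state, so we never "forget" state.

```java
import java.util.Optional;

public class StateFallbackSketch {

  // Illustrative stand-in for the state struct.
  record State(String value) {}

  // Prefer state captured from the destination; fall back to the input state.
  static Optional<State> resolveState(final Optional<State> destinationState, final State inputState) {
    if (destinationState.isPresent()) {
      return destinationState;
    }
    // May be empty on the very first sync, when there is no input state either.
    return Optional.ofNullable(inputState);
  }

  public static void main(String[] args) {
    // No new state captured: fall back to the input state.
    System.out.println(resolveState(Optional.empty(), new State("cursor=5")).get().value());
    // Destination emitted state: it wins over the input state.
    System.out.println(resolveState(Optional.of(new State("cursor=9")), new State("cursor=5")).get().value());
  }
}
```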
I see Sherif's point - is the destination emitting state an invariant? Should we fail loudly if this isn't the case?
NVM, I just read the PartialSuccessTemporalWorkflow class - my earlier comment doesn't make sense.
For my understanding - when will this happen? If a destination isn't migrated over or if a destination fails before it reaches any one checkpoint? Do we handle partial checkpointing today?
I actually think i want to skip backwards compatibility and just upgrade all destinations. Mainly I don't think there's a safe way to get out of the backwards compatibility state since we don't have connector - core compatibility safeties. so might as well just make a clean cut and tell people to move to the bleeding edge. #alpha
thought more about this - emitting state should not be an invariant. full refresh streams don't need to output state. if all streams are FR then no state would be emitted.
This being said we aren't catching the case where the destination is e.g: buggy and not outputting state. I don't think we want to fail the sync in that case but it is a bug that could go unnoticed
> This being said we aren't catching the case where the destination is e.g: buggy and not outputting state. I don't think we want to fail the sync in that case but it is a bug that could go unnoticed
@sherifnada could you say a bit more here? It seems to me that if a destination fails to output state when it is expected to, we should fail the sync. If the destination doesn't tell us what records it received, then we can never move state forward. The destination has to be the source of truth here. So I think the new contract is that (at least on incremental, although you could make a coherent argument for FR too) if the destination doesn't output state at least once, something has gone wrong.
Typing this out makes me realize it would actually probably be helpful for us to track whether the source ever emitted state, so in these failure cases we at least have logs that give us a clue whether the destination failed to emit state because of something in it (or the worker), or because the source never emitted any state.
Agreed with the bit about tracking source emitting state. The invariants are:
- The destination should output a non-zero number of state messages if the source output any state messages
- The destination should output state messages in monotonically increasing order (depending on order received from the source)
- The last state message output by the destination in a successful sync should == the last state message output by the source
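The three invariants above could be sketched as a post-sync check, with illustrative integer "states" standing in for real state messages:

```java
import java.util.List;

public class StateInvariantsSketch {

  // Post-sync check of the invariants (illustrative, not the actual worker code).
  static void checkInvariants(final List<Integer> sourceStates, final List<Integer> destStates) {
    // 1. If the source emitted any state, the destination must emit at least one.
    if (!sourceStates.isEmpty() && destStates.isEmpty()) {
      throw new IllegalStateException("source emitted state but destination emitted none");
    }
    // 2. Destination state messages must be monotonically increasing.
    for (int i = 1; i < destStates.size(); i++) {
      if (destStates.get(i) < destStates.get(i - 1)) {
        throw new IllegalStateException("destination state went backwards");
      }
    }
    // 3. On success, the last destination state must equal the last source state.
    if (!sourceStates.isEmpty()
        && !sourceStates.get(sourceStates.size() - 1).equals(destStates.get(destStates.size() - 1))) {
      throw new IllegalStateException("final states diverge");
    }
  }

  public static void main(String[] args) {
    // The destination may emit a subset of the source's states (here it skipped 2),
    // as long as order holds and the last ones match.
    checkInvariants(List.of(1, 2, 3), List.of(1, 3));
    System.out.println("invariants hold");
  }
}
```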
Additional review threads on PartialSuccessTemporalAttemptExecution.java and SyncWorkflow.java (resolved).
Also you are a saint for this amazing PR description. Thank you!! 🎉
Additional review threads on DefaultReplicationWorker.java, PartialSuccessTemporalAttemptExecution.java, and SyncWorkflow.java (resolved).
Nice! General approach is sound.
My comments are around readability and my own understanding.
To respond to your comment about the cleanliness - having aggregation in the SyncWorkflow doesn't look too bad to me. The Sync workflow is quite different from the others already in any case, since the workflow itself is more complicated. Not a big issue for me.
If you want to tackle it, I'd favor putting aggregation into the temporal workflow, rather than wrapping the worker in another worker. Worker-in-worker smells too complex, with too many abstraction layers, to me.
Force-pushed dba87c8 to ae161d7.
@michel-tricot @davinchia @sherifnada thank you for the review! I think I have addressed all of your feedback and am ready for another round.
```java
long getBytesCount();

Optional<JsonNode> getOutputState();
Optional<State> getOutputState();
```
using the State struct, which is just a wrapper around whatever state the connector returns allows us to differentiate between the case when state is not successfully returned and when state is successfully returned but it is null / empty.
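A small sketch of that distinction, with an illustrative `State` record: `Optional.empty()` means no state was returned at all, while a present wrapper whose payload is null means state was returned but is null/empty.

```java
import java.util.Optional;

public class StateWrapperSketch {

  // Illustrative wrapper, standing in for the State struct around the connector's payload.
  record State(Object payload) {}

  public static void main(String[] args) {
    // Case 1: the destination never returned state.
    final Optional<State> noStateReturned = Optional.empty();
    // Case 2: the destination returned state, but the payload itself is null.
    final Optional<State> nullStateReturned = Optional.of(new State(null));

    // Without the wrapper, both cases would collapse into the same null/empty value.
    System.out.println(noStateReturned.isPresent());
    System.out.println(nullStateReturned.isPresent());
  }
}
```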
Force-pushed aa941af to 81cce57.

```java
@Override
public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws WorkerException {
public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws WorkerException {
```
nit: I think it would be helpful to leave a Javadoc summarising `run`'s behaviour around state emission and saving state, i.e. state is still emitted by the source and is only saved from the destination. Otherwise I can see someone getting confused about where state originates.
davinchia left a comment
One small comment, otherwise looks great!
/test connector=destination-bigquery
/test connector=destination-meilisearch
/test connector=destination-mssql
/publish connector=connectors/destination-csv
/publish connector=connectors/destination-local-json
/publish connector=connectors/destination-postgres
/publish connector=connectors/destination-mysql
/publish connector=connectors/destination-snowflake
/publish connector=connectors/destination-redshift
/publish connector=connectors/destination-bigquery
/publish connector=connectors/destination-meilisearch

"/publish connector=connectors/destination-mssql" I did this manually locally, since it can't pass a new integration test. This is not a new regression in the connector; it has never been able to pass this test.
What

How

- DefaultReplicationWorker now listens to the destination for state and only returns state it gets from the destination. (This part was in the original draft that Michel reviewed; everything else is new.)
- ReplicationActivity and a new type of TemporalAttemptExecution called PartialSuccessTemporalAttemptExecution. This new abstraction determines whether, if there is a failure, we can retry again. It does so up to 3 times. On each attempt it tracks the state.
- ReplicationOutput.yaml struct. This is now the output of the DefaultReplicationWorker. Then in the SyncWorkflow we compile that into the StandardSyncOutput.yaml that we know and love.

I think the weakness of this PR is that some of this retry logic is not in the right place. This aggregation logic living directly in the SyncWorkflow breaks conventions we already have. I could push the retries into a worker that wraps the existing DefaultReplicationWorker, or just find a way to push the aggregation into the PartialSuccessTemporalAttemptExecution. I'm happy to do either of these (or something different) based on review. Also happy to just leave it as is for now.

Pre-merge Checklist

- BufferedStreamConsumer: will either need to adjust destinations or switch in the worker (temporarily) before merging.

Recommended reading order

1. DefaultReplicationWorker.java
2. PartialSuccessTemporalAttemptExecution.java
3. SyncWorkflow.java