
Conversation

@cgardens
Contributor

@cgardens cgardens commented May 7, 2021

What

  • Switch to listening to the destination to get state instead of the source.

How

  • DefaultReplicationWorker now listens to the destination for state and only returns state it receives from the destination. (This part was in the original draft that Michel reviewed; everything else is new.)
  • In the ReplicationActivity, add a new type of TemporalAttemptExecution called PartialSuccessTemporalAttemptExecution. This abstraction determines whether we can retry after a failure; it retries up to 3 times and tracks the state of each attempt it makes.
  • Add a ReplicationOutput.yaml struct. This is now the output of the DefaultReplicationWorker. In the SyncWorkflow we then compile that into the StandardSyncOutput.yaml that we know and love (see the sketch after this list). I think the weakness of this PR is that some of this retry logic is not in the right place: having the aggregation logic live directly in the SyncWorkflow breaks conventions we already have. I could push the retries into a worker that wraps the existing DefaultReplicationWorker, or find a way to push the aggregation into the PartialSuccessTemporalAttemptExecution. I'm happy to do either of these (or something different) based on review, or to leave it as is for now.
  • No changes needed to the API or UI (a trade-off is that some information about what's going on under the hood is obfuscated).
  • Tests coming in next PR (Checkpointing Testing #3473)
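A minimal, self-contained sketch of the aggregation idea mentioned above. The types and accessors below are illustrative stand-ins, not the generated ReplicationOutput/StandardSyncOutput classes: each attempt's output is folded into a single summary, keeping the last state the destination reported.

// Illustrative stand-ins only; the real structs are JSON-schema-generated and carry more fields.
import java.util.List;
import java.util.Optional;

record AttemptOutput(long recordsSynced, Optional<String> state) {}

record SyncOutput(long recordsSynced, Optional<String> state) {}

class SyncOutputAggregator {

  // Fold the per-attempt outputs into one summary: sum the record counts and
  // keep the last state any attempt got back from the destination.
  static SyncOutput aggregate(final List<AttemptOutput> attempts) {
    long totalRecords = 0;
    Optional<String> lastState = Optional.empty();
    for (final AttemptOutput attempt : attempts) {
      totalRecords += attempt.recordsSynced();
      if (attempt.state().isPresent()) {
        lastState = attempt.state();
      }
    }
    return new SyncOutput(totalRecords, lastState);
  }

}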

Pre-merge Checklist

  • This PR is currently breaking for all destinations that don't use BufferedStreamConsumer. We will either need to adjust those destinations or add a (temporary) switch in the worker before merging.
  • Incorporate tests into the replication worker.
  • Figure out activity lifecycle / retries.
  • Migrate all destinations to output state.

Recommended reading order

  1. DefaultReplicationWorker.java
  2. PartialSuccessTemporalAttemptExecution.java
  3. SyncWorkflow.java
  4. the rest



messageTracker.accept(message);
destination.accept(message);
final Thread sourceThread = new Thread(() -> {
Contributor
@michel-tricot michel-tricot May 8, 2021

You should use a newFixedThreadPool(2) instead. It is generally error-prone to use threads directly, and the pool will make the join cleaner.
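For illustration, a minimal sketch of the suggested pattern with placeholder loop bodies (this is not the worker's actual code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch only: run the source-reading and destination-writing loops on a fixed
// two-thread pool instead of raw Threads, then "join" by waiting on the futures.
public class ReplicationThreadsSketch {

  public static void main(String[] args) throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(2);

    final Future<?> sourceFuture = executor.submit(() -> {
      // placeholder: read messages from the source and hand them to the destination
    });
    final Future<?> destinationFuture = executor.submit(() -> {
      // placeholder: drain whatever the destination emits (e.g. state messages)
    });

    // get() rethrows anything that failed inside either loop, which makes the join cleaner.
    sourceFuture.get();
    destinationFuture.get();

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
  }

}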

try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional = source.attemptRead();
if (messageOptional.isPresent()) {
Contributor

There is a potential busy loop here. Shouldn't you have a sleep somewhere?

Contributor Author

I think this is actually a bit misleading: it isn't really a busy loop. attemptRead blocks until the next line comes in, and I think the only time we return an empty optional is at the end of the stream. I will write a test to refresh my memory on this.

Contributor Author

Confirmed: it blocks while waiting for input.
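A small sketch of the semantics being confirmed here, assuming an implementation on top of a BufferedReader (not the actual Airbyte source class): the read blocks until the child process writes a line, so the loop is not busy-waiting, and an empty Optional only appears once the stream is exhausted.

import java.io.BufferedReader;
import java.io.IOException;
import java.util.Optional;

// Stand-in for a line-oriented source; not the real Airbyte source class.
class BlockingLineSource {

  private final BufferedReader reader;

  BlockingLineSource(final BufferedReader reader) {
    this.reader = reader;
  }

  Optional<String> attemptRead() throws IOException {
    final String line = reader.readLine(); // blocks until a line arrives or the stream ends
    return Optional.ofNullable(line);      // empty only at end of stream
  }

}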

final Mapper<AirbyteMessage> mapper,
final Destination<AirbyteMessage> destination,
final MessageTracker<AirbyteMessage> messageTracker) {
final MessageTracker<AirbyteMessage> sourceMessageTracker,
Contributor

I think you should pass a TrackedSource and TrackedDestination that are decorators on top of the source & destination.

That will make the code impossible to mess up in the future, since you only accept once.
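A rough sketch of the decorator idea, using a simplified interface rather than the real Source/Destination/MessageTracker interfaces:

import java.util.function.Consumer;

// Simplified stand-in for the destination interface; the real one has more methods.
interface SimpleDestination<T> {

  void accept(T message) throws Exception;

}

// Decorator: every message is tracked exactly once on its way into the wrapped
// destination, so callers cannot forget (or duplicate) the tracker call.
class TrackedDestination<T> implements SimpleDestination<T> {

  private final SimpleDestination<T> delegate;
  private final Consumer<T> tracker;

  TrackedDestination(final SimpleDestination<T> delegate, final Consumer<T> tracker) {
    this.delegate = delegate;
    this.tracker = tracker;
  }

  @Override
  public void accept(final T message) throws Exception {
    tracker.accept(message);
    delegate.accept(message);
  }

}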

Contributor Author

good idea.

Contributor Author

I fiddled with this and struggled to get something that was actually safer. The trade-off seems to be exposing the MessageTracker functionality in the Destination and Source, which seems pretty awkward and clutters up pretty important abstractions. I'm down to keep playing with this, but I want to do it separately at this point.

final AirbyteMessage message = mapper.mapMessage(messageOptional.get());

sourceMessageTracker.accept(message);
destination.accept(message);
Contributor

Isn't this where you should be using a BlockingQueue, for the communication between the source and the destination?

Contributor

This design would make it extremely clear where the backpressure happens (i.e. in the worker).

You would just need an additional thread.

Contributor

It also prepares the code for the mappers.
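A sketch of the handoff pattern being discussed, with placeholder string messages instead of AirbyteMessages: the source thread produces into a bounded queue and a second thread drains it into the destination, so the backpressure visibly lives in the worker (in the bounded queue) rather than implicitly in destination.accept.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffSketch {

  public static void main(String[] args) throws InterruptedException {
    // Bounded queue: put() blocks when it is full, which is where backpressure happens.
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1_000);

    final Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          queue.put("record-" + i);
        }
        queue.put("DONE"); // sentinel marking end of stream
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    final Thread consumer = new Thread(() -> {
      try {
        String message;
        while (!(message = queue.take()).equals("DONE")) {
          System.out.println("destination.accept(" + message + ")"); // placeholder for the real call
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }

}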

Contributor Author

Agreed. The blocking queue wouldn't do anything for now, since destination.accept applies the backpressure for us. I was going to wait to add the queue until we actually need it, but I can go ahead and do it now.

Contributor

I don't have a strong opinion here! Whichever makes the most sense for now. It seems like the current approach is good enough.

Contributor

I vote for leaving it out until we actually need it.

@cgardens cgardens force-pushed the cgardens/checkpointing_respect_destination_state branch from 15aabaa to 141ba6c on May 17, 2021 15:57
@cgardens cgardens marked this pull request as ready for review May 18, 2021 22:55
@cgardens cgardens mentioned this pull request May 18, 2021
Contributor
@sherifnada sherifnada left a comment

Requesting changes just to answer a few questions, but overall looks great. Really excited to roll this out -- feels like a user asks about it every week now! 🎉

We should also update the Airbyte protocol docs (very reasonably in a separate PR if needed)

output.withState(state);
});
} else if (syncInput.getState() != null) {
LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
Contributor

Is this branch basically for backwards compatibility? If so, should we also check whether the sync was successful?

Contributor Author

Not really. Basically we don't want to "forget" state, so if we fail to checkpoint at all we just return the initial state, because we know that state is true.

I need to handle the backwards compatibility case separately. I think that will live here, with the logic you described: if the sync is successful and we got no output from the destination but do have output from the source, we will fall back on that.
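A minimal sketch of the fallback just described, with placeholder string states and a hypothetical helper name: prefer whatever state the destination confirmed; if the destination emitted nothing, fall back to the state the sync started with so a previously known-good state is never forgotten.

import java.util.Optional;

class StateFallbackSketch {

  // destinationState: the last state echoed back by the destination, if any.
  // inputState: the state the sync started with (may be empty on a first sync).
  static Optional<String> resolveOutputState(final Optional<String> destinationState,
                                             final Optional<String> inputState) {
    if (destinationState.isPresent()) {
      return destinationState; // checkpointed state confirmed by the destination
    }
    return inputState; // never "forget" the state we started from
  }

}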

Contributor

I see Sherif's point - is the destination emitting state an invariant? Should we fail loudly if this isn't the case?

Contributor
@davinchia davinchia May 20, 2021

NVM, I just read the PartialSuccessTemporalAttemptExecution class - my earlier comment doesn't make sense.

For my understanding - when will this happen? If a destination isn't migrated over, or if a destination fails before it reaches any checkpoint? Do we handle partial checkpointing today?

Contributor Author

I actually think I want to skip backwards compatibility and just upgrade all destinations. Mainly, I don't think there's a safe way to get out of the backwards-compatibility state, since we don't have connector-core compatibility safeties. So we might as well make a clean cut and tell people to move to the bleeding edge. #alpha

Contributor
@sherifnada sherifnada May 21, 2021

Thought more about this - emitting state should not be an invariant. Full refresh streams don't need to output state; if all streams are FR then no state would be emitted.

That being said, we aren't catching the case where the destination is e.g. buggy and not outputting state. I don't think we want to fail the sync in that case, but it is a bug that could go unnoticed.

Contributor Author

That being said, we aren't catching the case where the destination is e.g. buggy and not outputting state. I don't think we want to fail the sync in that case, but it is a bug that could go unnoticed.

@sherifnada could you say a bit more here? It seems to me that if a destination fails to output state when it is expected to, we should fail the sync. If the destination doesn't tell us which records it has committed, then we can never move state forward; the destination has to be the source of truth here. So I think the new contract is (at least for incremental, although you could make a coherent argument for FR too) that if the destination doesn't output state at least once, something has gone wrong.

Typing this out makes me realize that it would probably be helpful for us to track whether the source ever emitted state, so that in these failure cases we at least have logs that give us a clue whether the destination failed to emit state because of something in the destination (or the worker) or because the source never emitted any state.

Contributor

Agreed with the bit about tracking whether the source emitted state. The invariants are (a rough sketch of checking them follows this list):

  1. The destination should output a non-zero number of state messages if the source output any state messages
  2. The destination should output state messages in monotonically increasing order (depending on order received from the source)
  3. The last state message output by the destination in a successful sync should == the last state message output by the source
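A hypothetical sketch of asserting those three invariants, written against plain string states for illustration (a real check would compare the actual state message payloads):

import java.util.List;

class StateInvariantsCheck {

  static void check(final List<String> sourceStates, final List<String> destinationStates) {
    // 1. The destination emits at least one state message if the source emitted any.
    if (!sourceStates.isEmpty() && destinationStates.isEmpty()) {
      throw new AssertionError("source emitted state but destination emitted none");
    }
    // 2. Destination states appear in the order the source produced them.
    int cursor = 0;
    for (final String destinationState : destinationStates) {
      final int index = sourceStates.indexOf(destinationState);
      if (index < 0) {
        throw new AssertionError("destination emitted a state the source never emitted: " + destinationState);
      }
      if (index < cursor) {
        throw new AssertionError("destination states out of order at: " + destinationState);
      }
      cursor = index;
    }
    // 3. On a successful sync, the last destination state equals the last source state.
    if (!sourceStates.isEmpty()
        && !sourceStates.get(sourceStates.size() - 1).equals(destinationStates.get(destinationStates.size() - 1))) {
      throw new AssertionError("final destination state does not match final source state");
    }
  }

}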
@sherifnada
Contributor

Also you are a saint for this amazing PR description. Thank you!! 🎉

Contributor
@davinchia davinchia left a comment

Nice! General approach is sound.

My comments are around readability and my own understanding.

To respond to your comment about the cleanliness - having aggregation in the SyncWorkflow doesn't look too bad to me. The Sync workflow is quite different from the others already in any case, since the workflow itself is more complicated. Not a big issue for me.

If you want to tackle it, I'd favor putting the aggregation into the Temporal workflow rather than wrapping the worker in another worker. Worker-in-worker smells too complex, with too many abstraction layers, to me.

@cgardens cgardens force-pushed the cgardens/checkpointing_respect_destination_state branch from dba87c8 to ae161d7 on May 21, 2021 05:36
@cgardens
Contributor Author

@michel-tricot @davinchia @sherifnada thank you for the review! I think I have addressed all of your feedback and am ready for another round.

long getBytesCount();

Optional<JsonNode> getOutputState();
Optional<State> getOutputState();
Contributor Author

Using the State struct, which is just a wrapper around whatever state the connector returns, allows us to differentiate between the case where state is not successfully returned and the case where state is successfully returned but is null / empty.
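A tiny sketch of that distinction, using a stand-in record for the generated State struct (which wraps an arbitrary JSON blob):

import java.util.Optional;

record State(Object data) {}

class OutputStateExample {

  // Optional.empty()              -> no state was ever returned by the destination.
  // Optional.of(new State(null))  -> state was returned, and it happens to be null/empty.
  static String describe(final Optional<State> outputState) {
    if (outputState.isEmpty()) {
      return "no state was returned";
    }
    return "state was returned: " + outputState.get().data();
  }

}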


@cgardens cgardens force-pushed the cgardens/checkpointing_respect_destination_state branch 2 times, most recently from aa941af to 81cce57 on May 22, 2021 20:58

@Override
public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws WorkerException {
public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws WorkerException {
Contributor
@davinchia davinchia May 24, 2021

nit: I think it would be helpful to leave a javadoc summarising run's behaviour around state emission and saving state, i.e. state is still emitted by the source and is only saved once it is returned by the destination. Otherwise I can see someone getting confused about where state originates.
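One possible wording for such a javadoc, attached to a simplified stand-in signature rather than the actual worker class:

import java.nio.file.Path;

// Stand-in interface for illustration only; the real method is DefaultReplicationWorker#run.
interface ReplicationRunnerSketch {

  /**
   * Runs a single replication attempt. State is still emitted by the source, but this worker only
   * records state after the destination echoes it back, i.e. once the destination has committed the
   * corresponding records. The returned output therefore carries destination-confirmed state (or no
   * state at all if the destination never emitted any).
   */
  Object run(Object syncInput, Path jobRoot) throws Exception;

}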

Contributor
@davinchia davinchia left a comment

One small comment, otherwise looks great!

@cgardens
Contributor Author

cgardens commented May 25, 2021

/test connector=destination-bigquery

🕑 destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/876477126
✅ destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/876477126

@cgardens
Contributor Author

cgardens commented May 25, 2021

/test connector=destination-meilisearch

🕑 destination-meilisearch https://github.com/airbytehq/airbyte/actions/runs/876477259
✅ destination-meilisearch https://github.com/airbytehq/airbyte/actions/runs/876477259

@cgardens
Contributor Author

cgardens commented May 25, 2021

/test connector=destination-mssql

🕑 destination-mssql https://github.com/airbytehq/airbyte/actions/runs/876477317
❌ destination-mssql https://github.com/airbytehq/airbyte/actions/runs/876477317

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-csv

🕑 connectors/destination-csv https://github.com/airbytehq/airbyte/actions/runs/876540224
✅ connectors/destination-csv https://github.com/airbytehq/airbyte/actions/runs/876540224

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-local-json

🕑 connectors/destination-local-json https://github.com/airbytehq/airbyte/actions/runs/876540691
✅ connectors/destination-local-json https://github.com/airbytehq/airbyte/actions/runs/876540691

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-postgres

🕑 connectors/destination-postgres https://github.com/airbytehq/airbyte/actions/runs/876540906
✅ connectors/destination-postgres https://github.com/airbytehq/airbyte/actions/runs/876540906

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-mysql

🕑 connectors/destination-mysql https://github.com/airbytehq/airbyte/actions/runs/876541430
✅ connectors/destination-mysql https://github.com/airbytehq/airbyte/actions/runs/876541430

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/876541600
❌ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/876541600

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/876541858
✅ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/876541858

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/876541951
✅ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/876541951

@cgardens
Contributor Author

cgardens commented May 25, 2021

/publish connector=connectors/destination-meilisearch

🕑 connectors/destination-meilisearch https://github.com/airbytehq/airbyte/actions/runs/876542190
✅ connectors/destination-meilisearch https://github.com/airbytehq/airbyte/actions/runs/876542190

@cgardens
Contributor Author

"/publish connector=connectors/destination-mssql"

I did this manually, locally, since this connector can't pass a new integration test. This is not a new regression in the connector; it has never been able to pass this test.

@cgardens cgardens merged commit aa6afb7 into master May 25, 2021
@cgardens cgardens deleted the cgardens/checkpointing_respect_destination_state branch May 25, 2021 23:47
@cgardens cgardens changed the title Checkpointing: Worker use destination (instead of source) for state 🎉 Checkpointing: Worker use destination (instead of source) for state May 25, 2021