Skip to content

Commit c3987a9

Browse files
authored
Skip validation when previous state is empty due to reset (#20585)
* Skip validation when previous state is empty due to reset
* Handle null state object
* Fix formatting
* Fix logic
* Fix method name
1 parent 4897e29 commit c3987a9

File tree

2 files changed

+187
-26
lines changed

2 files changed

+187
-26
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,8 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut
5757
AirbyteApiClient.retryWithJitter(
5858
() -> airbyteApiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)),
5959
"get state");
60-
if (featureFlags.needStateValidation() && previousState != null) {
61-
final StateType newStateType = maybeStateWrapper.get().getStateType();
62-
final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType());
6360

64-
if (isMigration(newStateType, prevStateType) && newStateType == StateType.STREAM) {
65-
validateStreamStates(maybeStateWrapper.get(), configuredCatalog);
66-
}
67-
}
61+
validate(configuredCatalog, maybeStateWrapper, previousState);
6862

6963
AirbyteApiClient.retryWithJitter(
7064
() -> {
@@ -85,6 +79,42 @@ public boolean persist(final UUID connectionId, final StandardSyncOutput syncOut
8579
}
8680
}
8781

82+
/**
83+
* Validates whether it is safe to persist the new state based on the previously saved state.
84+
*
85+
* @param configuredCatalog The configured catalog of streams for the connection.
86+
* @param newState The new state.
87+
* @param previousState The previous state.
88+
*/
89+
private void validate(final ConfiguredAirbyteCatalog configuredCatalog,
90+
final Optional<StateWrapper> newState,
91+
final ConnectionState previousState) {
92+
/**
93+
* If state validation is enabled and the previous state exists and is not empty, make sure that
94+
* state will not be lost as part of the migration from legacy -> per stream.
95+
*
96+
* Otherwise, it is okay to update if the previous state is missing or empty.
97+
*/
98+
if (featureFlags.needStateValidation() && !isStateEmpty(previousState)) {
99+
final StateType newStateType = newState.get().getStateType();
100+
final StateType prevStateType = convertClientStateTypeToInternal(previousState.getStateType());
101+
102+
if (isMigration(newStateType, prevStateType) && newStateType == StateType.STREAM) {
103+
validateStreamStates(newState.get(), configuredCatalog);
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Tests whether the connection state is empty.
110+
*
111+
* @param connectionState The connection state.
112+
* @return {@code true} if the connection state is null, or if its state payload is null or empty; {@code false} otherwise.
113+
*/
114+
private boolean isStateEmpty(final ConnectionState connectionState) {
115+
return connectionState == null || connectionState.getState() == null || connectionState.getState().isEmpty();
116+
}
117+
88118
@VisibleForTesting
89119
void validateStreamStates(final StateWrapper state, final ConfiguredAirbyteCatalog configuredCatalog) {
90120
final List<StreamDescriptor> stateStreamDescriptors =

airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/PersistStateActivityTest.java

Lines changed: 150 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@
44

55
package io.airbyte.workers.temporal.sync;
66

7+
import static org.mockito.ArgumentMatchers.any;
8+
import static org.mockito.Mockito.mock;
79
import static org.mockito.Mockito.spy;
10+
import static org.mockito.Mockito.when;
811

912
import com.fasterxml.jackson.databind.JsonNode;
1013
import io.airbyte.api.client.AirbyteApiClient;
1114
import io.airbyte.api.client.generated.StateApi;
1215
import io.airbyte.api.client.invoker.generated.ApiException;
16+
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
17+
import io.airbyte.api.client.model.generated.ConnectionState;
1318
import io.airbyte.api.client.model.generated.ConnectionStateCreateOrUpdate;
19+
import io.airbyte.api.client.model.generated.ConnectionStateType;
1420
import io.airbyte.commons.features.FeatureFlags;
1521
import io.airbyte.commons.json.Jsons;
1622
import io.airbyte.config.StandardSyncOutput;
@@ -42,6 +48,10 @@
4248
class PersistStateActivityTest {
4349

4450
private final static UUID CONNECTION_ID = UUID.randomUUID();
51+
private static final String STREAM_A = "a";
52+
private static final String STREAM_A_NAMESPACE = "a1";
53+
private static final String STREAM_B = "b";
54+
private static final String STREAM_C = "c";
4555

4656
@Mock
4757
AirbyteApiClient airbyteApiClient;
@@ -78,7 +88,7 @@ void testPersistEmpty() {
7888

7989
@Test
8090
void testPersist() throws ApiException {
81-
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);
91+
when(featureFlags.useStreamCapableState()).thenReturn(true);
8292

8393
final JsonNode jsonState = Jsons.jsonNode(Map.ofEntries(
8494
Map.entry("some", "state")));
@@ -88,7 +98,7 @@ void testPersist() throws ApiException {
8898
persistStateActivity.persist(CONNECTION_ID, new StandardSyncOutput().withState(state), new ConfiguredAirbyteCatalog());
8999

90100
// The ser/der of the state into a state wrapper is tested in StateMessageHelperTest
91-
Mockito.verify(stateApi).createOrUpdateState(Mockito.any(ConnectionStateCreateOrUpdate.class));
101+
Mockito.verify(stateApi).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
92102
}
93103

94104
// For per-stream state, we expect there to be state for each stream within the configured catalog
@@ -97,8 +107,9 @@ void testPersist() throws ApiException {
97107
// catalog has a state message when migrating from Legacy to Per-Stream
98108
@Test
99109
void testPersistWithValidMissingStateDuringMigration() throws ApiException {
100-
final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a").withNamespace("a1"));
101-
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b"));
110+
final ConfiguredAirbyteStream stream =
111+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
112+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
102113

103114
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
104115
.withType(AirbyteStateType.STREAM)
@@ -110,19 +121,20 @@ void testPersistWithValidMissingStateDuringMigration() throws ApiException {
110121

111122
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2));
112123
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
113-
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);
124+
when(featureFlags.useStreamCapableState()).thenReturn(true);
114125

115-
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.STREAM), Mockito.any(StateType.class))).thenReturn(true);
126+
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.STREAM), any(StateType.class))).thenReturn(true);
116127
persistStateActivity.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
117-
Mockito.verify(stateApi).createOrUpdateState(Mockito.any(ConnectionStateCreateOrUpdate.class));
128+
Mockito.verify(stateApi).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
118129
}
119130

120131
@Test
121132
void testPersistWithValidStateDuringMigration() throws ApiException {
122-
final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a").withNamespace("a1"));
123-
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b"));
133+
final ConfiguredAirbyteStream stream =
134+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
135+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
124136
final ConfiguredAirbyteStream stream3 =
125-
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")).withSyncMode(SyncMode.FULL_REFRESH);
137+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_C)).withSyncMode(SyncMode.FULL_REFRESH);
126138

127139
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
128140
.withType(AirbyteStateType.STREAM)
@@ -138,30 +150,149 @@ void testPersistWithValidStateDuringMigration() throws ApiException {
138150

139151
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2, stream3));
140152
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
141-
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);
142-
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.STREAM), Mockito.any(StateType.class))).thenReturn(true);
153+
when(featureFlags.useStreamCapableState()).thenReturn(true);
154+
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.STREAM), any(StateType.class))).thenReturn(true);
143155
persistStateActivity.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
144-
Mockito.verify(stateApi).createOrUpdateState(Mockito.any(ConnectionStateCreateOrUpdate.class));
156+
Mockito.verify(stateApi).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
145157
}
146158

147159
// Global stream states do not need to be validated during the migration to per-stream state
148160
@Test
149161
void testPersistWithGlobalStateDuringMigration() throws ApiException {
150-
final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a").withNamespace("a1"));
151-
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b"));
162+
final ConfiguredAirbyteStream stream =
163+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
164+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
152165

153166
final AirbyteStateMessage stateMessage = new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL);
154167
final JsonNode jsonState = Jsons.jsonNode(List.of(stateMessage));
155168
final State state = new State().withState(jsonState);
156169

157170
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2));
158171
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
159-
Mockito.when(featureFlags.useStreamCapableState()).thenReturn(true);
160-
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.GLOBAL), Mockito.any(StateType.class))).thenReturn(true);
172+
when(featureFlags.useStreamCapableState()).thenReturn(true);
173+
mockedStateMessageHelper.when(() -> StateMessageHelper.isMigration(Mockito.eq(StateType.GLOBAL), any(StateType.class))).thenReturn(true);
161174
persistStateActivity.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
162175
final PersistStateActivityImpl persistStateSpy = spy(persistStateActivity);
163-
Mockito.verify(persistStateSpy, Mockito.times(0)).validateStreamStates(Mockito.any(), Mockito.any());
164-
Mockito.verify(stateApi).createOrUpdateState(Mockito.any(ConnectionStateCreateOrUpdate.class));
176+
Mockito.verify(persistStateSpy, Mockito.times(0)).validateStreamStates(any(), any());
177+
Mockito.verify(stateApi).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
178+
}
179+
180+
@Test
181+
void testPersistWithPerStreamStateDuringMigrationFromEmptyLegacyState() throws ApiException {
182+
/*
183+
* This test covers a scenario where a reset is executed before any successful syncs for a
184+
* connection. When this occurs, an empty, legacy state is stored for the connection.
185+
*/
186+
final ConfiguredAirbyteStream stream =
187+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
188+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
189+
final ConfiguredAirbyteStream stream3 =
190+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_C)).withSyncMode(SyncMode.FULL_REFRESH);
191+
192+
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
193+
.withType(AirbyteStateType.STREAM)
194+
.withStream(
195+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream))
196+
.withStreamState(Jsons.emptyObject()));
197+
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
198+
.withType(AirbyteStateType.STREAM)
199+
.withStream(
200+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream2)));
201+
final JsonNode jsonState = Jsons.jsonNode(List.of(stateMessage1, stateMessage2));
202+
final State state = new State().withState(jsonState);
203+
204+
final AirbyteApiClient airbyteApiClient1 = mock(AirbyteApiClient.class);
205+
final StateApi stateApi1 = mock(StateApi.class);
206+
final ConnectionState connectionState = mock(ConnectionState.class);
207+
Mockito.lenient().when(connectionState.getStateType()).thenReturn(ConnectionStateType.LEGACY);
208+
Mockito.lenient().when(connectionState.getState()).thenReturn(Jsons.emptyObject());
209+
when(stateApi1.getState(any(ConnectionIdRequestBody.class))).thenReturn(connectionState);
210+
Mockito.lenient().when(airbyteApiClient1.getStateApi()).thenReturn(stateApi1);
211+
212+
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2, stream3));
213+
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
214+
when(featureFlags.useStreamCapableState()).thenReturn(true);
215+
216+
final PersistStateActivityImpl persistStateActivity1 = new PersistStateActivityImpl(airbyteApiClient1, featureFlags);
217+
218+
persistStateActivity1.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
219+
220+
Mockito.verify(stateApi1).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
221+
}
222+
223+
@Test
224+
void testPersistWithPerStreamStateDuringMigrationFromNullLegacyState() throws ApiException {
225+
final ConfiguredAirbyteStream stream =
226+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
227+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
228+
final ConfiguredAirbyteStream stream3 =
229+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_C)).withSyncMode(SyncMode.FULL_REFRESH);
230+
231+
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
232+
.withType(AirbyteStateType.STREAM)
233+
.withStream(
234+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream))
235+
.withStreamState(Jsons.emptyObject()));
236+
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
237+
.withType(AirbyteStateType.STREAM)
238+
.withStream(
239+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream2)));
240+
final JsonNode jsonState = Jsons.jsonNode(List.of(stateMessage1, stateMessage2));
241+
final State state = new State().withState(jsonState);
242+
243+
final AirbyteApiClient airbyteApiClient1 = mock(AirbyteApiClient.class);
244+
final StateApi stateApi1 = mock(StateApi.class);
245+
final ConnectionState connectionState = mock(ConnectionState.class);
246+
Mockito.lenient().when(connectionState.getStateType()).thenReturn(ConnectionStateType.LEGACY);
247+
Mockito.lenient().when(connectionState.getState()).thenReturn(null);
248+
when(stateApi1.getState(any(ConnectionIdRequestBody.class))).thenReturn(connectionState);
249+
Mockito.lenient().when(airbyteApiClient1.getStateApi()).thenReturn(stateApi1);
250+
251+
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2, stream3));
252+
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
253+
when(featureFlags.useStreamCapableState()).thenReturn(true);
254+
255+
final PersistStateActivityImpl persistStateActivity1 = new PersistStateActivityImpl(airbyteApiClient1, featureFlags);
256+
257+
persistStateActivity1.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
258+
259+
Mockito.verify(stateApi1).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
260+
}
261+
262+
@Test
263+
void testPersistWithPerStreamStateDuringMigrationWithNoPreviousState() throws ApiException {
264+
final ConfiguredAirbyteStream stream =
265+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_A).withNamespace(STREAM_A_NAMESPACE));
266+
final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_B));
267+
final ConfiguredAirbyteStream stream3 =
268+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(STREAM_C)).withSyncMode(SyncMode.FULL_REFRESH);
269+
270+
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
271+
.withType(AirbyteStateType.STREAM)
272+
.withStream(
273+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream))
274+
.withStreamState(Jsons.emptyObject()));
275+
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
276+
.withType(AirbyteStateType.STREAM)
277+
.withStream(
278+
new AirbyteStreamState().withStreamDescriptor(CatalogHelpers.extractDescriptor(stream2)));
279+
final JsonNode jsonState = Jsons.jsonNode(List.of(stateMessage1, stateMessage2));
280+
final State state = new State().withState(jsonState);
281+
282+
final AirbyteApiClient airbyteApiClient1 = mock(AirbyteApiClient.class);
283+
final StateApi stateApi1 = mock(StateApi.class);
284+
when(stateApi1.getState(any(ConnectionIdRequestBody.class))).thenReturn(null);
285+
Mockito.lenient().when(airbyteApiClient1.getStateApi()).thenReturn(stateApi1);
286+
287+
final ConfiguredAirbyteCatalog migrationConfiguredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(stream, stream2, stream3));
288+
final StandardSyncOutput syncOutput = new StandardSyncOutput().withState(state);
289+
when(featureFlags.useStreamCapableState()).thenReturn(true);
290+
291+
final PersistStateActivityImpl persistStateActivity1 = new PersistStateActivityImpl(airbyteApiClient1, featureFlags);
292+
293+
persistStateActivity1.persist(CONNECTION_ID, syncOutput, migrationConfiguredCatalog);
294+
295+
Mockito.verify(stateApi1).createOrUpdateState(any(ConnectionStateCreateOrUpdate.class));
165296
}
166297

167298
}

0 commit comments

Comments
 (0)