Skip to content

Commit acb9ce1

Browse files
authored
[source-postgres] Fix duplicate streams in postgres (#40719)
1 parent 0ab6314 commit acb9ce1

File tree

9 files changed

+542
-529
lines changed

9 files changed

+542
-529
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 256 additions & 255 deletions
Large diffs are not rendered by default.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.40.9
1+
version=0.40.10

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
6161
)
6262
.withSourceDefinedPrimaryKey(
6363
java.util.List.of(java.util.List.of(COL_ID)),
64-
),
64+
)
65+
.withIsResumable(true),
6566
),
6667
)
6768

@@ -688,7 +689,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
688689
.withSupportedSyncModes(
689690
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
690691
)
691-
.withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID))),
692+
.withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID)))
693+
.withIsResumable(true),
692694
)
693695
airbyteStream.syncMode = SyncMode.FULL_REFRESH
694696

@@ -770,6 +772,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
770772

771773
val recordMessages2 = extractRecordMessages(actualRecords2)
772774
val stateMessages2 = extractStateMessages(actualRecords2)
775+
stateMessages2.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
773776

774777
assertExpectedStateMessagesFromIncrementalSync(stateMessages2)
775778
assertExpectedStateMessageCountMatches(stateMessages2, 1)
@@ -826,6 +829,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
826829

827830
val recordMessages2 = extractRecordMessages(actualRecords2)
828831
val stateMessages2 = extractStateMessages(actualRecords2)
832+
stateMessages2.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
829833

830834
assertExpectedRecords(
831835
(MODEL_RECORDS_2 + listOf(puntoRecord)).toSet(),
@@ -905,7 +909,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
905909
)
906910
.withSupportedSyncModes(
907911
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
908-
),
912+
)
913+
.withIsResumable(false),
909914
)
910915
airbyteStream.syncMode = SyncMode.FULL_REFRESH
911916

@@ -932,6 +937,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
932937
stateMessages1,
933938
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
934939
)
940+
stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
935941
assertExpectedRecords(
936942
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
937943
recordMessages1,
@@ -950,6 +956,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
950956

951957
val recordMessages2 = extractRecordMessages(actualRecords2)
952958
val stateMessages2 = extractStateMessages(actualRecords2)
959+
stateMessages2.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
953960

954961
assertExpectedStateMessageCountMatches(stateMessages2, 1 + MODEL_RECORDS_2.size.toLong())
955962
assertExpectedRecords(
@@ -1370,7 +1377,8 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
13701377
.withSupportedSyncModes(
13711378
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL),
13721379
)
1373-
.withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID))),
1380+
.withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID)))
1381+
.withIsResumable(true),
13741382
)
13751383
airbyteStream.syncMode = SyncMode.FULL_REFRESH
13761384

airbyte-integrations/connectors/source-postgres/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.40.7'
15+
cdkVersionRequired = '0.40.10'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.21
12+
dockerImageTag: 3.4.22
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ private void initStream(final CtidStreams ctidStreams,
6868
}
6969

7070
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
71-
if (fileNodeHandler.hasFileNode(
72-
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()))) {
71+
if (configuredAirbyteStream.getStream().getIsResumable()) {
7372
this.resumableFullRefreshStreams.add(pair);
7473
} else {
7574
this.nonResumableFullRefreshStreams.add(pair);
@@ -144,10 +143,14 @@ public CdcState getCdcState() {
144143

145144
}
146145

146+
private boolean isIncrementalStream(final AirbyteStreamNameNamespacePair pair) {
147+
return !resumableFullRefreshStreams.contains(pair) && !nonResumableFullRefreshStreams.contains(pair);
148+
}
149+
147150
@Override
148151
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
149152
// Only incremental streams can be transformed into the next phase.
150-
if (!resumableFullRefreshStreams.contains(pair)) {
153+
if (isIncrementalStream(pair)) {
151154
streamsThatHaveCompletedSnapshot.add(pair);
152155
}
153156
final List<AirbyteStreamState> streamStates = new ArrayList<>();

airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
8686
Field.of("name", JsonSchemaType.STRING))
8787
.withSourceDefinedCursor(true)
8888
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
89-
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
89+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
90+
.withIsResumable(true)),
9091
new ConfiguredAirbyteStream()
9192
.withSyncMode(SyncMode.INCREMENTAL)
9293
.withDestinationSyncMode(DestinationSyncMode.APPEND)
@@ -97,7 +98,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
9798
Field.of("name", JsonSchemaType.STRING))
9899
.withSourceDefinedCursor(true)
99100
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
100-
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
101+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
102+
.withIsResumable(true))));
101103
}
102104

103105
protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
@@ -112,7 +114,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
112114
/* no name field */)
113115
.withSourceDefinedCursor(true)
114116
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
115-
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
117+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
118+
.withIsResumable(true)),
116119
new ConfiguredAirbyteStream()
117120
.withSyncMode(SyncMode.INCREMENTAL)
118121
.withDestinationSyncMode(DestinationSyncMode.APPEND)
@@ -123,7 +126,8 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalogWithPartialColumns() {
123126
Field.of("name", JsonSchemaType.STRING))
124127
.withSourceDefinedCursor(true)
125128
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
126-
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
129+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
130+
.withIsResumable(true))));
127131
}
128132

129133
@Override

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,6 @@ protected void validateStreamStateInResumableFullRefresh(final JsonNode streamSt
266266
assertEquals("ctid", streamStateToBeTested.get("state_type").asText());
267267
}
268268

269-
@Override
270-
@Test
271-
protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {}
272-
273269
@Override
274270
protected void assertStateMessagesForNewTableSnapshotTest(final List<? extends AirbyteStateMessage> stateMessages,
275271
final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) {

0 commit comments

Comments
 (0)