Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.1'
cdkVersionRequired = '0.42.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = true
useLocalCdk = false
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.1
dockerImageTag: 3.6.2
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,6 +58,42 @@ public static AirbyteStream overrideSyncModes(final AirbyteStream stream) {
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));
}

/**
 * Looks up the names of the views that belong to a single schema.
 *
 * @param database database on which the {@code pg_catalog.pg_views} lookup is executed
 * @param schema value bound to the query's {@code schemaname} parameter
 * @return the {@code viewname} of every row returned by the query, in result order
 * @throws SQLException propagated from the underlying query
 */
public static List<String> getViewsInSchema(final JdbcDatabase database, final String schema) throws SQLException {
  final List<JsonNode> rows = database.queryJsons("SELECT viewname FROM pg_catalog.pg_views WHERE schemaname = ?", schema);
  final List<String> viewNames = new ArrayList<>();
  for (final JsonNode row : rows) {
    viewNames.add(row.get("viewname").asText());
  }
  return viewNames;
}

/**
 * Builds a schema-to-view-names lookup by calling {@link #getViewsInSchema} once for each schema.
 *
 * @param database database handed through to {@link #getViewsInSchema}
 * @param schemas schema names to look up; each one becomes a key of the returned map
 * @return map from schema name to the view names found in that schema
 * @throws SQLException propagated from any of the per-schema lookups
 */
public static Map<String, List<String>> getViewsForAllSchemas(final JdbcDatabase database, final List<String> schemas) throws SQLException {
  final Map<String, List<String>> viewNamesPerSchema = new HashMap<>();
  for (final String schemaName : schemas) {
    viewNamesPerSchema.put(schemaName, getViewsInSchema(database, schemaName));
  }
  return viewNamesPerSchema;
}

/**
 * Tells whether a stream's name is listed among the views of the stream's namespace.
 *
 * @param viewsBySchema view names keyed by schema name
 * @param stream stream whose namespace is used as the schema key and whose name is looked up
 * @return {@code true} if the view list for the stream's namespace contains the stream's name;
 *         a namespace without an entry in the map is treated as having no views
 */
public static boolean isStreamAView(Map<String, List<String>> viewsBySchema, final AirbyteStream stream) {
  final List<String> viewsInNamespace = viewsBySchema.getOrDefault(stream.getNamespace(), Collections.emptyList());
  final String streamName = stream.getName();
  return viewsInNamespace.contains(streamName);
}

/**
* Overrides the supported sync modes of a stream. Used in discovery when xmin is configured, because
* xmin replication uses its own cursor logic rather than a user-selected cursor field.
*
* @param stream - airbyte stream
* @param viewsBySchema - view names by schema
* @return the stream with its supported sync modes overridden: view streams support only
* FULL_REFRESH, all other streams support FULL_REFRESH and INCREMENTAL
*/
public static AirbyteStream overrideSyncModes(final AirbyteStream stream, Map<String, List<String>> viewsBySchema) {
if (isStreamAView(viewsBySchema, stream)) {
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but I thought that in an xmin sync, full refreshes were still incremental... and therefore still dependent on a usable cursor?

Copy link
Contributor Author

@theyueli theyueli Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you mentioned is for the cursor-based sync mode;

maybe I'm wrong, but from what I tested, it does not require specifying a cursor in xmin (see picture below)
Screenshot 2024-07-18 at 7 30 15 PM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full refreshes are still incremental (based on CTID), but I didn't think a cursor needed to be explicitly set? So I think fixing the sync modes might work

cc : @xiaohansong who might know this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - a cursor is not needed for full refresh. We should support full refresh anyway; for views it would be non-resumable.

}
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));
}

/*
* Set all streams that do have incremental to sourceDefined, so that the user cannot set or
* override a cursor field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,16 +312,20 @@ public AirbyteCatalog discover(final JsonNode config) {

catalog.setStreams(streams);
} else if (isXmin(config)) {
// Xmin replication has a source-defined cursor (the xmin column). This is done to prevent the user
// from being able to pick their own cursor.
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresCatalogHelper::overrideSyncModes)
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.collect(toList());

catalog.setStreams(streams);
try {
JdbcDatabase database = createDatabase(config);
Map<String, List<String>> viewsBySchema = PostgresCatalogHelper.getViewsForAllSchemas(database, schemas);
// Xmin replication has a source-defined cursor (the xmin column). This is done to prevent the user
// from being able to pick their own cursor.
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(stream -> PostgresCatalogHelper.overrideSyncModes(stream, viewsBySchema))
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.collect(toList());
catalog.setStreams(streams);
} catch (SQLException e) {
LOGGER.error("Error checking if stream is a view", e);
}
}

return catalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ void testDiscover() throws Exception {
});
}

/**
 * Verifies that discovery with the xmin configuration exposes a view stream with FULL_REFRESH but
 * without INCREMENTAL among its supported sync modes.
 *
 * The view is created for this test only and is dropped in a {@code finally} block so that a failed
 * assertion or a discovery error does not leave it behind in the test database.
 */
@Test
void testDiscoverDisableIncrementalSyncForView() throws Exception {
  testdb.query(ctx -> {
    ctx.fetch("CREATE VIEW id_and_name_view AS SELECT * FROM id_and_name;");
    return null;
  });
  try {
    final AirbyteCatalog actual = source().discover(getXminConfig());

    // Guard against a vacuous pass: the sync-mode assertions below only run for the view stream,
    // so the test must fail outright if that stream was never discovered.
    final boolean viewStreamDiscovered = actual.getStreams().stream()
        .anyMatch(candidate -> candidate.getName().equals("id_and_name_view"));
    assertTrue(viewStreamDiscovered, "Expected stream id_and_name_view to be present in the discovered catalog");

    actual.getStreams().forEach(actualStream -> {
      if (actualStream.getName().equals("id_and_name_view")) {
        assertTrue(!actualStream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL),
            "View stream must not support INCREMENTAL in xmin mode");
        assertTrue(actualStream.getSupportedSyncModes().contains(SyncMode.FULL_REFRESH),
            "View stream must support FULL_REFRESH in xmin mode");
      }
    });
  } finally {
    // Always clean up, even when discovery or an assertion above throws.
    testdb.query(ctx -> {
      ctx.fetch("DROP VIEW id_and_name_view;");
      return null;
    });
  }
}

@Test
void testReadSuccess() throws Exception {
// Perform an initial sync with the configured catalog, which is set up to use xmin_replication.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.6.2 | 2024-07-18 | [42108](https://github.com/airbytehq/airbyte/pull/42108) | Disable incremental sync for view streams in xmin replication mode |
| 3.6.1 | 2024-07-05 | [40716](https://github.com/airbytehq/airbyte/pull/40716) | Fix typo in connector specification |
| 3.6.0 | 2024-07-17 | [40208](https://github.com/airbytehq/airbyte/pull/40208) | Start using the new error Postgres source error handler that comes with a new error translation layer. |
| 3.5.2 | 2024-07-17 | [42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics for WASS case occurrence. |
Expand Down