[source-postgres] Disable incremental sync for view in Postgres xmin replication mode #42108
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -18,10 +18,7 @@ | |
| import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; | ||
| import io.airbyte.protocol.models.v0.SyncMode; | ||
| import java.sql.SQLException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.*; | ||
| import java.util.stream.Collectors; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| | @@ -61,6 +58,42 @@ public static AirbyteStream overrideSyncModes(final AirbyteStream stream) { | |
| return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); | ||
| } | ||
| | ||
| public static List<String> getViewsInSchema(final JdbcDatabase database, final String schema) throws SQLException { | ||
| final String query = "SELECT viewname FROM pg_catalog.pg_views WHERE schemaname = ?"; | ||
| final List<JsonNode> result = database.queryJsons(query, schema); | ||
| return result.stream() | ||
| .map(jsonNode -> jsonNode.get("viewname").asText()) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| | ||
| public static Map<String, List<String>> getViewsForAllSchemas(final JdbcDatabase database, final List<String> schemas) throws SQLException { | ||
| Map<String, List<String>> viewsBySchema = new HashMap<>(); | ||
| for (String schema : schemas) { | ||
| List<String> views = getViewsInSchema(database, schema); | ||
| viewsBySchema.put(schema, views); | ||
| } | ||
| return viewsBySchema; | ||
| } | ||
| | ||
| public static boolean isStreamAView(Map<String, List<String>> viewsBySchema, final AirbyteStream stream) { | ||
| return viewsBySchema.getOrDefault(stream.getNamespace(), Collections.emptyList()).contains(stream.getName()); | ||
| } | ||
| | ||
| /** | ||
| * This method is used for CDC sync to override sync modes for cursor fields, because CDC uses | ||
| * different cursor logic. Used in discovery when xmin is configured. | ||
| * | ||
| * @param stream - airbyte stream | ||
| * @param viewsBySchema - view names by schema | ||
| * @return the stream with its supported sync modes set; view streams support only FULL_REFRESH | ||
| */ | ||
| public static AirbyteStream overrideSyncModes(final AirbyteStream stream, Map<String, List<String>> viewsBySchema) { | ||
| if (isStreamAView(viewsBySchema, stream)) { | ||
| return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH)); | ||
| Review thread on this line. Contributor: I'm not sure, but I thought in an xmin sync, full refreshes were still incremental... and therefore still dependent on a usable cursor? Contributor Author: [comment body not captured] Contributor: The full refreshes are still incremental (based on CTID), but I didn't think a cursor needed to be explicitly set? So I think fixing the sync modes might work. cc: @xiaohansong, who might know this. Contributor: Yes, a cursor is not needed for full refresh. We should support full refresh anyway; for views they would be non-resumable. | ||
| } | ||
| return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); | ||
| } | ||
| | ||
| /* | ||
| * Set all streams that do have incremental to sourceDefined, so that the user cannot set or | ||
| * override a cursor field. | ||
| | ||
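
For readers who want to try the view check in isolation, here is a minimal, self-contained sketch of the decision that `isStreamAView` and the new `overrideSyncModes` overload make in the diff above. It is not the connector's code: the class name `ViewSyncModeSketch`, the local `SyncMode` enum, and the `supportedSyncModes` helper are hypothetical stand-ins so the example compiles without the Airbyte protocol classes, and the schema and view names are made up.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ViewSyncModeSketch {

  // Hypothetical stand-in for io.airbyte.protocol.models.v0.SyncMode.
  enum SyncMode { FULL_REFRESH, INCREMENTAL }

  // Same lookup as isStreamAView in the diff, but taking the namespace and
  // stream name directly instead of an AirbyteStream.
  static boolean isView(final Map<String, List<String>> viewsBySchema,
                        final String namespace,
                        final String name) {
    return viewsBySchema.getOrDefault(namespace, Collections.emptyList()).contains(name);
  }

  // Views get FULL_REFRESH only; every other stream keeps both modes.
  static Set<SyncMode> supportedSyncModes(final Map<String, List<String>> viewsBySchema,
                                          final String namespace,
                                          final String name) {
    if (isView(viewsBySchema, namespace, name)) {
      return EnumSet.of(SyncMode.FULL_REFRESH);
    }
    return EnumSet.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL);
  }

  public static void main(final String[] args) {
    // Made-up catalog: one view named "active_users" in schema "public".
    final Map<String, List<String>> viewsBySchema = Map.of("public", List.of("active_users"));

    System.out.println(supportedSyncModes(viewsBySchema, "public", "active_users")); // [FULL_REFRESH]
    System.out.println(supportedSyncModes(viewsBySchema, "public", "users"));        // [FULL_REFRESH, INCREMENTAL]
    System.out.println(supportedSyncModes(viewsBySchema, "other", "active_users"));  // [FULL_REFRESH, INCREMENTAL]
  }
}
```

The third call shows why the map is keyed by schema: a table in another schema that shares a view's name is not treated as a view. If the per-schema lists grow large, storing each schema's names in a `Set` instead of a `List` would make the `contains` check constant-time, though that is a possible refinement and not something the PR does.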
