Skip to content

Commit a876e60

Browse files
✨29779 source postgres slow ctid read seen on customer connection (#30125)
Co-authored-by: subodh <subodh1810@gmail.com> Co-authored-by: subodh1810 <subodh1810@users.noreply.github.com> Co-authored-by: rodireich <rodireich@users.noreply.github.com>
1 parent 598afd6 commit a876e60

28 files changed

+392
-96
lines changed

airbyte-integrations/connectors/source-alloydb/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.5
27+
LABEL io.airbyte.version=3.1.8
2828
LABEL io.airbyte.name=airbyte/source-alloydb

airbyte-integrations/connectors/source-alloydb/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
9-
dockerImageTag: 3.1.5
9+
dockerImageTag: 3.1.8
1010
dockerRepository: airbyte/source-alloydb
1111
githubIssueLabel: source-alloydb
1212
icon: alloydb.svg

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.7
27+
LABEL io.airbyte.version=3.1.8
2828
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ data:
1212
connectorType: source
1313
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
1414
maxSecondsBetweenMessages: 7200
15-
dockerImageTag: 3.1.7
15+
dockerImageTag: 3.1.8
1616
dockerRepository: airbyte/source-postgres-strict-encrypt
1717
githubIssueLabel: source-postgres
1818
icon: postgresql.svg

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.7
27+
LABEL io.airbyte.version=3.1.8
2828
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
9-
dockerImageTag: 3.1.7
9+
dockerImageTag: 3.1.8
1010
maxSecondsBetweenMessages: 7200
1111
dockerRepository: airbyte/source-postgres
1212
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,15 @@ SELECT pg_relation_filenode('%s')
106106
WITH block_sz AS (SELECT current_setting('block_size')::int), rel_sz AS (select pg_relation_size('%s')) SELECT * from block_sz, rel_sz
107107
""";
108108

109+
/**
110+
* Query finds the largest tuple index in use on any page of the table by taking the maximum of
111+
* the second component of every row's ctid. The result is 0 when the table has no rows.
112+
*/
113+
public static final String CTID_ESTIMATE_MAX_TUPLE =
114+
"""
115+
SELECT COALESCE(MAX((ctid::text::point)[1]::int), 0) AS max_tuple FROM "%s"."%s"
116+
""";
117+
109118
/**
110119
* Logs the current xmin status : 1. The number of wraparounds the source DB has undergone. (These
111120
* are the epoch bits in the xmin snapshot). 2. The 32-bit xmin value associated with the xmin
@@ -246,7 +255,7 @@ public static ResultWithFailed<List<io.airbyte.protocol.models.v0.AirbyteStreamN
246255
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
247256
conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
248257
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
249-
if (jsonNodes.size() != 0) {
258+
if (!jsonNodes.isEmpty()) {
250259
Preconditions.checkState(jsonNodes.size() == 1);
251260
LOGGER.warn("Full Vacuum currently in progress for table {} in {} phase, the table will be skipped from syncing data", fullTableName,
252261
jsonNodes.get(0).get("phase"));
@@ -311,4 +320,38 @@ public static List<ConfiguredAirbyteStream> filterStreamsUnderVacuumForCtidSync(
311320
.toList();
312321
}
313322

323+
public static Map<AirbyteStreamNameNamespacePair, Integer> getTableMaxTupleForStreams(final JdbcDatabase database,
324+
final List<ConfiguredAirbyteStream> streams,
325+
final String quoteString) {
326+
final Map<AirbyteStreamNameNamespacePair, Integer> tableMaxTupleEstimates = new HashMap<>();
327+
streams.forEach(stream -> {
328+
final AirbyteStreamNameNamespacePair namespacePair =
329+
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
330+
final int maxTuple = getTableMaxTupleForStream(database, namespacePair, quoteString);
331+
tableMaxTupleEstimates.put(namespacePair, maxTuple);
332+
});
333+
return tableMaxTupleEstimates;
334+
}
335+
336+
public static int getTableMaxTupleForStream(final JdbcDatabase database,
337+
final AirbyteStreamNameNamespacePair stream,
338+
final String quoteString) {
339+
try {
340+
final String streamName = stream.getName();
341+
final String schemaName = stream.getNamespace();
342+
final String fullTableName =
343+
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
344+
LOGGER.debug("running {}", CTID_ESTIMATE_MAX_TUPLE.formatted(schemaName, streamName));
345+
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
346+
conn -> conn.prepareStatement(CTID_ESTIMATE_MAX_TUPLE.formatted(schemaName, streamName)).executeQuery(),
347+
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
348+
Preconditions.checkState(jsonNodes.size() == 1);
349+
final int maxTuple = jsonNodes.get(0).get("max_tuple").asInt();
350+
LOGGER.info("Stream {} max tuple is {}", fullTableName, maxTuple);
351+
return maxTuple;
352+
} catch (final SQLException e) {
353+
throw new RuntimeException(e);
354+
}
355+
}
356+
314357
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import io.airbyte.integrations.source.postgres.ctid.CtidPerStreamStateManager;
7171
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
7272
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
73+
import io.airbyte.integrations.source.postgres.ctid.CtidUtils;
7374
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.StreamsCategorised;
7475
import io.airbyte.integrations.source.postgres.ctid.FileNodeHandler;
7576
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
@@ -492,6 +493,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
492493
finalListOfStreamsToBeSyncedViaCtid,
493494
getQuoteString());
494495

496+
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
497+
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
498+
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString());
495499
if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) {
496500
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
497501
LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid));
@@ -511,7 +515,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
511515

512516
final PostgresCtidHandler ctidHandler =
513517
new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(Optional.empty()), getQuoteString(),
514-
fileNodeHandler, tableBlockSizes, ctidStateManager,
518+
fileNodeHandler, tableBlockSizes, tablesMaxTuple, ctidStateManager,
515519
namespacePair -> Jsons.jsonNode(xminStatus));
516520

517521
final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator(
@@ -557,6 +561,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
557561
database,
558562
finalListOfStreamsToBeSyncedViaCtid,
559563
getQuoteString());
564+
565+
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
566+
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
567+
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString());
568+
560569
if (finalListOfStreamsToBeSyncedViaCtid.isEmpty()) {
561570
LOGGER.info("No Streams will be synced via ctid.");
562571
} else {
@@ -581,6 +590,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
581590
getQuoteString(),
582591
fileNodeHandler,
583592
tableBlockSizes,
593+
tablesMaxTuple,
584594
ctidStateManager,
585595
namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair)));
586596

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
2727
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.CdcMetadataInjector;
2828
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
29+
import io.airbyte.integrations.source.postgres.ctid.CtidUtils;
2930
import io.airbyte.integrations.source.postgres.ctid.FileNodeHandler;
3031
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
3132
import io.airbyte.integrations.source.relationaldb.TableInfo;
@@ -140,11 +141,17 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
140141
database,
141142
finalListOfStreamsToBeSyncedViaCtid,
142143
quoteString);
144+
145+
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
146+
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
147+
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString);
148+
143149
final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database,
144150
ctidPostgresSourceOperations,
145151
quoteString,
146152
fileNodeHandler,
147153
tableBlockSizes,
154+
tablesMaxTuple,
148155
ctidStateManager,
149156
namespacePair -> Jsons.emptyObject());
150157

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/Ctid.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class Ctid {
1616

1717
final Long page;
1818
final Long tuple;
19+
public static final Ctid ZERO = Ctid.of(0, 0);
1920

2021
public static Ctid of(final long page, final long tuple) {
2122
return new Ctid(page, tuple);
@@ -71,4 +72,7 @@ public int hashCode() {
7172
return Objects.hash(page, tuple);
7273
}
7374

75+
public static Ctid inc(final Ctid ctid, final long maxTuple) {
76+
return (ctid.tuple + 1 > maxTuple) ? Ctid.of(ctid.page + 1, 1) : Ctid.of(ctid.page, ctid.tuple + 1);
77+
}
7478
}

0 commit comments

Comments
 (0)