Merged

40 commits
ea02ea4
wip
subodh1810 Aug 21, 2023
b44af98
wip
subodh1810 Aug 22, 2023
5c9d11c
handle when no data available
subodh1810 Aug 22, 2023
0ff642d
update filenode every time
subodh1810 Aug 22, 2023
2493899
Merge branch 'master' into postgres-validate-file-node-each-chunk
subodh1810 Aug 22, 2023
3644565
cleanup
subodh1810 Aug 22, 2023
9328866
minor simplification
subodh1810 Aug 22, 2023
190b10b
check for null file node
subodh1810 Aug 23, 2023
896334a
rename iterators for clarity
subodh1810 Aug 23, 2023
fb9ca98
Merge branch 'master' into postgres-validate-file-node-each-chunk
subodh1810 Aug 24, 2023
dc6969b
Merge branch 'master' into postgres-validate-file-node-each-chunk
subodh1810 Aug 30, 2023
be6f71c
address review comments
subodh1810 Aug 30, 2023
c0dbc58
address review comments part 2
subodh1810 Aug 31, 2023
008e2be
Merge branch 'master' into postgres-validate-file-node-each-chunk
subodh1810 Aug 31, 2023
ae40ec1
Automated Commit - Formatting Changes
subodh1810 Aug 31, 2023
b4ca423
fix log message
subodh1810 Aug 31, 2023
7f3b586
Merge branch 'master' into postgres-validate-file-node-each-chunk
subodh1810 Sep 1, 2023
417a16f
address review comments
subodh1810 Sep 1, 2023
758c722
initial commit
rodireich Sep 4, 2023
cfb1c34
Automated Commit - Formatting Changes
rodireich Sep 4, 2023
a1d2801
initial commit
rodireich Sep 4, 2023
466d5da
Automated Commit - Formatting Changes
rodireich Sep 4, 2023
6e4af73
initial commit
rodireich Sep 5, 2023
d97e257
Automated Commit - Formatting Changes
rodireich Sep 5, 2023
3b316c4
Merge branch 'master' into 29779-source-postgres-slow-ctid-read-seen-…
rodireich Sep 11, 2023
261d1ac
initial commit
rodireich Sep 11, 2023
aabb0a1
adding testing
rodireich Sep 15, 2023
383a6a4
adding testing
rodireich Sep 15, 2023
3f4ba01
sanity
rodireich Sep 15, 2023
625b0cc
Merge branch 'master' into 29779-source-postgres-slow-ctid-read-seen-…
rodireich Sep 15, 2023
ed9918c
sanity
rodireich Sep 15, 2023
8f6dc13
sanity
rodireich Sep 15, 2023
fcb8ab8
sanity
rodireich Sep 15, 2023
3abf18b
Better error handling. per review request
rodireich Sep 19, 2023
ef89daf
Improve test
rodireich Sep 19, 2023
5570206
Avoid edge case of ctid query plan infinite loop
rodireich Sep 19, 2023
201421c
Merge branch 'master' into 29779-source-postgres-slow-ctid-read-seen-…
rodireich Sep 20, 2023
53f8727
reduce querying for version server
rodireich Sep 20, 2023
ffe61c9
Bump version. update changelog
rodireich Sep 20, 2023
f40e703
sanity
rodireich Sep 20, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=3.1.5
+LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.name=airbyte/source-alloydb
airbyte-integrations/connectors/source-alloydb/metadata.yaml
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
-dockerImageTag: 3.1.5
+dockerImageTag: 3.1.8
dockerRepository: airbyte/source-alloydb
githubIssueLabel: source-alloydb
icon: alloydb.svg
airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=3.1.7
+LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml
@@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
-dockerImageTag: 3.1.7
+dockerImageTag: 3.1.8
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=3.1.7
+LABEL io.airbyte.version=3.1.8
LABEL io.airbyte.name=airbyte/source-postgres
airbyte-integrations/connectors/source-postgres/metadata.yaml
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-dockerImageTag: 3.1.7
+dockerImageTag: 3.1.8
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresQueryUtils.java
@@ -106,6 +106,15 @@ SELECT pg_relation_filenode('%s')
WITH block_sz AS (SELECT current_setting('block_size')::int), rel_sz AS (select pg_relation_size('%s')) SELECT * from block_sz, rel_sz
""";

/**
 * Estimates the maximum tuple number that occurs within any page of the table by taking
 * the highest tuple component of any ctid in it (COALESCE yields 0 for an empty table).
 * The connector estimates this in two ways and selects the greatest value.
 */
public static final String CTID_ESTIMATE_MAX_TUPLE =
"""
SELECT COALESCE(MAX((ctid::text::point)[1]::int), 0) AS max_tuple FROM "%s"."%s"
""";

/**
* Logs the current xmin status : 1. The number of wraparounds the source DB has undergone. (These
* are the epoch bits in the xmin snapshot). 2. The 32-bit xmin value associated with the xmin
@@ -246,7 +255,7 @@ public static ResultWithFailed<List<io.airbyte.protocol.models.v0.AirbyteStreamN
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_FULL_VACUUM_IN_PROGRESS_QUERY.formatted(fullTableName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
-if (jsonNodes.size() != 0) {
+if (!jsonNodes.isEmpty()) {
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.warn("Full Vacuum currently in progress for table {} in {} phase, the table will be skipped from syncing data", fullTableName,
jsonNodes.get(0).get("phase"));
@@ -311,4 +320,38 @@ public static List<ConfiguredAirbyteStream> filterStreamsUnderVacuumForCtidSync(
.toList();
}

public static Map<AirbyteStreamNameNamespacePair, Integer> getTableMaxTupleForStreams(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> streams,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, Integer> tableMaxTupleEstimates = new HashMap<>();
streams.forEach(stream -> {
final AirbyteStreamNameNamespacePair namespacePair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final int maxTuple = getTableMaxTupleForStream(database, namespacePair, quoteString);
tableMaxTupleEstimates.put(namespacePair, maxTuple);
});
return tableMaxTupleEstimates;
}

public static int getTableMaxTupleForStream(final JdbcDatabase database,
final AirbyteStreamNameNamespacePair stream,
final String quoteString) {
try {
final String streamName = stream.getName();
final String schemaName = stream.getNamespace();
final String fullTableName =
getFullyQualifiedTableNameWithQuoting(schemaName, streamName, quoteString);
LOGGER.debug("running {}", CTID_ESTIMATE_MAX_TUPLE.formatted(schemaName, streamName));
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
conn -> conn.prepareStatement(CTID_ESTIMATE_MAX_TUPLE.formatted(schemaName, streamName)).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final int maxTuple = jsonNodes.get(0).get("max_tuple").asInt();
LOGGER.info("Stream {} max tuple is {}", fullTableName, maxTuple);
return maxTuple;
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}

}
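A note on the ctid arithmetic above: Postgres renders a ctid as text of the form "(page,tuple)", and casting that text through point lets SQL index the two components, which is how (ctid::text::point)[1]::int pulls out the tuple number. A minimal Java sketch of the same parsing (the parseCtid helper below is illustrative only, not part of this PR):

// Illustrative helper: mirrors what (ctid::text::point)[1]::int does in SQL.
// A ctid rendered as "(1234,7)" means page 1234, tuple 7 within that page.
static long[] parseCtid(final String ctidText) {
  final String[] parts = ctidText.substring(1, ctidText.length() - 1).split(",");
  return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
}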
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java
@@ -70,6 +70,7 @@
import io.airbyte.integrations.source.postgres.ctid.CtidPerStreamStateManager;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils.StreamsCategorised;
import io.airbyte.integrations.source.postgres.ctid.FileNodeHandler;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
@@ -492,6 +493,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString());
if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) {
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid));
@@ -511,7 +515,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final

final PostgresCtidHandler ctidHandler =
new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(Optional.empty()), getQuoteString(),
-fileNodeHandler, tableBlockSizes, ctidStateManager,
+fileNodeHandler, tableBlockSizes, tablesMaxTuple, ctidStateManager,
namespacePair -> Jsons.jsonNode(xminStatus));

final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator(
@@ -557,6 +561,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
database,
finalListOfStreamsToBeSyncedViaCtid,
getQuoteString());

final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString());

if (finalListOfStreamsToBeSyncedViaCtid.isEmpty()) {
LOGGER.info("No Streams will be synced via ctid.");
} else {
@@ -581,6 +590,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
getQuoteString(),
fileNodeHandler,
tableBlockSizes,
tablesMaxTuple,
ctidStateManager,
namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair)));

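Why tablesMaxTuple is only computed for pre-v14 servers: PostgreSQL 14 introduced the TID range scan, so on newer servers the handler can ask the database to walk a ctid window directly, while older servers would answer the same range predicate with a full sequential scan. On old servers the connector therefore advances through candidate ctids itself, and needs each table's max-tuple estimate to know when to roll over to the next page. An illustrative sketch of the two query shapes (assumed SQL, not the handler's exact templates):

// Illustrative only; the real templates live in the ctid handler classes.
// v14+ (TID range scan capable): let the server scan a ctid window.
static final String TID_RANGE_SHAPE =
    "SELECT ctid, * FROM %s WHERE ctid > '%s' AND ctid <= '%s'";
// pre-v14: the connector synthesizes candidate ctids one page/tuple at a
// time (see Ctid.inc below) instead of handing the server an open range.
static final String PER_CTID_SHAPE =
    "SELECT ctid, * FROM %s WHERE ctid = '%s'";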
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java
@@ -26,6 +26,7 @@
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.CdcMetadataInjector;
import io.airbyte.integrations.source.postgres.ctid.CtidStateManager;
import io.airbyte.integrations.source.postgres.ctid.CtidUtils;
import io.airbyte.integrations.source.postgres.ctid.FileNodeHandler;
import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler;
import io.airbyte.integrations.source.relationaldb.TableInfo;
@@ -140,11 +141,17 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
database,
finalListOfStreamsToBeSyncedViaCtid,
quoteString);

final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
CtidUtils.isTidRangeScanCapableDBServer(database) ? null :
PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString);

final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database,
ctidPostgresSourceOperations,
quoteString,
fileNodeHandler,
tableBlockSizes,
tablesMaxTuple,
ctidStateManager,
namespacePair -> Jsons.emptyObject());

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/Ctid.java
@@ -16,6 +16,7 @@ public class Ctid {

final Long page;
final Long tuple;
public static final Ctid ZERO = Ctid.of(0, 0);

public static Ctid of(final long page, final long tuple) {
return new Ctid(page, tuple);
@@ -71,4 +72,7 @@ public int hashCode() {
return Objects.hash(page, tuple);
}

/**
 * Returns the ctid that follows {@code ctid} when a page holds at most {@code maxTuple}
 * tuples: the tuple number is incremented, rolling over to tuple 1 of the next page.
 */
public static Ctid inc(final Ctid ctid, final long maxTuple) {
  return (ctid.tuple + 1 > maxTuple) ? Ctid.of(ctid.page + 1, 1) : Ctid.of(ctid.page, ctid.tuple + 1);
}
}
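A quick usage sketch of the new helper, assuming a table whose pages are estimated to hold at most 3 tuples: starting from Ctid.ZERO, successive calls yield (0,1), (0,2), (0,3), (1,1), (1,2), confirming that tuple numbers are 1-based within a page and that the cursor rolls over once the estimate is exhausted.

// Sketch (same package as Ctid, so the page/tuple fields are visible);
// maxTuple = 3 is an assumed per-page estimate.
Ctid cursor = Ctid.ZERO;
for (int i = 0; i < 5; i++) {
  cursor = Ctid.inc(cursor, 3);
  // Prints (0,1) (0,2) (0,3) (1,1) (1,2) across the five iterations.
  System.out.println("(" + cursor.page + "," + cursor.tuple + ")");
}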
airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidUtils.java
@@ -6,18 +6,25 @@

import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CtidUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidUtils.class);
public static final int POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE = 14;

public static List<ConfiguredAirbyteStream> identifyNewlyAddedStreams(final ConfiguredAirbyteCatalog fullCatalog,
final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams,
final SyncMode syncMode) {
@@ -53,4 +60,19 @@ public record StreamsCategorised<T> (CtidStreams ctidStreams,

}

/**
 * PostgreSQL servers of version 14 and above can execute a TID range scan, which the
 * ctid-based queries rely on. Defaults to true when the server version cannot be
 * determined.
 *
 * @param database the database to check
 * @return true if the server supports TID range scans
 */
public static boolean isTidRangeScanCapableDBServer(final JdbcDatabase database) {
try {
return database.getMetaData().getDatabaseMajorVersion() >= POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE;
} catch (final Exception e) {
LOGGER.warn("Failed to get db server version", e);
}
return true;
}

}
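For reference, the planner feature this gate probes for can be observed directly: on a v14+ server a ctid window predicate plans as a "Tid Range Scan", while older servers fall back to a filtered sequential scan, which is what made the original ctid reads slow. An illustrative probe (assumed table name "t"; not part of the connector):

// Illustrative EXPLAIN probe: on PostgreSQL 14+ the plan output contains
// "Tid Range Scan on t"; earlier versions show a Seq Scan with a ctid filter.
static final String TID_RANGE_SCAN_PROBE =
    "EXPLAIN SELECT * FROM t WHERE ctid > '(0,0)' AND ctid <= '(100,0)'";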