Skip to content

Commit 5b0ed4f

Browse files
authored
Code cleanup in abstract classes (#18811)
* Refactor attempt 1 Increasing readability: - Performed audit of methods that are marked as public/protected but should be private - Performed audit of methods that are marked as public but should be protected - Removing @OverRide from methods that don't need them (are the very first) * Remove .sqlite files * Remove AbstractRelationDbSource class - Removing AbstractRelationalDbSource abstract class and moving contents of it to a Util class * Rename utility class * Add overrides back
1 parent d405bc9 commit 5b0ed4f

File tree

6 files changed

+111
-101
lines changed

6 files changed

+111
-101
lines changed

airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package io.airbyte.integrations.source.bigquery;
66

7+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
8+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
9+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
10+
711
import com.fasterxml.jackson.databind.JsonNode;
812
import com.google.cloud.bigquery.QueryParameterValue;
913
import com.google.cloud.bigquery.StandardSQLTypeName;
@@ -18,8 +22,9 @@
1822
import io.airbyte.db.bigquery.BigQuerySourceOperations;
1923
import io.airbyte.integrations.base.IntegrationRunner;
2024
import io.airbyte.integrations.base.Source;
21-
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
25+
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
2226
import io.airbyte.integrations.source.relationaldb.CursorInfo;
27+
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
2328
import io.airbyte.integrations.source.relationaldb.TableInfo;
2429
import io.airbyte.protocol.models.CommonField;
2530
import io.airbyte.protocol.models.JsonSchemaType;
@@ -34,7 +39,7 @@
3439
import org.slf4j.Logger;
3540
import org.slf4j.LoggerFactory;
3641

37-
public class BigQuerySource extends AbstractRelationalDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {
42+
public class BigQuerySource extends AbstractDbSource<StandardSQLTypeName, BigQueryDatabase> implements Source {
3843

3944
private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySource.class);
4045
private static final String QUOTE = "`";
@@ -76,7 +81,7 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
7681
checkList.add(database -> {
7782
if (isDatasetConfigured(database)) {
7883
database.query(String.format("select 1 from %s where 1=0",
79-
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES")));
84+
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
8085
LOGGER.info("The source passed the Dataset query test!");
8186
} else {
8287
LOGGER.info("The Dataset query test is skipped due to not configured datasetId!");
@@ -140,12 +145,23 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
140145
final CursorInfo cursorInfo,
141146
final StandardSQLTypeName cursorFieldType) {
142147
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
143-
enquoteIdentifierList(columnNames),
144-
getFullTableName(schemaName, tableName),
148+
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()),
149+
getFullTableName(schemaName, tableName, getQuoteString()),
145150
cursorInfo.getCursorField()),
146151
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
147152
}
148153

154+
@Override
155+
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final BigQueryDatabase database,
156+
final List<String> columnNames,
157+
final String schemaName,
158+
final String tableName) {
159+
LOGGER.info("Queueing query for table: {}", tableName);
160+
return queryTable(database, String.format("SELECT %s FROM %s",
161+
enquoteIdentifierList(columnNames, getQuoteString()),
162+
getFullTableName(schemaName, tableName, getQuoteString())));
163+
}
164+
149165
@Override
150166
public boolean isCursorType(final StandardSQLTypeName standardSQLTypeName) {
151167
return true;

airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME;
2121
import static io.airbyte.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE;
2222
import static io.airbyte.db.jdbc.JdbcUtils.EQUALS;
23+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
24+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
25+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
2326

2427
import com.fasterxml.jackson.databind.JsonNode;
2528
import com.google.common.collect.ImmutableList;
@@ -40,7 +43,7 @@
4043
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
4144
import io.airbyte.integrations.base.Source;
4245
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
43-
import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource;
46+
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
4447
import io.airbyte.integrations.source.relationaldb.CursorInfo;
4548
import io.airbyte.integrations.source.relationaldb.TableInfo;
4649
import io.airbyte.integrations.source.relationaldb.state.StateManager;
@@ -81,7 +84,7 @@
8184
* relational DB source which can be accessed via JDBC driver. If you are implementing a connector
8285
* for a relational DB which has a JDBC driver, make an effort to use this class.
8386
*/
84-
public abstract class AbstractJdbcSource<Datatype> extends AbstractRelationalDbSource<Datatype, JdbcDatabase> implements Source {
87+
public abstract class AbstractJdbcSource<Datatype> extends AbstractDbSource<Datatype, JdbcDatabase> implements Source {
8588

8689
public static final String SSL_MODE = "sslMode";
8790

@@ -136,12 +139,23 @@ public AbstractJdbcSource(final String driverClass,
136139
this.sourceOperations = sourceOperations;
137140
}
138141

142+
@Override
143+
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
144+
final List<String> columnNames,
145+
final String schemaName,
146+
final String tableName) {
147+
LOGGER.info("Queueing query for table: {}", tableName);
148+
return queryTable(database, String.format("SELECT %s FROM %s",
149+
enquoteIdentifierList(columnNames, getQuoteString()),
150+
getFullTableName(schemaName, tableName, getQuoteString())));
151+
}
152+
139153
/**
140154
* Configures a list of operations that can be used to check the connection to the source.
141155
*
142156
* @return list of consumers that run queries for the check command.
143157
*/
144-
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
158+
protected List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
145159
return ImmutableList.of(database -> {
146160
LOGGER.info("Attempting to get metadata from the database to see if we can connect.");
147161
database.bufferedResultSetQuery(connection -> connection.getMetaData().getCatalogs(), sourceOperations::rowToJson);
@@ -258,7 +272,7 @@ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLExceptio
258272
* @param field Essential column information returned from
259273
* {@link AbstractJdbcSource#getColumnMetadata}.
260274
*/
261-
public Datatype getFieldType(final JsonNode field) {
275+
private Datatype getFieldType(final JsonNode field) {
262276
return sourceOperations.getFieldType(field);
263277
}
264278

@@ -515,7 +529,7 @@ public void close() {
515529
* @param config configuration
516530
* @return map containing relevant parsed values including location of keystore or an empty map
517531
*/
518-
public Map<String, String> parseSSLConfig(final JsonNode config) {
532+
protected Map<String, String> parseSSLConfig(final JsonNode config) {
519533
LOGGER.debug("source config: {}", config);
520534

521535
final Map<String, String> additionalParameters = new HashMap<>();
@@ -572,7 +586,7 @@ public Map<String, String> parseSSLConfig(final JsonNode config) {
572586
* @param sslParams
573587
* @return SSL portion of JDBC question params or and empty string
574588
*/
575-
public String toJDBCQueryParams(final Map<String, String> sslParams) {
589+
protected String toJDBCQueryParams(final Map<String, String> sslParams) {
576590
return Objects.isNull(sslParams) ? ""
577591
: sslParams.entrySet()
578592
.stream()

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
88
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
99
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
10+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
11+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
12+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting;
13+
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
1014
import static java.util.stream.Collectors.toList;
1115

1216
import com.fasterxml.jackson.databind.JsonNode;
@@ -80,7 +84,7 @@ public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase
8084
LOGGER.info("Queueing query for table: {}", tableName);
8185

8286
final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, schemaName, tableName);
83-
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName));
87+
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName, getQuoteString()));
8488

8589
LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery);
8690
return queryTable(database, preparedSqlQuery);
@@ -107,8 +111,8 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
107111
final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database
108112
.queryMetadata(String
109113
.format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type
110-
enquoteIdentifierList(columnNames),
111-
getFullTableName(schemaName, tableName)));
114+
enquoteIdentifierList(columnNames, getQuoteString()),
115+
getFullTableName(schemaName, tableName, getQuoteString())));
112116

113117
// metadata will be null if table doesn't contain records
114118
if (sqlServerResultSetMetaData != null) {
@@ -127,7 +131,7 @@ protected String getWrappedColumnNames(final JdbcDatabase database,
127131
.map(
128132
el -> hierarchyIdColumns.contains(el) ? String
129133
.format("%s.ToString() as %s%s%s", el, identifierQuoteString, el, identifierQuoteString)
130-
: getIdentifierWithQuoting(el))
134+
: getIdentifierWithQuoting(el, getQuoteString()))
131135
.toList());
132136
} catch (final SQLException e) {
133137
LOGGER.error("Failed to fetch metadata to prepare a proper request.", e);

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
88

99
import com.fasterxml.jackson.databind.JsonNode;
10+
import com.google.common.annotations.VisibleForTesting;
1011
import com.google.common.base.Preconditions;
1112
import com.google.common.collect.Lists;
1213
import io.airbyte.commons.exceptions.ConnectionErrorException;
@@ -234,7 +235,7 @@ private void validateCursorFieldForIncrementalTables(
234235
}
235236
}
236237

237-
protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
238+
private List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
238239
final Database database) throws Exception {
239240
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
240241
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
@@ -244,7 +245,7 @@ protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
244245
Collectors.toList()));
245246
}
246247

247-
protected List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
248+
private List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
248249
final Database database,
249250
final ConfiguredAirbyteCatalog catalog,
250251
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
@@ -287,7 +288,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
287288
* sync mode
288289
* @return List of AirbyteMessageIterators containing all iterators for a catalog
289290
*/
290-
protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
291+
private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
291292
final Database database,
292293
final ConfiguredAirbyteCatalog catalog,
293294
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
@@ -331,7 +332,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
331332
* @param emittedAt Time when data was emitted from the Source database
332333
* @return
333334
*/
334-
protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
335+
private AutoCloseableIterator<AirbyteMessage> createReadIterator(final Database database,
335336
final ConfiguredAirbyteStream airbyteStream,
336337
final TableInfo<CommonField<DataType>> table,
337338
final StateManager stateManager,
@@ -414,7 +415,7 @@ protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Databas
414415
* @param emittedAt Time when data was emitted from the Source database
415416
* @return AirbyteMessage Iterator that
416417
*/
417-
protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
418+
private AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Database database,
418419
final ConfiguredAirbyteStream airbyteStream,
419420
final List<String> selectedDatabaseFields,
420421
final TableInfo<CommonField<DataType>> table,
@@ -456,7 +457,7 @@ protected AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Datab
456457
* @param emittedAt Time when data was emitted from the Source database
457458
* @return AirbyteMessageIterator with all records for a database source
458459
*/
459-
protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
460+
private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Database database,
460461
final String streamName,
461462
final String namespace,
462463
final List<String> selectedDatabaseFields,
@@ -468,11 +469,11 @@ protected AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Datab
468469
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
469470
}
470471

471-
protected String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
472+
private String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
472473
return nameSpace != null ? nameSpace + "." + tableName : tableName;
473474
}
474475

475-
public AutoCloseableIterator<AirbyteMessage> getMessageIterator(
476+
private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
476477
final AutoCloseableIterator<JsonNode> recordIterator,
477478
final String streamName,
478479
final String namespace,
@@ -493,7 +494,7 @@ public AutoCloseableIterator<AirbyteMessage> getMessageIterator(
493494
* @return list of table/data structure info
494495
* @throws Exception might throw an error during connection to database
495496
*/
496-
protected List<TableInfo<Field>> getTables(final Database database) throws Exception {
497+
private List<TableInfo<Field>> getTables(final Database database) throws Exception {
497498
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
498499
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
499500
database, tableInfos);
@@ -522,7 +523,7 @@ protected List<TableInfo<Field>> getTables(final Database database) throws Excep
522523
.collect(Collectors.toList());
523524
}
524525

525-
protected Field toField(final CommonField<DataType> field) {
526+
private Field toField(final CommonField<DataType> field) {
526527
if (getType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null
527528
&& !field.getProperties().isEmpty()) {
528529
final var properties = field.getProperties().stream().map(this::toField).toList();
@@ -532,7 +533,7 @@ protected Field toField(final CommonField<DataType> field) {
532533
}
533534
}
534535

535-
protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName,
536+
private void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName,
536537
final List<CommonField<DataType>> columns) {
537538
columns.stream()
538539
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
@@ -557,7 +558,7 @@ protected void assertColumnsWithSameNameAreSame(final String nameSpace, final St
557558
* for SELECT-ing the table with privileges. In some cases such SELECT doesn't require (e.g. in
558559
* Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its owner).
559560
*/
560-
public <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
561+
protected <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
561562
final String schema) throws SQLException {
562563
return Collections.emptySet();
563564
}
@@ -585,7 +586,7 @@ public <T> Set<T> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
585586
*
586587
* @return list of consumers that run queries for the check command.
587588
*/
588-
public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config)
589+
protected abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(JsonNode config)
589590
throws Exception;
590591

591592
/**
@@ -601,7 +602,7 @@ public abstract List<CheckedConsumer<Database, Exception>> getCheckOperations(Js
601602
*
602603
* @return set of system namespaces(schemas) to be excluded
603604
*/
604-
public abstract Set<String> getExcludedInternalNameSpaces();
605+
protected abstract Set<String> getExcludedInternalNameSpaces();
605606

606607
/**
607608
* Discover all available tables in the source database.
@@ -653,7 +654,7 @@ protected abstract Map<String, List<String>> discoverPrimaryKeys(Database databa
653654
* @param tableName target table
654655
* @return iterator with read data
655656
*/
656-
public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
657+
protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
657658
final List<String> columnNames,
658659
final String schemaName,
659660
final String tableName);
@@ -666,7 +667,7 @@ public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Data
666667
*
667668
* @return iterator with read data
668669
*/
669-
public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
670+
protected abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
670671
List<String> columnNames,
671672
String schemaName,
672673
String tableName,
@@ -685,7 +686,7 @@ protected int getStateEmissionFrequency() {
685686
/**
686687
* @return list of fields that could be used as cursors
687688
*/
688-
public abstract boolean isCursorType(DataType type);
689+
protected abstract boolean isCursorType(DataType type);
689690

690691
private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
691692
final Database database = createDatabase(sourceConfig);

0 commit comments

Comments
 (0)