Skip to content

Commit 42124f6

Browse files
authored
[source-mysql]: support Planetscale MySQL's per-query row limit (#40561)
Fixes airbytehq/oncall#5051 Planet MySQL has defined multiple system limits. One of our customers has been hitting the 100K per-query row limit, which causes our connector to emit system errors. This patch defines a chunk size limit for our connector. By default, it is the largest long, and if we find it is a Planetscale MySQL, we will adjust to 100K.
1 parent 48aa409 commit 42124f6

File tree

3 files changed

+15
-2
lines changed

3 files changed

+15
-2
lines changed

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.4.10
12+
dockerImageTag: 3.4.11
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.mysql.cj.MysqlType;
1313
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
1414
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
15+
import io.airbyte.cdk.db.jdbc.JdbcUtils;
1516
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
1617
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
1718
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
@@ -57,6 +58,7 @@ public class MySqlInitialLoadHandler implements InitialLoadHandler<MysqlType> {
5758

5859
private static final long QUERY_TARGET_SIZE_GB = 1_073_741_824;
5960
private static final long DEFAULT_CHUNK_SIZE = 1_000_000;
61+
private long MAX_CHUNK_SIZE = Long.MAX_VALUE;
6062
final Map<AirbyteStreamNameNamespacePair, TableSizeInfo> tableSizeInfoMap;
6163

6264
public MySqlInitialLoadHandler(final JsonNode config,
@@ -73,6 +75,16 @@ public MySqlInitialLoadHandler(final JsonNode config,
7375
this.initialLoadStateManager = initialLoadStateManager;
7476
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
7577
this.tableSizeInfoMap = tableSizeInfoMap;
78+
adjustChunkSizeLimitForMySQLVariants();
79+
}
80+
81+
private void adjustChunkSizeLimitForMySQLVariants() {
82+
// For PSDB, we need to limit the chunk size to 100k rows to avoid the query being killed by the
83+
// server.
84+
// Reference:
85+
// https://planetscale.com/docs/reference/planetscale-system-limits
86+
if (config.get(JdbcUtils.HOST_KEY).asText().toLowerCase().contains("psdb.cloud"))
87+
MAX_CHUNK_SIZE = 100_000;
7688
}
7789

7890
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
@@ -122,7 +134,7 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(
122134
.collect(Collectors.toList());
123135
final AutoCloseableIterator<AirbyteRecordData> queryStream =
124136
new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
125-
calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
137+
Long.min(calculateChunkSize(tableSizeInfoMap.get(pair), pair), MAX_CHUNK_SIZE), isCompositePrimaryKey(airbyteStream));
126138
final AutoCloseableIterator<AirbyteMessage> recordIterator =
127139
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
128140
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair);

docs/integrations/sources/mysql.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported
233233

234234
| Version | Date | Pull Request | Subject |
235235
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
236+
| 3.4.11 | 2024-06-26 | [40561](https://github.com/airbytehq/airbyte/pull/40561) | Support PlanetScale MySQL's per-query row limit. |
236237
| 3.4.10 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
237238
| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |
238239
| 3.4.8 | 2024-06-05 | [39144](https://github.com/airbytehq/airbyte/pull/39144) | Upgrade Debezium to 2.5.4 |

0 commit comments

Comments
 (0)