Skip to content

Commit 29ea2e9

Browse files
Fix import order of archived data to accommodate foreign key constraints. (#14351)
We have to import actor catalogs first since there is a foreign key constraint from the connections table to the catalog table, so catalog rows need to exist before connections. Co-authored-by: Davin Chia <davinchia@gmail.com>
1 parent d268ab4 commit 29ea2e9

File tree

2 files changed

+65
-21
lines changed

2 files changed

+65
-21
lines changed

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1434,29 +1434,13 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final
14341434
LOGGER.warn(ConfigSchema.STANDARD_SYNC_OPERATION + NOT_FOUND);
14351435
}
14361436

1437-
if (configs.containsKey(ConfigSchema.STANDARD_SYNC)) {
1438-
configs.get(ConfigSchema.STANDARD_SYNC).map(c -> (StandardSync) c).forEach(c -> writeStandardSync(Collections.singletonList(c), ctx));
1439-
originalConfigs.remove(ConfigSchema.STANDARD_SYNC);
1440-
} else {
1441-
LOGGER.warn(ConfigSchema.STANDARD_SYNC + NOT_FOUND);
1442-
}
1443-
1444-
if (configs.containsKey(ConfigSchema.STANDARD_SYNC_STATE)) {
1445-
configs.get(ConfigSchema.STANDARD_SYNC_STATE).map(c -> (StandardSyncState) c)
1446-
.forEach(c -> writeStandardSyncState(Collections.singletonList(c), ctx));
1447-
originalConfigs.remove(ConfigSchema.STANDARD_SYNC_STATE);
1448-
} else {
1449-
LOGGER.warn(ConfigSchema.STANDARD_SYNC_STATE + NOT_FOUND);
1450-
}
1451-
14521437
if (configs.containsKey(ConfigSchema.ACTOR_CATALOG)) {
14531438
configs.get(ConfigSchema.ACTOR_CATALOG).map(c -> (ActorCatalog) c)
14541439
.forEach(c -> writeActorCatalog(Collections.singletonList(c), ctx));
14551440
originalConfigs.remove(ConfigSchema.ACTOR_CATALOG);
14561441
} else {
14571442
LOGGER.warn(ConfigSchema.ACTOR_CATALOG + NOT_FOUND);
14581443
}
1459-
14601444
if (configs.containsKey(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT)) {
14611445
configs.get(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT).map(c -> (ActorCatalogFetchEvent) c)
14621446
.forEach(c -> writeActorCatalogFetchEvent(Collections.singletonList(c), ctx));
@@ -1465,6 +1449,24 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final
14651449
LOGGER.warn(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT + NOT_FOUND);
14661450
}
14671451

1452+
// Syncs need to be imported after Actor Catalogs as they have a foreign key association with Actor
1453+
// Catalogs.
1454+
// e.g. They reference catalogs and thus catalogs need to exist before or the insert will fail.
1455+
if (configs.containsKey(ConfigSchema.STANDARD_SYNC)) {
1456+
configs.get(ConfigSchema.STANDARD_SYNC).map(c -> (StandardSync) c).forEach(c -> writeStandardSync(Collections.singletonList(c), ctx));
1457+
originalConfigs.remove(ConfigSchema.STANDARD_SYNC);
1458+
} else {
1459+
LOGGER.warn(ConfigSchema.STANDARD_SYNC + NOT_FOUND);
1460+
}
1461+
1462+
if (configs.containsKey(ConfigSchema.STANDARD_SYNC_STATE)) {
1463+
configs.get(ConfigSchema.STANDARD_SYNC_STATE).map(c -> (StandardSyncState) c)
1464+
.forEach(c -> writeStandardSyncState(Collections.singletonList(c), ctx));
1465+
originalConfigs.remove(ConfigSchema.STANDARD_SYNC_STATE);
1466+
} else {
1467+
LOGGER.warn(ConfigSchema.STANDARD_SYNC_STATE + NOT_FOUND);
1468+
}
1469+
14681470
if (!originalConfigs.isEmpty()) {
14691471
originalConfigs.forEach(c -> LOGGER.warn("Unknown Config " + c + " ignored"));
14701472
}

airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
import io.airbyte.commons.json.Jsons;
2121
import io.airbyte.commons.string.Strings;
2222
import io.airbyte.commons.version.AirbyteVersion;
23+
import io.airbyte.config.ActorCatalog;
2324
import io.airbyte.config.ConfigSchema;
2425
import io.airbyte.config.DestinationConnection;
2526
import io.airbyte.config.Notification;
2627
import io.airbyte.config.Notification.NotificationType;
2728
import io.airbyte.config.SlackNotificationConfiguration;
2829
import io.airbyte.config.SourceConnection;
30+
import io.airbyte.config.StandardDestinationDefinition;
2931
import io.airbyte.config.StandardSourceDefinition;
3032
import io.airbyte.config.StandardSourceDefinition.SourceType;
33+
import io.airbyte.config.StandardSync;
3134
import io.airbyte.config.StandardWorkspace;
3235
import io.airbyte.config.init.YamlSeedConfigPersistence;
3336
import io.airbyte.config.persistence.ConfigPersistence;
@@ -41,6 +44,7 @@
4144
import io.airbyte.db.factory.DSLContextFactory;
4245
import io.airbyte.db.factory.DataSourceFactory;
4346
import io.airbyte.db.instance.test.TestDatabaseProviders;
47+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
4448
import io.airbyte.protocol.models.ConnectorSpecification;
4549
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
4650
import io.airbyte.scheduler.persistence.JobPersistence;
@@ -52,6 +56,7 @@
5256
import java.nio.file.Files;
5357
import java.nio.file.Path;
5458
import java.util.Collections;
59+
import java.util.List;
5560
import java.util.Map;
5661
import java.util.Optional;
5762
import java.util.Set;
@@ -193,7 +198,7 @@ void testFullExportImportRoundTrip() throws Exception {
193198
.withSendOnFailure(true)
194199
.withSendOnSuccess(true)
195200
.withSlackConfiguration(new SlackNotificationConfiguration().withWebhook("webhook-url"));
196-
final StandardWorkspace standardWorkspace = new StandardWorkspace()
201+
final StandardWorkspace workspace = new StandardWorkspace()
197202
.withWorkspaceId(UUID.randomUUID())
198203
.withCustomerId(UUID.randomUUID())
199204
.withName("test-workspace")
@@ -208,18 +213,55 @@ void testFullExportImportRoundTrip() throws Exception {
208213
.withNotifications(Collections.singletonList(notification))
209214
.withFirstCompletedSync(true)
210215
.withFeedbackDone(true);
211-
final SourceConnection sourceConnection = new SourceConnection()
216+
final SourceConnection source = new SourceConnection()
212217
.withSourceDefinitionId(sourceS3DefinitionId)
213218
.withSourceId(UUID.randomUUID())
214-
.withWorkspaceId(standardWorkspace.getWorkspaceId())
219+
.withWorkspaceId(workspace.getWorkspaceId())
215220
.withName("Test source")
216221
.withConfiguration(Jsons.deserialize("{}"))
217222
.withTombstone(false);
218223

224+
final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition()
225+
.withName("S3")
226+
.withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362"))
227+
.withDockerRepository("airbyte/destination-s3")
228+
.withDockerImageTag("0.1.12")
229+
.withSpec(sourceS3Definition.getSpec())
230+
.withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3")
231+
.withTombstone(false);
232+
233+
final DestinationConnection destination = new DestinationConnection()
234+
.withName("Destination")
235+
.withDestinationId(UUID.randomUUID())
236+
.withDestinationDefinitionId(DESTINATION_S3.getDestinationDefinitionId())
237+
.withConfiguration(Jsons.deserialize("{}"))
238+
.withWorkspaceId(workspace.getWorkspaceId());
239+
240+
final ActorCatalog actorCatalog = new ActorCatalog()
241+
.withId(UUID.randomUUID())
242+
.withCatalog(Jsons.deserialize("{}"))
243+
.withCatalogHash("");
244+
245+
final StandardSync sync = new StandardSync()
246+
.withName("Connection")
247+
.withConnectionId(UUID.randomUUID())
248+
.withSourceId(source.getSourceId())
249+
.withDestinationId(destination.getDestinationId())
250+
.withCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of()))
251+
.withSourceCatalogId(actorCatalog.getId())
252+
.withManual(true);
253+
219254
// Write source connection and an old source definition.
220-
configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, standardWorkspace.getWorkspaceId().toString(), standardWorkspace);
221-
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnection.getSourceId().toString(), sourceConnection);
255+
configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId().toString(), workspace);
222256
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), sourceS3Definition);
257+
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
258+
259+
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
260+
configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, DESTINATION_S3.getDestinationDefinitionId().toString(),
261+
DESTINATION_S3);
262+
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
263+
264+
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, sync.getConnectionId().toString(), sync);
223265

224266
// Export, wipe, and import the configs.
225267
archive = archiveHandler.exportData();

0 commit comments

Comments
 (0)