Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ public class DefaultJobPersistence implements JobPersistence {
private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class);
private static final Set<String> SYSTEM_SCHEMA = Set
.of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal",
"catalog_history");
public static final String ATTEMPT_NUMBER = "attempt_number";
private static final String JOB_ID = "job_id";
private static final String WHERE = "WHERE ";
Expand Down Expand Up @@ -793,18 +790,6 @@ public void setSecretMigrationDone() throws IOException {
setMetadata(SECRET_MIGRATION_STATUS, "true");
}

private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

@Override
public boolean isSchedulerMigrated() throws IOException {
return getMetadata(SCHEDULER_MIGRATION_STATUS).count() == 1;
}

@Override
public void setSchedulerMigrationDone() throws IOException {
setMetadata(SCHEDULER_MIGRATION_STATUS, "true");
}

@Override
public Optional<String> getVersion() throws IOException {
return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst();
Expand Down Expand Up @@ -911,27 +896,6 @@ public Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOExcep
return exportDatabase(DEFAULT_SCHEMA);
}

/**
* This is different from {@link #exportDatabase()} cause it exports all the tables in all the
* schemas available
*/
@Override
public Map<String, Stream<JsonNode>> dump() throws IOException {
final Map<String, Stream<JsonNode>> result = new HashMap<>();
for (final String schema : listSchemas()) {
final List<String> tables = listAllTables(schema);

for (final String table : tables) {
if (result.containsKey(table)) {
throw new RuntimeException("Multiple tables found with the same name " + table);
}
result.put(table.toUpperCase(), exportTable(schema, table));
}
}

return result;
}

private Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase(final String schema) throws IOException {
final List<String> tables = listTables(schema);
final Map<JobsDatabaseSchema, Stream<JsonNode>> result = new HashMap<>();
Expand Down Expand Up @@ -978,25 +942,6 @@ public void purgeJobHistory(final LocalDateTime asOfDate) {
}
}

private List<String> listAllTables(final String schema) throws IOException {
if (schema != null) {
return jobDatabase.query(context -> context.meta().getSchemas(schema).stream()
.flatMap(s -> context.meta(s).getTables().stream())
.map(Named::getName)
.collect(Collectors.toList()));
} else {
return List.of();
}
}

private List<String> listSchemas() throws IOException {
return jobDatabase.query(context -> context.meta().getSchemas().stream()
.map(Named::getName)
.filter(c -> !SYSTEM_SCHEMA.contains(c))
.collect(Collectors.toList()));

}

private Stream<JsonNode> exportTable(final String schema, final String tableName) throws IOException {
final Table<Record> tableSql = getTable(schema, tableName);
try (final Stream<Record> records = jobDatabase.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException;

Map<String, Stream<JsonNode>> dump() throws IOException;

/**
* Import all SQL tables from streams of JsonNode objects.
*
Expand All @@ -306,22 +304,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setSecretMigrationDone() throws IOException;

/**
* Check if the scheduler has been migrated to temporal.
*
* TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
* "major" version bump as it will no longer be needed.
*/
boolean isSchedulerMigrated() throws IOException;

/**
* Set that the scheduler migration has been performed.
*
* TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
* "major" version bump as it will no longer be needed.
*/
void setSchedulerMigrationDone() throws IOException;

List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -565,15 +565,6 @@ void testSecretMigrationMetadata() throws IOException {
assertTrue(isMigrated);
}

@Test
void testSchedulerMigrationMetadata() throws IOException {
boolean isMigrated = jobPersistence.isSchedulerMigrated();
assertFalse(isMigrated);
jobPersistence.setSchedulerMigrationDone();
isMigrated = jobPersistence.isSchedulerMigrated();
assertTrue(isMigrated);
}

@Test
void testAirbyteProtocolVersionMaxMetadata() throws IOException {
assertTrue(jobPersistence.getAirbyteProtocolVersionMax().isEmpty());
Expand Down
38 changes: 0 additions & 38 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.server;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
Expand All @@ -18,10 +17,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
Expand Down Expand Up @@ -70,16 +66,12 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.TemporalEventRunner;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.http.HttpClient;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
Expand Down Expand Up @@ -256,14 +248,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
final EventRunner eventRunner = new TemporalEventRunner(temporalClient);

// It is important that the migration to the temporal scheduler is performed before the server
// accepts any requests.
// This is why this migration is performed here instead of in the bootloader - so that the server
// blocks on this.
// TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
// "major" version bump as it will no longer be needed.
migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
Expand Down Expand Up @@ -370,28 +354,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
workspacesHandler);
}

@VisibleForTesting
static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final EventRunner eventRunner)
throws JsonValidationException, ConfigNotFoundException, IOException {
// Skip the migration if it was already performed, to save on resources/startup time
if (jobPersistence.isSchedulerMigrated()) {
LOGGER.info("Migration to temporal scheduler has already been performed");
return;
}

LOGGER.info("Start migration to the new scheduler...");
final Set<UUID> connectionIds =
configRepository.listStandardSyncs().stream()
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
.map(StandardSync::getConnectionId)
.collect(Collectors.toSet());
eventRunner.migrateSyncIfNeeded(connectionIds);
jobPersistence.setSchedulerMigrationDone();
LOGGER.info("Done migrating to the new scheduler...");
}

public static void main(final String[] args) {
try {
final Configs configs = new EnvConfigs();
Expand Down
65 changes: 0 additions & 65 deletions airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java

This file was deleted.