Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1975,27 +1975,6 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/connections/updateNew:
post:
operationId: webBackendUpdateConnectionNew
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionUpdate"
required: true
responses:
"200":
content:
application/json:
schema:
$ref: "#/components/schemas/WebBackendConnectionRead"
description: Successful operation
"422":
$ref: "#/components/responses/InvalidInputResponse"
summary: Update a connection
tags:
- web_backend
/v1/web_backend/connections/search:
post:
tags:
Expand Down Expand Up @@ -3355,8 +3334,6 @@ components:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
$ref: "#/components/schemas/ResourceRequirements"
withRefreshedCatalog:
type: boolean
skipReset:
type: boolean
operations:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,11 +822,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

@Override
public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnectionNew(webBackendConnectionUpdate));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,30 +362,6 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);

ConnectionRead connectionRead;
final boolean needReset = MoreBooleans.isTruthy(webBackendConnectionUpdate.getWithRefreshedCatalog());

connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (needReset) {
ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
webBackendConnectionUpdate.getConnectionId(),
// TODO (https://github.com/airbytehq/airbyte/issues/12741): change this to only get new/updated
// streams, instead of all
configRepository.getAllStreamsForConnection(webBackendConnectionUpdate.getConnectionId()));
verifyManualOperationResult(manualOperationResult);
manualOperationResult = eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId());
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}
return buildWebBackendConnectionRead(connectionRead);
}

public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);
final UUID connectionId = webBackendConnectionUpdate.getConnectionId();
final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ void testToConnectionCreate() throws IOException {
assertEquals(expected, actual);
}

// TODO: remove withRefreshedCatalog param from this test when param is removed from code
@Test
void testToConnectionUpdate() throws IOException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
Expand Down Expand Up @@ -535,6 +534,14 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
.syncCatalog(expected.getSyncCatalog())
.sourceCatalogId(expected.getCatalogId());

when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());
when(connectionsHandler.getDiff(any(), any())).thenReturn(catalogDiff);
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(expected.getConnectionId());
when(stateHandler.getState(connectionIdRequestBody)).thenReturn(new ConnectionState().stateType(ConnectionStateType.LEGACY));

when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
when(connectionsHandler.updateConnection(any())).thenReturn(
Expand Down Expand Up @@ -576,48 +583,6 @@ void testUpdateConnectionWithOperations() throws JsonValidationException, Config
.syncCatalog(expected.getSyncCatalog())
.operations(List.of(operationCreateOrUpdate));

when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead()
.connectionId(expected.getConnectionId())
.operationIds(connectionRead.getOperationIds()));
when(connectionsHandler.updateConnection(any())).thenReturn(
new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.operationIds(connectionRead.getOperationIds())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expected.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule()));
when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId()));
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);

final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds());
verify(operationsHandler, times(1)).updateOperation(operationUpdate);
}

@Test
void testUpdateConnectionWithOperationsNew() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendOperationCreateOrUpdate operationCreateOrUpdate = new WebBackendOperationCreateOrUpdate()
.name("Test Operation")
.operationId(connectionRead.getOperationIds().get(0));
final OperationUpdate operationUpdate = WebBackendConnectionsHandler.toOperationUpdate(operationCreateOrUpdate);
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expected.getSyncCatalog())
.operations(List.of(operationCreateOrUpdate));

when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());

Expand Down Expand Up @@ -646,62 +611,12 @@ void testUpdateConnectionWithOperationsNew() throws JsonValidationException, Con
when(operationsHandler.updateOperation(operationUpdate)).thenReturn(new OperationRead().operationId(operationUpdate.getOperationId()));
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);

final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead actualConnectionRead = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(connectionRead.getOperationIds(), actualConnectionRead.getOperationIds());
verify(operationsHandler, times(1)).updateOperation(operationUpdate);
}

// TODO: remove in favor of test below when update endpoint is switched to new endpoint
@Test
void testUpdateConnectionWithUpdatedSchema() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.withRefreshedCatalog(true);

when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()));
final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule());
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final List<io.airbyte.protocol.models.StreamDescriptor> connectionStreams = List.of(ConnectionHelpers.STREAM_DESCRIPTOR);
when(configRepository.getAllStreamsForConnection(expected.getConnectionId())).thenReturn(connectionStreams);

final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build();
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId());
verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), connectionStreams);
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
}

@Test
void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
Expand Down Expand Up @@ -750,7 +665,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -816,7 +731,7 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -870,7 +785,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down Expand Up @@ -915,7 +830,7 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN
.schedule(expected.getSchedule());
when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnectionNew(updateBody);
final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,6 @@ public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection
.sourceCatalogId(connection.getSourceCatalogId())
.status(connection.getStatus())
.prefix(connection.getPrefix())
.withRefreshedCatalog(true)
.skipReset(false);
}

Expand Down
Loading