Skip to content

Commit 316a124

Browse files
authored
Pass job id from resetConnection to synchronousResetConnection to avoid race condition (#14434)
* pass job id from resetConnection to synchronousResetConnection to avoid race condition * return reset job ID in synchronous method
1 parent 29ea2e9 commit 316a124

File tree

2 files changed

+19
-31
lines changed

2 files changed

+19
-31
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,8 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
386386
Optional.empty());
387387
}
388388

389+
Optional<Long> newJobId;
390+
389391
do {
390392
try {
391393
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
@@ -394,23 +396,22 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
394396
Optional.of("Didn't manage to reset a sync for: " + connectionId),
395397
Optional.empty());
396398
}
397-
} while (!newJobStarted(connectionId, oldJobId));
399+
newJobId = getNewJobId(connectionId, oldJobId);
400+
} while (newJobId.isEmpty());
398401

399402
log.info("end of reset submission");
400403

401-
final long jobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId);
402-
403404
return new ManualOperationResult(
404405
Optional.empty(),
405-
Optional.of(jobId));
406+
newJobId);
406407
}
407408

408-
private boolean newJobStarted(final UUID connectionId, final long oldJobId) {
409+
private Optional<Long> getNewJobId(final UUID connectionId, final long oldJobId) {
409410
final long currentJobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId);
410411
if (currentJobId == NON_RUNNING_JOB_ID || currentJobId == oldJobId) {
411-
return false;
412+
return Optional.empty();
412413
} else {
413-
return true;
414+
return Optional.of(currentJobId);
414415
}
415416
}
416417

@@ -426,16 +427,7 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId,
426427
return resetResult;
427428
}
428429

429-
try {
430-
ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);
431-
} catch (final Exception e) {
432-
log.error("Encountered exception retrieving workflow after reset.", e);
433-
return new ManualOperationResult(
434-
Optional.of(e.getMessage()),
435-
Optional.empty());
436-
}
437-
438-
final long oldJobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId);
430+
final long resetJobId = resetResult.getJobId().get();
439431

440432
do {
441433
try {
@@ -445,15 +437,13 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId,
445437
Optional.of("Didn't manage to reset a sync for: " + connectionId),
446438
Optional.empty());
447439
}
448-
} while (ConnectionManagerUtils.getCurrentJobId(client, connectionId) == oldJobId);
440+
} while (ConnectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId);
449441

450442
log.info("End of reset");
451443

452-
final long jobId = ConnectionManagerUtils.getCurrentJobId(client, connectionId);
453-
454444
return new ManualOperationResult(
455445
Optional.empty(),
456-
Optional.of(jobId));
446+
Optional.of(resetJobId));
457447
}
458448

459449
private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobType jobType) {

airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -231,17 +231,15 @@ public void testSynchronousResetConnection() throws IOException {
231231
final WorkflowState mWorkflowState = mock(WorkflowState.class);
232232
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
233233
when(mWorkflowState.isDeleted()).thenReturn(false);
234-
final long jobId1 = 1L;
235-
final long jobId2 = 2L;
236-
final long jobId3 = 3L;
234+
final long resetJobId = 1L;
237235

238236
when(mConnectionManagerWorkflow.getJobInformation()).thenReturn(
239-
new JobInformation(jobId1, 0),
240-
new JobInformation(jobId2, 0),
241-
new JobInformation(jobId2, 0),
242-
new JobInformation(jobId2, 0),
243-
new JobInformation(jobId3, 0),
244-
new JobInformation(jobId3, 0));
237+
new JobInformation(NON_RUNNING_JOB_ID, 0),
238+
new JobInformation(NON_RUNNING_JOB_ID, 0),
239+
new JobInformation(resetJobId, 0),
240+
new JobInformation(resetJobId, 0),
241+
new JobInformation(NON_RUNNING_JOB_ID, 0),
242+
new JobInformation(NON_RUNNING_JOB_ID, 0));
245243

246244
doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class));
247245

@@ -253,7 +251,7 @@ public void testSynchronousResetConnection() throws IOException {
253251
verify(streamResetPersistence).createStreamResets(CONNECTION_ID, streamsToReset);
254252
verify(mConnectionManagerWorkflow).resetConnection();
255253

256-
assertEquals(manualOperationResult.getJobId().get(), jobId3);
254+
assertEquals(manualOperationResult.getJobId().get(), resetJobId);
257255
}
258256

259257
}

0 commit comments

Comments
 (0)