1111
1212import com .fasterxml .jackson .databind .JsonNode ;
1313import datadog .trace .api .Trace ;
14- import io .airbyte .commons .temporal .TemporalJobType ;
1514import io .airbyte .commons .temporal .TemporalWorkflowUtils ;
1615import io .airbyte .commons .temporal .exception .RetryableException ;
1716import io .airbyte .commons .temporal .scheduling .ConnectionManagerWorkflow ;
2120import io .airbyte .commons .temporal .scheduling .state .WorkflowState ;
2221import io .airbyte .commons .temporal .scheduling .state .listener .NoopStateListener ;
2322import io .airbyte .config .ConnectorJobOutput ;
24- import io .airbyte .config .ConnectorJobOutput .OutputType ;
2523import io .airbyte .config .FailureReason ;
2624import io .airbyte .config .FailureReason .FailureType ;
2725import io .airbyte .config .NormalizationSummary ;
2826import io .airbyte .config .StandardCheckConnectionInput ;
29- import io .airbyte .config .StandardCheckConnectionOutput ;
3027import io .airbyte .config .StandardSyncInput ;
3128import io .airbyte .config .StandardSyncOutput ;
3229import io .airbyte .config .StandardSyncSummary ;
4845import io .airbyte .workers .temporal .scheduling .activities .ConfigFetchActivity .ScheduleRetrieverOutput ;
4946import io .airbyte .workers .temporal .scheduling .activities .GenerateInputActivity ;
5047import io .airbyte .workers .temporal .scheduling .activities .GenerateInputActivity .GeneratedJobInput ;
51- import io .airbyte .workers .temporal .scheduling .activities .GenerateInputActivity .SyncInput ;
5248import io .airbyte .workers .temporal .scheduling .activities .GenerateInputActivity .SyncInputWithAttemptNumber ;
5349import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity ;
5450import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .AttemptCreationInput ;
55- import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .AttemptCreationOutput ;
56- import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .AttemptFailureInput ;
5751import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .AttemptNumberCreationOutput ;
5852import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .AttemptNumberFailureInput ;
5953import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .EnsureCleanJobStateInput ;
60- import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobCancelledInput ;
6154import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobCancelledInputWithAttemptNumber ;
6255import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobCheckFailureInput ;
6356import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobCreationInput ;
6457import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobCreationOutput ;
6558import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobFailureInput ;
66- import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobSuccessInput ;
6759import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .JobSuccessInputWithAttemptNumber ;
6860import io .airbyte .workers .temporal .scheduling .activities .JobCreationAndStatusUpdateActivity .ReportJobStartInput ;
6961import io .airbyte .workers .temporal .scheduling .activities .RecordMetricActivity ;
9789@ SuppressWarnings ("PMD.AvoidDuplicateLiterals" )
9890public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {
9991
100- private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1 ;
101- private static final int AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION = 1 ;
102-
103- private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number" ;
104- private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1 ;
105-
106- private static final String CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG = "check_previous_job_or_attempt" ;
107- private static final int CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION = 1 ;
108-
109- private static final String ENSURE_CLEAN_JOB_STATE = "ensure_clean_job_state" ;
110- private static final int ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION = 1 ;
111-
112- private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync" ;
113- private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1 ;
114-
115- private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output" ;
116- private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1 ;
117-
118- private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams" ;
119- private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1 ;
120- private static final String RECORD_METRIC_TAG = "record_metric" ;
121- private static final int RECORD_METRIC_CURRENT_VERSION = 1 ;
122- private static final String WORKFLOW_CONFIG_TAG = "workflow_config" ;
123- private static final int WORKFLOW_CONFIG_CURRENT_VERSION = 1 ;
124- private static final String ROUTE_ACTIVITY_TAG = "route_activity" ;
125- private static final int ROUTE_ACTIVITY_CURRENT_VERSION = 1 ;
126-
12792 private WorkflowState workflowState = new WorkflowState (UUID .randomUUID (), new NoopStateListener ());
12893
12994 private final WorkflowInternalState workflowInternalState = new WorkflowInternalState ();
@@ -299,22 +264,12 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
299264
300265 private void reportSuccess (final ConnectionUpdaterInput connectionUpdaterInput , final StandardSyncOutput standardSyncOutput ) {
301266 workflowState .setSuccess (true );
302- final int attemptCreationVersion =
303- Workflow .getVersion (RENAME_ATTEMPT_ID_TO_NUMBER_TAG , Workflow .DEFAULT_VERSION , RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION );
304-
305- if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION ) {
306- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobSuccess , new JobSuccessInput (
307- workflowInternalState .getJobId (),
308- workflowInternalState .getAttemptNumber (),
309- connectionUpdaterInput .getConnectionId (),
310- standardSyncOutput ));
311- } else {
312- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobSuccessWithAttemptNumber , new JobSuccessInputWithAttemptNumber (
313- workflowInternalState .getJobId (),
314- workflowInternalState .getAttemptNumber (),
315- connectionUpdaterInput .getConnectionId (),
316- standardSyncOutput ));
317- }
267+
268+ runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobSuccessWithAttemptNumber , new JobSuccessInputWithAttemptNumber (
269+ workflowInternalState .getJobId (),
270+ workflowInternalState .getAttemptNumber (),
271+ connectionUpdaterInput .getConnectionId (),
272+ standardSyncOutput ));
318273
319274 deleteResetJobStreams ();
320275
@@ -334,25 +289,15 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
334289 final StandardSyncOutput standardSyncOutput ,
335290 final FailureCause failureCause ,
336291 final Set <FailureReason > failureReasonsOverride ) {
337- final int attemptCreationVersion =
338- Workflow .getVersion (RENAME_ATTEMPT_ID_TO_NUMBER_TAG , Workflow .DEFAULT_VERSION , RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION );
339292
340293 final Set <FailureReason > failureReasons = failureReasonsOverride .isEmpty () ? workflowInternalState .getFailures () : failureReasonsOverride ;
341- if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION ) {
342- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::attemptFailure , new AttemptFailureInput (
343- workflowInternalState .getJobId (),
344- workflowInternalState .getAttemptNumber (),
345- connectionUpdaterInput .getConnectionId (),
346- standardSyncOutput ,
347- FailureHelper .failureSummary (failureReasons , workflowInternalState .getPartialSuccess ())));
348- } else {
349- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::attemptFailureWithAttemptNumber , new AttemptNumberFailureInput (
350- workflowInternalState .getJobId (),
351- workflowInternalState .getAttemptNumber (),
352- connectionUpdaterInput .getConnectionId (),
353- standardSyncOutput ,
354- FailureHelper .failureSummary (failureReasons , workflowInternalState .getPartialSuccess ())));
355- }
294+
295+ runMandatoryActivity (jobCreationAndStatusUpdateActivity ::attemptFailureWithAttemptNumber , new AttemptNumberFailureInput (
296+ workflowInternalState .getJobId (),
297+ workflowInternalState .getAttemptNumber (),
298+ connectionUpdaterInput .getConnectionId (),
299+ standardSyncOutput ,
300+ FailureHelper .failureSummary (failureReasons , workflowInternalState .getPartialSuccess ())));
356301
357302 final int maxAttempt = configFetchActivity .getMaxAttempt ().getMaxAttempt ();
358303 final int attemptNumber = connectionUpdaterInput .getAttemptNumber ();
@@ -371,17 +316,12 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
371316 runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobFailure , new JobFailureInput (connectionUpdaterInput .getJobId (),
372317 connectionUpdaterInput .getAttemptNumber (), connectionUpdaterInput .getConnectionId (), failureReason ));
373318
374- final int autoDisableConnectionVersion =
375- Workflow .getVersion ("auto_disable_failing_connection" , Workflow .DEFAULT_VERSION , AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION );
376-
377- if (autoDisableConnectionVersion != Workflow .DEFAULT_VERSION ) {
378- final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
379- new AutoDisableConnectionActivityInput (connectionId , Instant .ofEpochMilli (Workflow .currentTimeMillis ()));
380- final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput (
381- autoDisableConnectionActivity ::autoDisableFailingConnection , autoDisableConnectionActivityInput );
382- if (output .isDisabled ()) {
383- log .info ("Auto-disabled for constantly failing for Connection {}" , connectionId );
384- }
319+ final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
320+ new AutoDisableConnectionActivityInput (connectionId , Instant .ofEpochMilli (Workflow .currentTimeMillis ()));
321+ final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput (
322+ autoDisableConnectionActivity ::autoDisableFailingConnection , autoDisableConnectionActivityInput );
323+ if (output .isDisabled ()) {
324+ log .info ("Auto-disabled for constantly failing for Connection {}" , connectionId );
385325 }
386326
387327 // Record the failure metric
@@ -392,14 +332,6 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
392332 }
393333
394334 private ConnectorJobOutput getCheckResponse (final CheckConnectionInput checkInput ) {
395- final int checkJobOutputVersion =
396- Workflow .getVersion (CHECK_JOB_OUTPUT_TAG , Workflow .DEFAULT_VERSION , CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION );
397-
398- if (checkJobOutputVersion < CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION ) {
399- final StandardCheckConnectionOutput checkOutput = runMandatoryActivityWithOutput (checkActivity ::run , checkInput );
400- return new ConnectorJobOutput ().withOutputType (OutputType .CHECK_CONNECTION ).withCheckConnection (checkOutput );
401- }
402-
403335 return runMandatoryActivityWithOutput (checkActivity ::runWithJobOutput , checkInput );
404336 }
405337
@@ -412,26 +344,13 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
412344 final IntegrationLauncherConfig destinationLauncherConfig = jobInputs .getDestinationLauncherConfig ();
413345 final SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure (jobRunConfig );
414346
415- final int attemptCreationVersion =
416- Workflow .getVersion (CHECK_BEFORE_SYNC_TAG , Workflow .DEFAULT_VERSION , CHECK_BEFORE_SYNC_CURRENT_VERSION );
417-
418- if (attemptCreationVersion < CHECK_BEFORE_SYNC_CURRENT_VERSION ) {
419- // return early if this instance of the workflow was created beforehand
420- return checkFailure ;
421- }
422-
423347 final StandardCheckConnectionInput sourceConfiguration = new StandardCheckConnectionInput ().withConnectionConfiguration (sourceConfig );
424348 final CheckConnectionInput checkSourceInput = new CheckConnectionInput (jobRunConfig , sourceLauncherConfig , sourceConfiguration );
425349
426- final int checkJobOutputVersion =
427- Workflow .getVersion (CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG , Workflow .DEFAULT_VERSION , CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION );
428- boolean isLastJobOrAttemptFailure = true ;
429-
430- if (checkJobOutputVersion >= CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION ) {
431- final JobCheckFailureInput jobStateInput =
432- new JobCheckFailureInput (Long .parseLong (jobRunConfig .getJobId ()), jobRunConfig .getAttemptId ().intValue (), connectionId );
433- isLastJobOrAttemptFailure = runMandatoryActivityWithOutput (jobCreationAndStatusUpdateActivity ::isLastJobOrAttemptFailure , jobStateInput );
434- }
350+ final JobCheckFailureInput jobStateInput =
351+ new JobCheckFailureInput (Long .parseLong (jobRunConfig .getJobId ()), jobRunConfig .getAttemptId ().intValue (), connectionId );
352+ final boolean isLastJobOrAttemptFailure =
353+ runMandatoryActivityWithOutput (jobCreationAndStatusUpdateActivity ::isLastJobOrAttemptFailure , jobStateInput );
435354 if (isResetJob (sourceLauncherConfig ) || checkFailure .isFailed () || !isLastJobOrAttemptFailure ) {
436355 // reset jobs don't need to connect to any external source, so check connection is unnecessary
437356 log .info ("SOURCE CHECK: Skipped" );
@@ -693,14 +612,6 @@ private Duration getTimeToWait(final UUID connectionId) {
693612 }
694613
695614 private void ensureCleanJobState (final ConnectionUpdaterInput connectionUpdaterInput ) {
696- final int ensureCleanJobStateVersion =
697- Workflow .getVersion (ENSURE_CLEAN_JOB_STATE , Workflow .DEFAULT_VERSION , ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION );
698-
699- // For backwards compatibility and determinism, skip if workflow existed before this change
700- if (ensureCleanJobStateVersion < ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION ) {
701- return ;
702- }
703-
704615 if (connectionUpdaterInput .getJobId () != null ) {
705616 log .info ("This workflow is already attached to a job, so no need to clean job state." );
706617 return ;
@@ -710,13 +621,6 @@ private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterI
710621 }
711622
712623 private void recordMetric (final RecordMetricInput recordMetricInput ) {
713- final int recordMetricVersion =
714- Workflow .getVersion (RECORD_METRIC_TAG , Workflow .DEFAULT_VERSION , RECORD_METRIC_CURRENT_VERSION );
715-
716- if (recordMetricVersion < RECORD_METRIC_CURRENT_VERSION ) {
717- return ;
718- }
719-
720624 runMandatoryActivity (recordMetricActivity ::recordWorkflowCountMetric , recordMetricInput );
721625 }
722626
@@ -747,19 +651,6 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
747651 * @return The attempt number
748652 */
749653 private Integer createAttempt (final long jobId ) {
750- final int attemptCreationVersion =
751- Workflow .getVersion (RENAME_ATTEMPT_ID_TO_NUMBER_TAG , Workflow .DEFAULT_VERSION , RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION );
752-
753- // Retrieve the attempt number but name it attempt id
754- if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION ) {
755- final AttemptCreationOutput attemptCreationOutput =
756- runMandatoryActivityWithOutput (
757- jobCreationAndStatusUpdateActivity ::createNewAttempt ,
758- new AttemptCreationInput (
759- jobId ));
760- return attemptCreationOutput .getAttemptId ();
761- }
762-
763654 final AttemptNumberCreationOutput attemptNumberCreationOutput =
764655 runMandatoryActivityWithOutput (
765656 jobCreationAndStatusUpdateActivity ::createNewAttemptNumber ,
@@ -775,20 +666,6 @@ private Integer createAttempt(final long jobId) {
775666 private GeneratedJobInput getJobInput () {
776667 final Long jobId = workflowInternalState .getJobId ();
777668 final Integer attemptNumber = workflowInternalState .getAttemptNumber ();
778- final int attemptCreationVersion =
779- Workflow .getVersion (RENAME_ATTEMPT_ID_TO_NUMBER_TAG , Workflow .DEFAULT_VERSION , RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION );
780-
781- if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION ) {
782- final SyncInput getSyncInputActivitySyncInput = new SyncInput (
783- attemptNumber ,
784- jobId );
785-
786- final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput (
787- getSyncInputActivity ::getSyncWorkflowInput ,
788- getSyncInputActivitySyncInput );
789-
790- return syncWorkflowInputs ;
791- }
792669
793670 final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber (
794671 attemptNumber ,
@@ -802,18 +679,6 @@ private GeneratedJobInput getJobInput() {
802679 }
803680
804681 private String getSyncTaskQueue () {
805- final int taskQueueChangeVersion =
806- Workflow .getVersion ("task_queue_change_from_connection_updater_to_sync" , Workflow .DEFAULT_VERSION , TASK_QUEUE_CHANGE_CURRENT_VERSION );
807-
808- if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION ) {
809- return TemporalJobType .CONNECTION_UPDATER .name ();
810- }
811-
812- final int routeActivityVersion = Workflow .getVersion (ROUTE_ACTIVITY_TAG , Workflow .DEFAULT_VERSION , ROUTE_ACTIVITY_CURRENT_VERSION );
813-
814- if (routeActivityVersion < ROUTE_ACTIVITY_CURRENT_VERSION ) {
815- return TemporalJobType .SYNC .name ();
816- }
817682
818683 final RouteToSyncTaskQueueInput routeToSyncTaskQueueInput = new RouteToSyncTaskQueueInput (connectionId );
819684 final RouteToSyncTaskQueueOutput routeToSyncTaskQueueOutput = runMandatoryActivityWithOutput (
@@ -910,46 +775,21 @@ private void reportCancelled(final UUID connectionId) {
910775 final Integer attemptNumber = workflowInternalState .getAttemptNumber ();
911776 final Set <FailureReason > failures = workflowInternalState .getFailures ();
912777 final Boolean partialSuccess = workflowInternalState .getPartialSuccess ();
913- final int attemptCreationVersion =
914- Workflow .getVersion (RENAME_ATTEMPT_ID_TO_NUMBER_TAG , Workflow .DEFAULT_VERSION , RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION );
915-
916- if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION ) {
917- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobCancelled ,
918- new JobCancelledInput (
919- jobId ,
920- attemptNumber ,
921- connectionId ,
922- FailureHelper .failureSummaryForCancellation (jobId , attemptNumber , failures , partialSuccess )));
923- } else {
924- runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobCancelledWithAttemptNumber ,
925- new JobCancelledInputWithAttemptNumber (
926- jobId ,
927- attemptNumber ,
928- connectionId ,
929- FailureHelper .failureSummaryForCancellation (jobId , attemptNumber , failures , partialSuccess )));
930- }
778+
779+ runMandatoryActivity (jobCreationAndStatusUpdateActivity ::jobCancelledWithAttemptNumber ,
780+ new JobCancelledInputWithAttemptNumber (
781+ jobId ,
782+ attemptNumber ,
783+ connectionId ,
784+ FailureHelper .failureSummaryForCancellation (jobId , attemptNumber , failures , partialSuccess )));
931785 }
932786
933787 private void deleteResetJobStreams () {
934- final int deleteResetJobStreamsVersion =
935- Workflow .getVersion (DELETE_RESET_JOB_STREAMS_TAG , Workflow .DEFAULT_VERSION , DELETE_RESET_JOB_STREAMS_CURRENT_VERSION );
936-
937- if (deleteResetJobStreamsVersion < DELETE_RESET_JOB_STREAMS_CURRENT_VERSION ) {
938- return ;
939- }
940-
941788 runMandatoryActivity (streamResetActivity ::deleteStreamResetRecordsForJob ,
942789 new DeleteStreamResetRecordsForJobInput (connectionId , workflowInternalState .getJobId ()));
943790 }
944791
945792 private Duration getWorkflowRestartDelaySeconds () {
946- final int workflowConfigVersion =
947- Workflow .getVersion (WORKFLOW_CONFIG_TAG , Workflow .DEFAULT_VERSION , WORKFLOW_CONFIG_CURRENT_VERSION );
948-
949- if (workflowConfigVersion < WORKFLOW_CONFIG_CURRENT_VERSION ) {
950- return Duration .ofMinutes (10L );
951- }
952-
953793 return workflowConfigActivity .getWorkflowRestartDelaySeconds ();
954794 }
955795
0 commit comments