1212import datadog .trace .api .Trace ;
1313import io .airbyte .commons .features .FeatureFlags ;
1414import io .airbyte .commons .json .Jsons ;
15+ import io .airbyte .commons .logging .MdcScope ;
1516import io .airbyte .commons .protocol .AirbyteMessageSerDeProvider ;
1617import io .airbyte .commons .protocol .AirbyteMessageVersionedMigratorFactory ;
1718import io .airbyte .commons .temporal .TemporalUtils ;
@@ -120,7 +121,8 @@ public Optional<String> runJob() throws Exception {
120121 final AirbyteSource airbyteSource =
121122 WorkerConstants .RESET_JOB_SOURCE_DOCKER_IMAGE_STUB .equals (sourceLauncherConfig .getDockerImage ()) ? new EmptyAirbyteSource (
122123 featureFlags .useStreamCapableState ())
123- : new DefaultAirbyteSource (sourceLauncher , getStreamFactory (sourceLauncherConfig .getProtocolVersion ()));
124+ : new DefaultAirbyteSource (sourceLauncher ,
125+ getStreamFactory (sourceLauncherConfig .getProtocolVersion (), DefaultAirbyteSource .CONTAINER_LOG_MDC_BUILDER ));
124126
125127 MetricClientFactory .initialize (MetricEmittingApps .WORKER );
126128 final MetricClient metricClient = MetricClientFactory .getMetricClient ();
@@ -132,7 +134,8 @@ public Optional<String> runJob() throws Exception {
132134 Math .toIntExact (jobRunConfig .getAttemptId ()),
133135 airbyteSource ,
134136 new NamespacingMapper (syncInput .getNamespaceDefinition (), syncInput .getNamespaceFormat (), syncInput .getPrefix ()),
135- new DefaultAirbyteDestination (destinationLauncher , getStreamFactory (destinationLauncherConfig .getProtocolVersion ()),
137+ new DefaultAirbyteDestination (destinationLauncher , getStreamFactory (destinationLauncherConfig .getProtocolVersion (),
138+ DefaultAirbyteDestination .CONTAINER_LOG_MDC_BUILDER ),
136139 new VersionedAirbyteMessageBufferedWriterFactory (serDeProvider , migratorFactory , destinationLauncherConfig .getProtocolVersion ())),
137140 new AirbyteMessageTracker (),
138141 new RecordSchemaValidator (WorkerUtils .mapStreamNamesToSchemas (syncInput )),
@@ -146,10 +149,10 @@ public Optional<String> runJob() throws Exception {
146149 return Optional .of (Jsons .serialize (replicationOutput ));
147150 }
148151
149- private AirbyteStreamFactory getStreamFactory (final Version protocolVersion ) {
152+ private AirbyteStreamFactory getStreamFactory (final Version protocolVersion , final MdcScope . Builder mdcScope ) {
150153 return protocolVersion != null
151- ? new VersionedAirbyteStreamFactory <> (serDeProvider , migratorFactory , protocolVersion )
152- : new DefaultAirbyteStreamFactory ();
154+ ? new VersionedAirbyteStreamFactory (serDeProvider , migratorFactory , protocolVersion , mdcScope )
155+ : new DefaultAirbyteStreamFactory (mdcScope );
153156 }
154157
155158}
0 commit comments