3939 * application. Unlike {@link KubePodProcess} there is no heartbeat mechanism that requires the
4040 * launching pod and the launched pod to co-exist for the duration of execution for the launched
4141 * pod.
42- *
42+ * <p>
4343 * Instead, this process creates the pod and interacts with a document store on cloud storage to
4444 * understand the state of the created pod.
45- *
45+ * <p>
4646 * The document store is considered to be the truth when retrieving the status for an async pod
4747 * process. If the store isn't updated by the underlying pod, it will appear as failed.
4848 */
@@ -190,10 +190,12 @@ public boolean hasExited() {
190190 public boolean waitFor (final long timeout , final TimeUnit unit ) throws InterruptedException {
191191 // implementation copied from Process.java since this isn't a real Process
192192 long remainingNanos = unit .toNanos (timeout );
193- if (hasExited ())
193+ if (hasExited ()) {
194194 return true ;
195- if (timeout <= 0 )
195+ }
196+ if (timeout <= 0 ) {
196197 return false ;
198+ }
197199
198200 final long deadline = System .nanoTime () + remainingNanos ;
199201 do {
@@ -202,8 +204,9 @@ public boolean waitFor(final long timeout, final TimeUnit unit) throws Interrupt
202204 // We are waiting polling every 500ms for status. The trade-off here is between how often
203205 // we poll our status storage (GCS) and how reactive we are to detect that a process is done.
204206 Thread .sleep (Math .min (TimeUnit .NANOSECONDS .toMillis (remainingNanos ) + 1 , 500 ));
205- if (hasExited ())
207+ if (hasExited ()) {
206208 return true ;
209+ }
207210 remainingNanos = deadline - System .nanoTime ();
208211 } while (remainingNanos > 0 );
209212
@@ -236,7 +239,7 @@ private boolean checkStatus(final AsyncKubePodStatus status) {
236239
237240 /**
238241 * Checks terminal states first, then running, then initialized. Defaults to not started.
239- *
242+ * <p>
240243 * The order matters here!
241244 */
242245 public AsyncKubePodStatus getDocStoreStatus () {
@@ -298,6 +301,33 @@ public void create(final Map<String, String> allLabels,
298301 final List <ContainerPort > containerPorts = KubePodProcess .createContainerPortList (portMap );
299302 containerPorts .add (new ContainerPort (serverPort , null , null , null , null ));
300303
304+ final var initContainer = new ContainerBuilder ()
305+ .withName (KubePodProcess .INIT_CONTAINER_NAME )
306+ .withImage ("busybox:1.35" )
307+ .withVolumeMounts (volumeMounts )
308+ .withCommand (List .of (
309+ "sh" ,
310+ "-c" ,
311+ String .format ("""
312+ i=0
313+ until [ $i -gt 60 ]
314+ do
315+ echo "$i - waiting for config file transfer to complete..."
316+ # check if the upload-complete file exists, if so exit without error
317+ if [ -f "%s/%s" ]; then
318+ exit 0
319+ fi
320+ i=$((i+1))
321+ sleep 1
322+ done
323+ echo "config files did not transfer in time"
324+ # no upload-complete file was created in time, exit with error
325+ exit 1
326+ """ ,
327+ KubePodProcess .CONFIG_DIR ,
328+ KubePodProcess .SUCCESS_FILE_NAME )))
329+ .build ();
330+
301331 final var mainContainer = new ContainerBuilder ()
302332 .withName (KubePodProcess .MAIN_CONTAINER_NAME )
303333 .withImage (kubePodInfo .mainContainerInfo ().image ())
@@ -316,9 +346,11 @@ public void create(final Map<String, String> allLabels,
316346 .withLabels (allLabels )
317347 .endMetadata ()
318348 .withNewSpec ()
319- .withServiceAccount ("airbyte-admin" ).withAutomountServiceAccountToken (true )
349+ .withServiceAccount ("airbyte-admin" )
350+ .withAutomountServiceAccountToken (true )
320351 .withRestartPolicy ("Never" )
321352 .withContainers (mainContainer )
353+ .withInitContainers (initContainer )
322354 .withVolumes (volumes )
323355 .endSpec ()
324356 .build ();
@@ -332,9 +364,9 @@ public void create(final Map<String, String> allLabels,
332364 kubernetesClient .pods ()
333365 .inNamespace (kubePodInfo .namespace ())
334366 .withName (kubePodInfo .name ())
335- .waitUntilCondition (p -> {
336- return ! p . getStatus (). getContainerStatuses (). isEmpty () && p .getStatus ().getContainerStatuses ().get (0 ).getState ().getWaiting () == null ;
337- }, 5 , TimeUnit .MINUTES );
367+ .waitUntilCondition (p -> ! p . getStatus (). getInitContainerStatuses (). isEmpty ()
368+ && p .getStatus ().getInitContainerStatuses ().get (0 ).getState ().getWaiting () == null ,
369+ 5 , TimeUnit .MINUTES );
338370
339371 final var podStatus = kubernetesClient .pods ()
340372 .inNamespace (kubePodInfo .namespace ())
@@ -343,7 +375,7 @@ public void create(final Map<String, String> allLabels,
343375 .getStatus ();
344376
345377 final var containerState = podStatus
346- .getContainerStatuses ()
378+ .getInitContainerStatuses ()
347379 .get (0 )
348380 .getState ();
349381
@@ -378,7 +410,7 @@ public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, fina
378410 // several issues with copying files. See https://github.com/airbytehq/airbyte/issues/8643 for
379411 // details.
380412 final String command = String .format ("kubectl cp %s %s/%s:%s -c %s" , tmpFile , podDefinition .getMetadata ().getNamespace (),
381- podDefinition .getMetadata ().getName (), containerPath , "main" );
413+ podDefinition .getMetadata ().getName (), containerPath , KubePodProcess . INIT_CONTAINER_NAME );
382414 log .info (command );
383415
384416 proc = Runtime .getRuntime ().exec (command );
0 commit comments