11import json
2- from typing import Optional , Dict , Union
2+ import os
3+ import subprocess
4+ import tempfile
5+ import time
6+ from subprocess import CompletedProcess
7+ from typing import Optional , Dict , Union , Any
8+
9+ import fsspec
10+ import yaml
11+ from kubernetes import config , client
12+ from kubernetes .watch import watch
313
414from ads .opctl .backend .marketplace .marketplace_type import (
515 HelmMarketplaceListingDetails ,
@@ -40,7 +50,7 @@ class LocalMarketplaceOperatorBackend(Backend):
4050 """
4151
4252 def __init__ (
43- self , config : Optional [Dict ], operator_info : OperatorInfo = None
53+ self , config : Optional [Dict ], operator_info : OperatorInfo = None
4454 ) -> None :
4555 """
4656 Instantiates the operator backend.
@@ -70,8 +80,62 @@ def __init__(
7080
7181 self .operator_info = operator_info
7282
83+ def _set_kubernete_env (self ) -> None :
84+ os .environ ["OCI_CLI_AUTH" ] = 'security_token'
85+ os .environ ["OCI_CLI_PROFILE" ] = 'SESSION_PROFILE'
86+
87+ def _run_helm_install (self , name , chart , ** kwargs ) -> CompletedProcess :
88+ self ._set_kubernete_env ()
89+ flags = []
90+ for key , value in kwargs .items ():
91+ flags .extend ([f"--{ key } " , f"{ value } " ])
92+
93+ helm_cmd = ["helm" , "install" , name , chart , * flags ]
94+ print ("Running Command:" , " " .join (helm_cmd ))
95+ return subprocess .run (helm_cmd )
96+
97+ def _save_helm_value_to_yaml (self , helm_values : Dict [str , Any ]) -> str :
98+ override_value_path = os .path .join (tempfile .TemporaryDirectory ().name , f"values.yaml" )
99+ with fsspec .open (override_value_path , mode = "w" ) as f :
100+ f .write (yaml .dump (helm_values ))
101+ return override_value_path
102+
103+ def _delete_temp_file (self , temp_file ) -> bool :
104+ if os .path .exists (temp_file ):
105+ os .remove (temp_file )
106+ return True
107+
108+ return False
109+
110+ def _wait_for_pod_ready (self , namespace , pod_name ):
111+ # Configs can be set in Configuration class directly or using helper utility
112+ self ._set_kubernete_env ()
113+ config .load_kube_config ()
114+ v1 = client .CoreV1Api ()
115+
116+ def is_pod_ready (pod ):
117+ for condition in pod .status .conditions :
118+ if condition .type == "Ready" :
119+ return condition .status == 'True'
120+ return False
121+
122+ start_time = time .time ()
123+ timeout_seconds = 10 * 60
124+ sleep_time = 20
125+ while True :
126+ pod = v1 .list_namespaced_pod (namespace = namespace , label_selector = f"app.kubernetes.io/instance={ pod_name } " ).items [0 ]
127+ if is_pod_ready (pod ):
128+ return 0
129+ if time .time () - start_time >= timeout_seconds :
130+ print ("Timed out waiting for pad to get ready." )
131+ break
132+ print (f"Waiting for pod { pod_name } to be ready..." )
133+ time .sleep (sleep_time )
134+ return - 1
135+
73136 def _run_with_python (self , ** kwargs : Dict ) -> int :
74- """Runs the operator within a local python environment.
137+ """
138+ Runs the operator within a local python environment.
75139
76140 Returns
77141 -------
@@ -92,10 +156,28 @@ def _run_with_python(self, **kwargs: Dict) -> int:
92156 listing_details : MarketplaceListingDetails = operator .get_listing_details (
93157 operator_spec
94158 )
95-
159+ # operator_spec = self.operator_config['spec']
160+ # helm_values = operator_spec['helmValues']
96161 ##Perform backend logic##
162+ # name = 'fs-dp-api-test'
163+ # chart = 'oci://iad.ocir.io/idogsu2ylimg/feature-store-dataplane-api/helm-chart/feature-store-dp-api'
164+ if isinstance (listing_details , HelmMarketplaceListingDetails ):
165+ override_value_path = self ._save_helm_value_to_yaml (listing_details .helm_values )
166+ helm_install_status = self ._run_helm_install (
167+ name = listing_details .name ,
168+ chart = listing_details .chart ,
169+ ** {
170+ "version" : listing_details .version ,
171+ "namespace" : listing_details .namespace ,
172+ "values" : override_value_path
173+ }
174+ )
97175
98- return 0
176+ self ._delete_temp_file (override_value_path )
177+ if helm_install_status .returncode == 0 :
178+ return self ._wait_for_pod_ready (listing_details .namespace , listing_details .name )
179+ else :
180+ return - 1
99181
100182 def run (self , ** kwargs : Dict ) -> None :
101183 """Runs the operator."""
@@ -130,11 +212,11 @@ def run(self, **kwargs: Dict) -> None:
130212 )
131213
132214 def init (
133- self ,
134- uri : Union [str , None ] = None ,
135- overwrite : bool = False ,
136- runtime_type : Union [str , None ] = None ,
137- ** kwargs : Dict ,
215+ self ,
216+ uri : Union [str , None ] = None ,
217+ overwrite : bool = False ,
218+ runtime_type : Union [str , None ] = None ,
219+ ** kwargs : Dict ,
138220 ) -> Union [str , None ]:
139221 """Generates a starter YAML specification for the operator local runtime.
140222
@@ -177,8 +259,8 @@ def init(
177259
178260 return (
179261 operator_runtime_const .MARKETPLACE_RUNTIME_MAP [runtime_type ]
180- .init (** RUNTIME_KWARGS_MAP [runtime_type ])
181- .to_yaml (
262+ .init (** RUNTIME_KWARGS_MAP [runtime_type ])
263+ .to_yaml (
182264 uri = uri ,
183265 overwrite = overwrite ,
184266 note = note ,
0 commit comments