@@ -180,20 +180,19 @@ def init_gb(self, do_init: bool=None):
180180 initialize_space = self .initialize_space # True or False
181181 if initialize_space and self .gb_config .gb_type == "NebulaHandler" :
182182 self .gb .add_hosts ('storaged0' , 9779 )
183- print ('增加NebulaGraph Storage主机中,等待20秒' )
184- time .sleep (20 )
183+ self .waiting_host_initialize ()
185184
186185 if self .clear_history_data :
187186 # 初始化space
188187 self .gb .drop_space ('client' )
189188
190189 self .gb .create_space ('client' )
190+ self .waiting_space_initialize ()
191191
192192 # 创建node tags和edge types
193193 self .create_gb_tags_and_edgetypes ()
194+ self .waiting_tags_edgetypes_initialize ()
194195
195- print ('Node Tags和Edge Types初始化中,等待20秒......' )
196- time .sleep (20 )
197196 else :
198197 self .gb = None
199198
@@ -223,7 +222,54 @@ def init_sls(self, do_init: bool=None):
223222 self .embed_config , vb_config = self .vb_config )
224223 else :
225224 self .sls = None
225+
226+ def wait_for_condition (self , condition , timeout = 40 , wait_time = 0.5 , log_prefix = '' ):
227+ max_attempts = int (timeout / wait_time )
228+
229+ total_time = 0
230+
231+ for _ in range (max_attempts ):
232+ if condition ():
233+ logger .info (f'[{ log_prefix } ] initialized successfully, total waiting time: { total_time } s' )
234+ return
235+
236+ logger .info (f'[{ log_prefix } ] have waited for { total_time } s...' )
237+ total_time += wait_time
238+ time .sleep (wait_time )
239+
240+ raise Exception (f'[{ log_prefix } ] not initialized within { timeout } seconds' )
226241
242+ def waiting_host_initialize (self ):
243+ self .wait_for_condition (
244+ condition = lambda : any (
245+ host .get ('Host' ) == 'storaged0' and host .get ('Status' ) == 'ONLINE'
246+ for host in self .gb .show_hosts ()
247+ ),
248+ log_prefix = 'host_initializing'
249+ )
250+
251+ def waiting_tags_edgetypes_initialize (self ):
252+ '''
253+ Make sure that tags and edgetypes are created properly
254+ Refer to create_gb_tags_and_edgetypes for more details
255+ '''
256+ self .wait_for_condition (
257+ condition = lambda : (
258+ len (self .gb .show_tags ()) == len (TYPE2SCHEMA .items ()) - 1 and
259+ len (self .gb .show_edge_type ()) == (len (TYPE2SCHEMA .items ()) - 1 ) ** 2 * 3
260+ ),
261+ log_prefix = 'tags_edgetypes_initializing'
262+ )
263+
264+ def waiting_space_initialize (self ):
265+ self .wait_for_condition (
266+ condition = lambda :any (
267+ space .get ('Name' ) == 'client'
268+ for space in self .gb .show_space ()
269+ ),
270+ log_prefix = 'space_initializing'
271+ )
272+
227273 def _get_local_graph (
228274 self , nodes : List [GNode ], edges : List [GEdge ], rootid
229275 ) -> Tuple [List [str ], Graph ]:
0 commit comments