Skip to content

Commit da76d4b

Browse files
authored
Add launcher unit test. (#147)
1 parent ba5b225 commit da76d4b

File tree

18 files changed

+386
-254
lines changed

18 files changed

+386
-254
lines changed

python/edl/collective/launch.py

Lines changed: 11 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -19,58 +19,19 @@
1919
from __future__ import print_function
2020

2121
import sys
22-
import time
23-
import traceback
2422
from edl.utils import args_utils
25-
from edl.utils import constants
2623
from edl.utils import env as edl_env
2724
from edl.utils import etcd_db
28-
from edl.utils import exceptions
25+
from edl.utils import launcher as edl_launcher
2926
from edl.utils import log_utils
30-
from edl.utils import pod_server_client
3127
from edl.utils import status as edl_status
32-
from edl.utils import train_process as edl_train_process
33-
from edl.utils import resource_pods
28+
from edl.utils.log_utils import logger
29+
from edl.utils import pod as edl_pod
3430

35-
from edl.utils import leader_pod
36-
from ..utils.log_utils import logger
37-
from ..utils import pod
38-
from ..utils import pod_server
39-
from ..utils import cluster_watcher
4031

41-
42-
def edl_barrier(job_env, pod, timeout):
43-
start = time.time()
44-
45-
log_time = time.time()
46-
while True:
47-
try:
48-
etcd = etcd_db.get_global_etcd()
49-
leader = leader_pod.load_from_etcd(etcd)
50-
if leader is None:
51-
raise exceptions.EdlNotFoundLeader("can't get leader")
52-
53-
logger.debug("barrier on leader:{}".format(leader))
54-
55-
c = pod_server_client.Client(leader.endpoint)
56-
cluster = c.barrier(job_env.job_id, pod.get_id())
57-
return cluster
58-
except Exception as e:
59-
if time.time() - log_time > 30:
60-
logger.info("wait to barrier now!")
61-
log_time = time.time()
62-
logger.debug("barrier error:{} {}".format(e,
63-
traceback.format_exc()))
64-
65-
if time.time() - start > timeout:
66-
message = "wait to barrier with all error:{} leader:[{}] current pod:[{}]".format(
67-
traceback.format_exc(), leader, pod)
68-
raise exceptions.EdlBarrierError(message)
69-
70-
time.sleep(3)
71-
72-
73-
def prepare(args):
32+
def main():
33+
log_utils.get_logger(log_level=10)
34+
args = args_utils.parse_args()
7435
args_dict = args_utils.convert_args_to_dict(args)
7536

7637
# job environment.
@@ -80,7 +41,7 @@ def prepare(args):
8041
# get global etcd and lock
8142
etcd = etcd_db.get_global_etcd(job_env.etcd_endpoints, job_env.job_id)
8243

83-
last_status = edl_status.load_job_status_from_etcd(etcd)
44+
last_status = edl_status.load_job_status_from_etcd(etcd, timeout=30)
8445
if last_status == edl_status.Status.SUCCEED:
8546
logger.info("job:{} has completed! Need't try!".format(job_env.job_id))
8647
sys.exit(0)
@@ -89,146 +50,10 @@ def prepare(args):
8950
pod = edl_pod.Pod()
9051
pod.from_env(job_env)
9152

92-
# update pod status
93-
edl_status.save_pod_status_to_etcd(etcd,
94-
pod.get_id(), edl_status.Status.INITIAL)
95-
96-
# launch pod server
97-
pod_server = PodServer(job_env, pod.get_id())
98-
pod_server.start(job_env, pod)
99-
logger.info("pod server started:[{}]".format(pod))
100-
101-
return job_env, pod, pod_server
102-
103-
104-
def job_exit(cluster,
105-
leader_register,
106-
resource_register,
107-
watcher,
108-
pod,
109-
trainer_flag,
110-
register_flag,
111-
barrier_flag,
112-
resource_flag,
113-
timeout=300):
114-
local_flag = trainer_flag & register_flag & barrier_flag
115-
etcd = etcd_db.get_global_etcd()
116-
edl_status.save_pod_flag_to_ecd(etcd, pod.get_id(), local_flag)
117-
118-
begin = time.time()
119-
while True:
120-
try:
121-
if leader_register.is_leader():
122-
if etcd.wait_resource(cluster, timeout=15):
123-
job_flag = trainer_flag & register_flag & barrier_flag & resource_flag
124-
edl_status.save_job_flag_to_etcd(etcd, job_flag)
125-
logger.info("set job status:{} ok!".format(job_flag))
126-
break
127-
raise exceptions.EdlWaitFollowersReleaseError(
128-
"can't wait resource")
129-
else:
130-
break
131-
except Exception as e:
132-
logger.warning("prepare job_exit meets error:{}".format(e))
133-
if time.time() - begin >= timeout:
134-
logger.warning("wait resource error")
135-
break
136-
137-
time.sleep(3)
138-
continue
139-
140-
leader_register.stop()
141-
watcher.stop()
142-
resource_register.stop()
143-
144-
145-
def launch(args):
146-
job_env, pod, pod_server = prepare(args)
147-
148-
# register pod resource to tell others:
149-
# this resource can use to train
150-
resource_register = resource_pods.Register(job_env, pod)
151-
152-
# seize the leader
153-
leader_register = leader_pod.Register(job_env, pod.get_id())
154-
155-
# register rank and watch the rank
156-
# if the rank changed, the pods should restart the training proc.
157-
# pod exit if barrier error
158-
cluster = edl_barrier(job_env, pod, timeout=600)
159-
160-
# update pod status
161-
etcd = etcd_db.get_global_etcd()
162-
edl_status.save_pod_status_to_etcd(etcd,
163-
pod.get_id(), edl_status.Status.RUNNING)
164-
165-
# watcher after barrier
166-
watcher = cluster_watcher.Watcher(job_env, cluster, pod)
167-
168-
procs = edl_train_process.start(
169-
cluster,
170-
pod,
171-
args.training_script,
172-
args.training_script_args,
173-
log_dir=args.log_dir)
174-
175-
trainer_flag = True
176-
register_flag = True
177-
barrier_flag = True
178-
while True:
179-
# check local status first
180-
alive, trainer_flag = edl_train_process.watch(procs, pod.trainers_num)
181-
if not alive or not trainer_flag:
182-
break
183-
184-
if resource_register.is_stopped() or leader_register.is_stopped():
185-
edl_train_process.terminate()
186-
register_flag = False
187-
break
188-
189-
# check job status second
190-
if watcher.changed:
191-
new_cluster = edl_barrier(job_env, pod, timeout=60)
192-
if not new_cluster:
193-
barrier_flag = False
194-
break
195-
196-
edl_train_process.terminate(procs)
197-
198-
cluster = new_cluster
199-
watcher = cluster_watcher.Watcher(job_env, cluster, pod)
200-
201-
procs = edl_train_process.start(
202-
job_env,
203-
cluster,
204-
pod,
205-
args.training_script,
206-
args.training_script_args,
207-
log_dir=args.log_dir)
208-
209-
time.sleep(3)
210-
211-
if not register_flag:
212-
logger.fatal("register meets error and local exit!")
213-
214-
if not leader_register.is_leader():
215-
leader_register.stop()
216-
217-
job_exit(
218-
cluster=cluster,
219-
leader_register=leader_register,
220-
resource_register=resource_register,
221-
watcher=watcher,
222-
pod=pod,
223-
trainer_flag=trainer_flag,
224-
register_flag=register_flag,
225-
barrier_flag=barrier_flag)
226-
227-
228-
def main():
229-
log_utils.get_logger(log_level=10)
230-
args = args_utils.parse_args()
231-
launch(args)
53+
launcher = edl_launcher.Launcher(
54+
job_env=job_env, pod=pod, etcd=etcd, args=args)
55+
launcher.init()
56+
launcher.launch()
23257

23358

23459
if __name__ == '__main__':

python/edl/tests/unittests/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,14 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
7676
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
7777
LIST(REMOVE_ITEM TEST_OPS test_data_reader)
7878
LIST(REMOVE_ITEM TEST_OPS test_train)
79+
LIST(REMOVE_ITEM TEST_OPS test_launch)
7980
foreach(TEST_OP ${TEST_OPS})
8081
bash_test_modules(${TEST_OP} START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
8182
endforeach(TEST_OP)
8283

8384
# bash unit test
8485
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.sh")
8586
string(REPLACE ".sh" "" TEST_OPS "${TEST_OPS}")
86-
LIST(REMOVE_ITEM TEST_OPS test_register)
87-
LIST(REMOVE_ITEM TEST_OPS test_launch)
8887
foreach(TEST_OP ${TEST_OPS})
8988
bash_test_modules(${TEST_OP} START_BASH "${TEST_OP}.sh" ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
9089
endforeach(TEST_OP)

python/edl/tests/unittests/del_from_etcd.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import edl.utils.constants as constants
1615
import os
17-
from edl.utils.etcd_db import get_global_etcd
16+
from edl.utils import constants
17+
from edl.discovery import etcd_client
18+
19+
g_etcd_endpoints = "127.0.0.1:2379"
1820

1921
job_id = os.environ["PADDLE_JOB_ID"]
2022
etcd_endpoints = os.environ["PADDLE_ETCD_ENDPOINTS"]
21-
22-
db = get_global_etcd([etcd_endpoints], job_id)
23-
etcd = db._etcd
24-
etcd.remove_service(constants.ETCD_POD_RESOURCE)
25-
etcd.remove_service(constants.ETCD_POD_RANK)
26-
etcd.remove_service(constants.ETCD_POD_STATUS)
27-
etcd.remove_service(constants.ETCD_JOB_STATUS)
28-
etcd.remove_service(constants.ETCD_TRAIN_STATUS)
29-
etcd.remove_service(constants.ETCD_CLUSTER)
30-
etcd.remove_service(constants.ETCD_READER)
23+
etcd = etcd_client.EtcdClient([g_etcd_endpoints], root=job_id)
24+
etcd.init()
25+
constants.clean_etcd(etcd)

python/edl/tests/unittests/etcd_test.sh

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,13 @@ fi
1515
# rm flag file
1616
rm -f ${name}_*.log
1717

18-
nohup etcd > ${name}_etcd.log 2>&1 &
19-
etcd_pid=$!
20-
2118
# start the unit test
2219
run_time=$(( $TEST_TIMEOUT - 10 ))
2320
echo "run_time: ${run_time}"
2421

2522
timeout -s SIGKILL ${run_time} ${PYTHON_EXECUTABLE} -u ${name}.py > ${name}_run.log 2>&1
2623
exit_code=$?
2724

28-
kill -9 $etcd_pid
29-
3025
echo "${name} faild with ${exit_code}"
3126
if [[ $exit_code -eq 0 ]]; then
3227
exit 0

python/edl/tests/unittests/etcd_test_base.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,6 @@
2424

2525

2626
class EtcdTestBase(unittest.TestCase):
27-
def _clean_etcd(self):
28-
self._etcd.remove_service(constants.ETCD_POD_RESOURCE)
29-
self._etcd.remove_service(constants.ETCD_POD_RANK)
30-
self._etcd.remove_service(constants.ETCD_POD_STATUS)
31-
self._etcd.remove_service(constants.ETCD_JOB_STATUS)
32-
self._etcd.remove_service(constants.ETCD_TRAIN_STATUS)
33-
self._etcd.remove_service(constants.ETCD_CLUSTER)
34-
self._etcd.remove_service(constants.ETCD_READER)
35-
3627
def setUp(self, job_id):
3728
log_utils.get_logger(log_level=10)
3829
self._etcd = EtcdClient([g_etcd_endpoints], root=job_id)
@@ -62,9 +53,9 @@ def setUp(self, job_id):
6253
os.environ.update(proc_env)
6354

6455
self._job_env = edl_env.JobEnv(None)
65-
self._clean_etcd()
56+
constants.clean_etcd(self._etcd)
6657

6758
def tearDown(self):
6859
os.environ.clear()
6960
os.environ.update(self._old_environ)
70-
self._clean_etcd()
61+
constants.clean_etcd(self._etcd)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import print_function
16+
17+
import sys
18+
from edl.tests.unittests import etcd_test_base
19+
from edl.utils import env as edl_env
20+
from edl.utils import etcd_db
21+
from edl.utils import pod as edl_pod
22+
from edl.utils import status as edl_status
23+
from edl.utils.log_utils import logger
24+
from edl.utils import launcher as edl_launcher
25+
26+
27+
class TestLauncher(etcd_test_base.EtcdTestBase):
28+
def setUp(self):
29+
super(TestLauncher, self).setUp("test_launcher")
30+
31+
def test_normal_exit(self):
32+
launcher = edl_launcher(self._job_env, self._pod, self._etcd, args)
33+
launcher.init()
34+
launcher.launch()
35+
36+
last_status = edl_status.load_job_status_from_etcd(self._etcd)
37+
if last_status == edl_status.Status.SUCCEED:
38+
logger.info("job:{} has completed! Need't try!".format(
39+
self._job_env.job_id))
40+
return
41+
self.assertFalse(True)

python/edl/tests/unittests/test_register.sh renamed to python/edl/tests/unittests/test_launch.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export PADDLE_JOB_ID="test_success_job"
2626
export PADDLE_ETCD_ENDPOINTS="127.0.0.1:2379"
2727
export PADDLE_EDLNODES_RANAGE="2:2"
2828
export PADDLE_EDL_ONLY_FOR_CE_TEST="1"
29-
export PADDLE_EDL_HDFS_CHECKPOINT_PATH="./success_job"
29+
export PADDLE_EDL_HDFS_PATH="./success_job"
3030
export PADDLE_EDL_HDFS_HOME="./hadoop"
3131

3232
#clean keys
@@ -43,7 +43,7 @@ export PADDLE_DEMO_EXIT_CODE=0
4343
timeout -s SIGKILL ${run_time} python -m edl.collective.launch --log_dir 01 launch_demo.py > ${name}_run_01.log 2>&1 &
4444
pid_01=$!
4545

46-
key="/${PADDLE_JOB_ID}/job_flag/nodes/complete"
46+
key="/${PADDLE_JOB_ID}/job_flag/nodes/job_status"
4747
value=`etcdctl get ${key}`
4848
echo "job complete flag:${value}"
4949

python/edl/utils/cluster_generator.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@ def _generate_cluster(self, timeout=600):
8080

8181
def stop(self):
8282
self._stop.set()
83-
with self._lock:
84-
if self._t_register:
85-
self._t_register.join()
83+
if self._t_register:
84+
self._t_register.join()
85+
86+
with self._lock:
8687
self._t_register = None
8788

8889
logger.debug("{} exit".format(self.__class__.__name__))

0 commit comments

Comments
 (0)