Skip to content

Commit b58637d

Browse files
authored
Add leader_pod unittest (#145)
1 parent 644d4f7 commit b58637d

23 files changed

+227
-201
lines changed

python/edl/collective/launch.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
from edl.utils import pod_server_client
3131
from edl.utils import status as edl_status
3232
from edl.utils import train_process as edl_train_process
33+
from edl.utils import resource_pods
3334

34-
from ..utils.leader_register import LeaderRegister
35+
from edl.utils import leader_pod
3536
from ..utils.log_utils import logger
36-
from ..utils.pod import Pod
37-
from ..utils.pod_server import PodServer
38-
from ..utils.register import PodResourceRegister
39-
from ..utils.watcher import Watcher
37+
from ..utils import pod
38+
from ..utils import pod_server
39+
from ..utils import cluster_watcher
4040

4141

4242
def edl_barrier(job_env, pod, timeout):
@@ -46,13 +46,13 @@ def edl_barrier(job_env, pod, timeout):
4646
while True:
4747
try:
4848
etcd = etcd_db.get_global_etcd()
49-
leader = etcd_leader.get_pod_leader(etcd)
49+
leader = leader_pod.load_from_etcd(etcd)
5050
if leader is None:
5151
raise exceptions.EdlNotFoundLeader("can't get leader")
5252

5353
logger.debug("barrier on leader:{}".format(leader))
5454

55-
c = pod_server_client.PodServerClient(leader.endpoint)
55+
c = pod_server_client.Client(leader.endpoint)
5656
cluster = c.barrier(job_env.job_id, pod.get_id())
5757
return cluster
5858
except Exception as e:
@@ -86,7 +86,7 @@ def prepare(args):
8686
sys.exit(0)
8787

8888
# local pod, and the pod's id doesn't change.
89-
pod = Pod()
89+
pod = edl_pod.Pod()
9090
pod.from_env(job_env)
9191

9292
# update pod status
@@ -147,10 +147,10 @@ def launch(args):
147147

148148
# register pod resource to tell others:
149149
# this resource can be used to train
150-
resource_register = PodResourceRegister(job_env, pod)
150+
resource_register = resource_pods.Register(job_env, pod)
151151

152152
# seize the leader
153-
leader_register = LeaderRegister(job_env, pod.get_id())
153+
leader_register = leader_pod.Register(job_env, pod.get_id())
154154

155155
# register rank and watch the rank
156156
# if the rank changed, the pods should restart the training proc.
@@ -163,7 +163,7 @@ def launch(args):
163163
pod.get_id(), edl_status.Status.RUNNING)
164164

165165
# watcher after barrier
166-
watcher = Watcher(job_env, cluster, pod)
166+
watcher = cluster_watcher.Watcher(job_env, cluster, pod)
167167

168168
procs = edl_train_process.start(
169169
cluster,
@@ -196,7 +196,7 @@ def launch(args):
196196
edl_train_process.terminate(procs)
197197

198198
cluster = new_cluster
199-
watcher = Watcher(job_env, cluster, pod)
199+
watcher = cluster_watcher.Watcher(job_env, cluster, pod)
200200

201201
procs = edl_train_process.start(
202202
job_env,

python/edl/tests/unittests/CMakeLists.txt

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,29 +74,17 @@ endfunction()
7474
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
7575
#FIXME
7676
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
77-
LIST(REMOVE_ITEM TEST_OPS test_train)
7877
LIST(REMOVE_ITEM TEST_OPS test_data_reader)
79-
LIST(REMOVE_ITEM TEST_OPS test_generate)
80-
LIST(REMOVE_ITEM TEST_OPS test_resource_pods)
81-
LIST(REMOVE_ITEM TEST_OPS test_state)
82-
LIST(REMOVE_ITEM TEST_OPS test_pod)
83-
LIST(REMOVE_ITEM TEST_OPS test_cluster)
78+
LIST(REMOVE_ITEM TEST_OPS test_train)
8479
foreach(TEST_OP ${TEST_OPS})
85-
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
80+
bash_test_modules(${TEST_OP} START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
8681
endforeach(TEST_OP)
8782

88-
bash_test_modules(test_generate START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
89-
bash_test_modules(test_resource_pods START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
90-
bash_test_modules(test_state START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
91-
bash_test_modules(test_pod START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
92-
bash_test_modules(test_cluster START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
93-
9483
# bash unit test
9584
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.sh")
9685
string(REPLACE ".sh" "" TEST_OPS "${TEST_OPS}")
97-
LIST(REMOVE_ITEM TEST_OPS test_master)
98-
LIST(REMOVE_ITEM TEST_OPS test_launch)
9986
LIST(REMOVE_ITEM TEST_OPS test_register)
87+
LIST(REMOVE_ITEM TEST_OPS test_launch)
10088
foreach(TEST_OP ${TEST_OPS})
10189
bash_test_modules(${TEST_OP} START_BASH "${TEST_OP}.sh" ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
10290
endforeach(TEST_OP)

python/edl/tests/unittests/dist_reader_test.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import unittest
16-
import time
17-
import os
18-
import atexit
19-
20-
from edl.utils.global_vars import *
2115
from edl.utils.etcd_test_base import EtcdTestBase
16+
from edl.utils.global_vars import *
2217

2318

2419
class TestDistReader(EtcdTestBase):

python/edl/tests/unittests/etcd_client_test.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import threading
16+
import time
1517
import unittest
1618
from edl.discovery.etcd_client import EtcdClient
17-
import time
18-
import threading
19-
2019
from edl.utils import string_utils
2120

2221

python/edl/tests/unittests/etcd_test_base.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import unittest
16-
import time
17-
import threading
18-
import os
19-
import sys
2015
import copy
21-
import atexit
22-
23-
from edl.utils import env as edl_env
24-
import edl.utils.log_utils as log_utils
2516
import edl.utils.constants as constants
26-
from edl.utils.etcd_db import get_global_etcd
17+
import edl.utils.log_utils as log_utils
18+
import os
19+
import unittest
2720
from edl.discovery.etcd_client import EtcdClient
21+
from edl.utils import env as edl_env
2822

2923
g_etcd_endpoints = "127.0.0.1:2379"
3024

python/edl/tests/unittests/test_cluster.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,18 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import time
15-
import unittest
16-
import six
1714

18-
from edl.utils import constants
19-
from edl.utils import resource_pods
15+
import sys
16+
import unittest
2017
from edl.tests.unittests import etcd_test_base
2118
from edl.utils import cluster as edl_cluster
2219

2320

24-
class TestPod(etcd_test_base.EtcdTestBase):
21+
class TestCluster(etcd_test_base.EtcdTestBase):
2522
def setUp(self):
26-
super(TestPod, self).setUp("test_cluster")
23+
super(TestCluster, self).setUp("test_cluster")
2724

28-
def test_pod(self):
25+
def test_cluster_basic(self):
2926
cluster = edl_cluster.Cluster()
3027

3128
cluster2 = edl_cluster.Cluster()

python/edl/tests/unittests/test_generate.py renamed to python/edl/tests/unittests/test_cluster_generator.py

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,27 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import unittest
16-
from edl.discovery.etcd_client import EtcdClient
17-
import time
18-
import threading
1915
import sys
20-
import copy
21-
import atexit
22-
23-
from edl.utils.pod_server import PodServer
24-
from edl.utils.pod import Pod
25-
from edl.utils.pod_server_client import PodServerClient
26-
from edl.utils.exceptions import EdlBarrierError
27-
from edl.utils import status as edl_status
28-
from edl.utils import constants
29-
import edl.utils.cluster as edl_cluster
30-
from edl.utils.etcd_db import get_global_etcd
31-
from edl.utils.leader_register import LeaderRegister
16+
import unittest
3217
from edl.tests.unittests import etcd_test_base
18+
from edl.utils import constants
19+
from edl.utils import status as edl_status
20+
from edl.utils.exceptions import EdlBarrierError
21+
from edl.utils import pod as edl_pod
22+
from edl.utils import pod_server
23+
from edl.utils import pod_server_client
3324
from edl.utils import cluster_generator
3425

3526

36-
class TestGenerate(etcd_test_base.EtcdTestBase):
27+
class TestClusterGenerator(etcd_test_base.EtcdTestBase):
3728
def setUp(self):
38-
super(TestGenerate, self).setUp("test_generate")
29+
super(TestClusterGenerator, self).setUp("test_cluster_generator")
3930

40-
def register_pod(self, job_env):
41-
pod = Pod()
31+
def _register_pod(self, job_env):
32+
pod = edl_pod.Pod()
4233
pod.from_env(job_env)
43-
s = PodServer(self._job_env, pod)
44-
s.start()
34+
server = pod_server.PodServer(self._job_env, pod)
35+
server.start()
4536
self._etcd.set_server_permanent(constants.ETCD_POD_RESOURCE,
4637
pod.get_id(), pod.to_json())
4738
self._etcd.set_server_permanent(constants.ETCD_POD_STATUS,
@@ -50,49 +41,47 @@ def register_pod(self, job_env):
5041
self._etcd.get_full_path(constants.ETCD_POD_RESOURCE,
5142
pod.get_id()))
5243

53-
edl_status.save_pod_status_to_etcd(self._etcd,
54-
pod.get_id(),
55-
edl_status.Status.INITIAL)
44+
edl_status.save_pod_status_to_etcd(
45+
self._etcd, pod.get_id(), edl_status.Status.INITIAL, timeout=15)
5646
print("set permanent:", self._etcd.get_full_path(
5747
constants.ETCD_POD_STATUS, pod.get_id()))
5848

59-
return pod, s
49+
return pod, server
6050

61-
def test_server(self):
62-
pod_0, server_0 = self.register_pod(self._job_env)
51+
def test_barrier(self):
52+
pod_0, server_0 = self._register_pod(self._job_env)
6353
self._etcd.set_server_permanent(constants.ETCD_POD_RANK,
6454
constants.ETCD_POD_LEADER,
6555
pod_0.get_id())
6656
print("set permanent:", self._etcd.get_full_path(
6757
constants.ETCD_POD_RANK, constants.ETCD_POD_LEADER))
6858

69-
pod_1, server_1 = self.register_pod(self._job_env)
59+
pod_1, server_1 = self._register_pod(self._job_env)
7060

7161
generater = cluster_generator.Generator(self._job_env, pod_0.get_id())
72-
ret = generater.start()
62+
generater.start()
7363

74-
cluster_0 = None
75-
clsuter_1 = None
7664
try:
77-
c = PodServerClient(pod_0.endpoint)
78-
cluster_0 = c.barrier(
65+
client = pod_server_client.Client(pod_0.endpoint)
66+
cluster_0 = client.barrier(
7967
self._job_env.job_id, pod_0.get_id(), timeout=0)
8068

8169
self.assertNotEqual(cluster_0, None)
8270
except EdlBarrierError as e:
8371
pass
8472
except:
8573
sys.exit(1)
74+
finally:
8675
generater.stop()
8776

8877
try:
89-
cluster_1 = c.barrier(
78+
cluster_1 = client.barrier(
9079
self._job_env.job_id, pod_1.get_id(), timeout=15)
9180
except:
92-
generater.stop()
9381
sys.exit(1)
82+
finally:
83+
generater.stop()
9484

95-
generater.stop()
9685
self.assertNotEqual(cluster_1, None)
9786

9887

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
import unittest
17+
from edl.tests.unittests import etcd_test_base
18+
from edl.utils import constants
19+
from edl.utils import leader_pod
20+
from edl.utils import pod as edl_pod
21+
from edl.utils import resource_pods
22+
from edl.utils import cluster_generator
23+
24+
25+
class TestLeaderPod(etcd_test_base.EtcdTestBase):
    """Checks that pod leadership moves to the next pod once the leader stops."""

    def setUp(self):
        super(TestLeaderPod, self).setUp("test_leader_pod")

    def _add_pod(self):
        """Build a pod from the job environment and create its registers.

        Returns:
            tuple: ``(pod, leader_register, resource_register)``. The caller
            is responsible for calling ``stop()`` on both registers.
        """
        pod = edl_pod.Pod()
        pod.from_env(self._job_env)
        resource_register = resource_pods.Register(
            self._job_env,
            pod_id=pod.pod_id,
            pod_json=pod.to_json(),
            ttl=constants.ETCD_TTL)
        generator = cluster_generator.Generator(self._job_env, pod.pod_id)
        leader_register = leader_pod.Register(
            self._job_env, pod.pod_id, cluster_generator=generator)

        return (pod, leader_register, resource_register)

    def test_seize_leader(self):
        """The first pod holds the leadership; the second takes over after it stops."""
        # Registers that still need stop(). They are drained in `finally` so a
        # failed assertion or a raising etcd call does not leave them running.
        # NOTE(review): presumably each register owns a background refresher;
        # confirm against leader_pod.Register / resource_pods.Register.
        pending = []
        try:
            pod0, leader_register0, resource_register0 = self._add_pod()
            pending.extend([leader_register0, resource_register0])
            # Wait one ETCD_TTL before adding the second pod, as the original
            # test did, so that pod0 is expected to be the leader.
            time.sleep(constants.ETCD_TTL)
            pod1, leader_register1, resource_register1 = self._add_pod()
            pending.extend([leader_register1, resource_register1])

            leader_id = leader_pod.get_pod_leader_id(self._etcd, timeout=15)
            self.assertEqual(pod0.pod_id, leader_id)

            # Stop the current leader, then wait one TTL before checking
            # that pod1 has taken over.
            leader_register0.stop()
            pending.remove(leader_register0)
            time.sleep(constants.ETCD_TTL)

            leader_id = leader_pod.get_pod_leader_id(self._etcd, timeout=15)
            self.assertEqual(pod1.pod_id, leader_id)
        finally:
            for register in pending:
                register.stop()
62+
if __name__ == '__main__':
63+
unittest.main()

python/edl/tests/unittests/test_pod.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
import time
1514
import unittest
16-
import six
1715

18-
from edl.utils import constants
19-
from edl.utils import resource_pods
2016
from edl.tests.unittests import etcd_test_base
2117
from edl.utils import pod as edl_pod
2218

0 commit comments

Comments
 (0)