Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
58406fd
add
gongweibao May 8, 2020
3475a92
merge
gongweibao May 11, 2020
78e7c8f
merge
gongweibao May 11, 2020
cddda8c
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 11, 2020
a2e4abc
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 14, 2020
b454b4c
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 15, 2020
44558a7
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 15, 2020
d8941b6
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 18, 2020
593771b
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 18, 2020
2ecf315
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 20, 2020
2222d3e
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 26, 2020
49b650b
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao May 26, 2020
d3602c0
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 2, 2020
b61e04e
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 8, 2020
a48f911
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 8, 2020
6d79b03
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 10, 2020
def2fb7
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 12, 2020
f5a1ba3
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 19, 2020
ccd7409
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Jun 28, 2020
8264b0e
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Aug 3, 2020
0a8ddcf
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Aug 7, 2020
e6ae928
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Aug 21, 2020
ef89c67
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Sep 3, 2020
c45cbae
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Sep 17, 2020
31fbea7
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Sep 18, 2020
fa66e8a
Merge branch 'develop' of https://github.com/elasticdeeplearning/edl …
gongweibao Sep 21, 2020
b9d9aef
add test=develop
gongweibao Sep 21, 2020
dc18f9c
add test=develop
gongweibao Sep 21, 2020
11aeeab
add
gongweibao Sep 21, 2020
18a9640
add
gongweibao Sep 21, 2020
73d2ee0
add test=develop
gongweibao Sep 21, 2020
8f4f79d
fix test=develop
gongweibao Sep 21, 2020
4712bfc
fix test=develop
gongweibao Sep 21, 2020
aea9325
fix test=develop
gongweibao Sep 21, 2020
7ad6e4f
fix test=develop
gongweibao Sep 21, 2020
dea32aa
add
gongweibao Sep 21, 2020
a61f394
fix test=develop
gongweibao Sep 21, 2020
a4cbfbe
fix test=develop
gongweibao Sep 21, 2020
f0b42f1
fix cycle dependency test=develop
gongweibao Sep 21, 2020
b70baec
split
gongweibao Sep 21, 2020
50f6914
split
gongweibao Sep 21, 2020
2c150bf
split
gongweibao Sep 21, 2020
3fa2e61
split
gongweibao Sep 21, 2020
455141b
split
gongweibao Sep 21, 2020
9ff9e10
cleanup test=develop
gongweibao Sep 21, 2020
091f59a
add test=develop
gongweibao Sep 22, 2020
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions python/edl/collective/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
from edl.utils import pod_server_client
from edl.utils import status as edl_status
from edl.utils import train_process as edl_train_process
from edl.utils import resource_pods

from ..utils.leader_register import LeaderRegister
from edl.utils import leader_pod
from ..utils.log_utils import logger
from ..utils.pod import Pod
from ..utils.pod_server import PodServer
from ..utils.register import PodResourceRegister
from ..utils.watcher import Watcher
from ..utils import pod
from ..utils import pod_server
from ..utils import cluster_watcher


def edl_barrier(job_env, pod, timeout):
Expand All @@ -46,13 +46,13 @@ def edl_barrier(job_env, pod, timeout):
while True:
try:
etcd = etcd_db.get_global_etcd()
leader = etcd_leader.get_pod_leader(etcd)
leader = leader_pod.load_from_etcd(etcd)
if leader is None:
raise exceptions.EdlNotFoundLeader("can't get leader")

logger.debug("barrier on leader:{}".format(leader))

c = pod_server_client.PodServerClient(leader.endpoint)
c = pod_server_client.Client(leader.endpoint)
cluster = c.barrier(job_env.job_id, pod.get_id())
return cluster
except Exception as e:
Expand Down Expand Up @@ -86,7 +86,7 @@ def prepare(args):
sys.exit(0)

# local pod; the pod's id doesn't change.
pod = Pod()
pod = edl_pod.Pod()
pod.from_env(job_env)

# update pod status
Expand Down Expand Up @@ -147,10 +147,10 @@ def launch(args):

# register the pod resource to tell the other pods
# that this resource can be used for training
resource_register = PodResourceRegister(job_env, pod)
resource_register = resource_pods.Register(job_env, pod)

# seize the leader
leader_register = LeaderRegister(job_env, pod.get_id())
leader_register = leader_pod.Register(job_env, pod.get_id())

# register rank and watch the rank
# if the rank changed, the pods should restart the training proc.
Expand All @@ -163,7 +163,7 @@ def launch(args):
pod.get_id(), edl_status.Status.RUNNING)

# watcher after barrier
watcher = Watcher(job_env, cluster, pod)
watcher = cluster_watcher.Watcher(job_env, cluster, pod)

procs = edl_train_process.start(
cluster,
Expand Down Expand Up @@ -196,7 +196,7 @@ def launch(args):
edl_train_process.terminate(procs)

cluster = new_cluster
watcher = Watcher(job_env, cluster, pod)
watcher = cluster_watcher.Watcher(job_env, cluster, pod)

procs = edl_train_process.start(
job_env,
Expand Down
18 changes: 3 additions & 15 deletions python/edl/tests/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,17 @@ endfunction()
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
#FIXME
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
LIST(REMOVE_ITEM TEST_OPS test_train)
LIST(REMOVE_ITEM TEST_OPS test_data_reader)
LIST(REMOVE_ITEM TEST_OPS test_generate)
LIST(REMOVE_ITEM TEST_OPS test_resource_pods)
LIST(REMOVE_ITEM TEST_OPS test_state)
LIST(REMOVE_ITEM TEST_OPS test_pod)
LIST(REMOVE_ITEM TEST_OPS test_cluster)
LIST(REMOVE_ITEM TEST_OPS test_train)
foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
bash_test_modules(${TEST_OP} START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
endforeach(TEST_OP)

bash_test_modules(test_generate START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
bash_test_modules(test_resource_pods START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
bash_test_modules(test_state START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
bash_test_modules(test_pod START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
bash_test_modules(test_cluster START_BASH etcd_test.sh ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")

# bash unit test
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.sh")
string(REPLACE ".sh" "" TEST_OPS "${TEST_OPS}")
LIST(REMOVE_ITEM TEST_OPS test_master)
LIST(REMOVE_ITEM TEST_OPS test_launch)
LIST(REMOVE_ITEM TEST_OPS test_register)
LIST(REMOVE_ITEM TEST_OPS test_launch)
foreach(TEST_OP ${TEST_OPS})
bash_test_modules(${TEST_OP} START_BASH "${TEST_OP}.sh" ENVS "PYTHON_EXECUTABLE=${PYTHON_EXECUTABLE}")
endforeach(TEST_OP)
7 changes: 1 addition & 6 deletions python/edl/tests/unittests/dist_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import time
import os
import atexit

from edl.utils.global_vars import *
from edl.utils.etcd_test_base import EtcdTestBase
from edl.utils.global_vars import *


class TestDistReader(EtcdTestBase):
Expand Down
5 changes: 2 additions & 3 deletions python/edl/tests/unittests/etcd_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import time
import unittest
from edl.discovery.etcd_client import EtcdClient
import time
import threading

from edl.utils import string_utils


Expand Down
14 changes: 4 additions & 10 deletions python/edl/tests/unittests/etcd_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import time
import threading
import os
import sys
import copy
import atexit

from edl.utils import env as edl_env
import edl.utils.log_utils as log_utils
import edl.utils.constants as constants
from edl.utils.etcd_db import get_global_etcd
import edl.utils.log_utils as log_utils
import os
import unittest
from edl.discovery.etcd_client import EtcdClient
from edl.utils import env as edl_env

g_etcd_endpoints = "127.0.0.1:2379"

Expand Down
13 changes: 5 additions & 8 deletions python/edl/tests/unittests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import six

from edl.utils import constants
from edl.utils import resource_pods
import sys
import unittest
from edl.tests.unittests import etcd_test_base
from edl.utils import cluster as edl_cluster


class TestPod(etcd_test_base.EtcdTestBase):
class TestCluster(etcd_test_base.EtcdTestBase):
def setUp(self):
super(TestPod, self).setUp("test_cluster")
super(TestCluster, self).setUp("test_cluster")

def test_pod(self):
def test_cluster_basic(self):
cluster = edl_cluster.Cluster()

cluster2 = edl_cluster.Cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from edl.discovery.etcd_client import EtcdClient
import time
import threading
import sys
import copy
import atexit

from edl.utils.pod_server import PodServer
from edl.utils.pod import Pod
from edl.utils.pod_server_client import PodServerClient
from edl.utils.exceptions import EdlBarrierError
from edl.utils import status as edl_status
from edl.utils import constants
import edl.utils.cluster as edl_cluster
from edl.utils.etcd_db import get_global_etcd
from edl.utils.leader_register import LeaderRegister
import unittest
from edl.tests.unittests import etcd_test_base
from edl.utils import constants
from edl.utils import status as edl_status
from edl.utils.exceptions import EdlBarrierError
from edl.utils import pod as edl_pod
from edl.utils import pod_server
from edl.utils import pod_server_client
from edl.utils import cluster_generator


class TestGenerate(etcd_test_base.EtcdTestBase):
class TestClusterGenerator(etcd_test_base.EtcdTestBase):
def setUp(self):
super(TestGenerate, self).setUp("test_generate")
super(TestClusterGenerator, self).setUp("test_cluster_generator")

def register_pod(self, job_env):
pod = Pod()
def _register_pod(self, job_env):
pod = edl_pod.Pod()
pod.from_env(job_env)
s = PodServer(self._job_env, pod)
s.start()
server = pod_server.PodServer(self._job_env, pod)
server.start()
self._etcd.set_server_permanent(constants.ETCD_POD_RESOURCE,
pod.get_id(), pod.to_json())
self._etcd.set_server_permanent(constants.ETCD_POD_STATUS,
Expand All @@ -50,49 +41,47 @@ def register_pod(self, job_env):
self._etcd.get_full_path(constants.ETCD_POD_RESOURCE,
pod.get_id()))

edl_status.save_pod_status_to_etcd(self._etcd,
pod.get_id(),
edl_status.Status.INITIAL)
edl_status.save_pod_status_to_etcd(
self._etcd, pod.get_id(), edl_status.Status.INITIAL, timeout=15)
print("set permanent:", self._etcd.get_full_path(
constants.ETCD_POD_STATUS, pod.get_id()))

return pod, s
return pod, server

def test_server(self):
pod_0, server_0 = self.register_pod(self._job_env)
def test_barrier(self):
pod_0, server_0 = self._register_pod(self._job_env)
self._etcd.set_server_permanent(constants.ETCD_POD_RANK,
constants.ETCD_POD_LEADER,
pod_0.get_id())
print("set permanent:", self._etcd.get_full_path(
constants.ETCD_POD_RANK, constants.ETCD_POD_LEADER))

pod_1, server_1 = self.register_pod(self._job_env)
pod_1, server_1 = self._register_pod(self._job_env)

generater = cluster_generator.Generator(self._job_env, pod_0.get_id())
ret = generater.start()
generater.start()

cluster_0 = None
clsuter_1 = None
try:
c = PodServerClient(pod_0.endpoint)
cluster_0 = c.barrier(
client = pod_server_client.Client(pod_0.endpoint)
cluster_0 = client.barrier(
self._job_env.job_id, pod_0.get_id(), timeout=0)

self.assertNotEqual(cluster_0, None)
except EdlBarrierError as e:
pass
except:
sys.exit(1)
finally:
generater.stop()

try:
cluster_1 = c.barrier(
cluster_1 = client.barrier(
self._job_env.job_id, pod_1.get_id(), timeout=15)
except:
generater.stop()
sys.exit(1)
finally:
generater.stop()

generater.stop()
self.assertNotEqual(cluster_1, None)


Expand Down
63 changes: 63 additions & 0 deletions python/edl/tests/unittests/test_leader_pod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest
from edl.tests.unittests import etcd_test_base
from edl.utils import constants
from edl.utils import leader_pod
from edl.utils import pod as edl_pod
from edl.utils import resource_pods
from edl.utils import cluster_generator


class TestLeaderPod(etcd_test_base.EtcdTestBase):
    """Checks leader hand-over between two pods registered against etcd."""

    def setUp(self):
        # The base-class setUp receives the name used for this test case.
        super(TestLeaderPod, self).setUp("test_leader_pod")

    def _add_pod(self):
        """Create a pod from the job env together with its registers.

        Returns:
            A ``(pod, leader_register, resource_register)`` tuple.
        """
        new_pod = edl_pod.Pod()
        new_pod.from_env(self._job_env)

        # The resource register is created first, then the generator and the
        # leader register that is handed the generator.
        res_register = resource_pods.Register(
            self._job_env,
            pod_id=new_pod.pod_id,
            pod_json=new_pod.to_json(),
            ttl=constants.ETCD_TTL)
        gen = cluster_generator.Generator(self._job_env, new_pod.pod_id)
        lead_register = leader_pod.Register(
            self._job_env, new_pod.pod_id, cluster_generator=gen)

        return new_pod, lead_register, res_register

    def test_seize_leader(self):
        """The first pod is leader; the second is leader once the first stops."""
        first_pod, first_leader_reg, first_resource_reg = self._add_pod()
        # Wait one TTL period before adding the second pod.
        time.sleep(constants.ETCD_TTL)
        second_pod, second_leader_reg, second_resource_reg = self._add_pod()

        current_leader = leader_pod.get_pod_leader_id(self._etcd, timeout=15)
        self.assertEqual(first_pod.pod_id, current_leader)

        # Stop the first leader register and wait one TTL period before
        # querying the leader id again.
        first_leader_reg.stop()
        time.sleep(constants.ETCD_TTL)

        current_leader = leader_pod.get_pod_leader_id(self._etcd, timeout=15)
        self.assertEqual(second_pod.pod_id, current_leader)
        second_leader_reg.stop()

        first_resource_reg.stop()
        second_resource_reg.stop()


# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
4 changes: 0 additions & 4 deletions python/edl/tests/unittests/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import six

from edl.utils import constants
from edl.utils import resource_pods
from edl.tests.unittests import etcd_test_base
from edl.utils import pod as edl_pod

Expand Down
Loading