
Commit 1c8410a

Add register of edl. (#132)
1 parent 6678fcc commit 1c8410a


77 files changed: 1853 additions & 1084 deletions

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ set(EDL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 set(EDL_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR})
 SET(EDL_INSTALL_DIR ${CMAKE_BINARY_DIR}/output)
 SET(CMAKE_INSTALL_RPATH "$ORIGIN" "${CMAKE_INSTALL_RPATH}")
-project(paddle_edl)
+project(edl)

 option(WITH_TESTING "Compile EDL with unit testing" ON)
 option(WITH_COVERAGE "Compile EDL with code coverage" OFF)

python/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
@@ -2,8 +2,8 @@ file(GLOB_RECURSE EDL_FILES collective/*.py demo/*.py demo/*.sh discovery/*.py d
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in ${CMAKE_CURRENT_BINARY_DIR}/setup.py)
 add_custom_command(
     OUTPUT ${EDL_BINARY_DIR}/.timestamp
-    COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/paddle_edl ${EDL_BINARY_DIR}/python/
+    COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/edl ${EDL_BINARY_DIR}/python/
     COMMAND python3.6 ./setup.py bdist_wheel --universal
     DEPENDS ${EDL_FILES})
 add_custom_target(edl_python ALL DEPENDS ${EDL_BINARY_DIR}/.timestamp)
-add_subdirectory(paddle_edl/tests/unittests)
+add_subdirectory(edl/tests/unittests)
File renamed without changes.
File renamed without changes.
Lines changed: 280 additions & 0 deletions
@@ -0,0 +1,280 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from six.moves.queue import Queue
from threading import Lock, Thread
import paddle.fluid as fluid
from .dataset import FileSplitter
from ..utils import data_server
from ..utils.edl_env import TrainerEnv
from ..utils import unique_name
from ..utils.watcher import get_data_reader_leader
import uuid


class Record(object):
    def __init__(self, idx, data):
        # idx in a file
        self._idx = idx
        self._data = data  # data is ()


class FileMeta(object):
    def __init__(self, idx, path):
        self._idx = idx
        self._path = path


class BatchData(object):
    def __init__(self, data_reader_id, b_id):
        self._data_reader_id = data_reader_id
        self._id = b_id
        # FileMeta->Records
        self._batch = {}
        self._size = None

    def split_meta_and_data(object):
        b = BatchData(self.data_reader_id, self._id)
        b._size = self._size

        a = []
        for fmeta, recs in self._batch:
            rs = []
            for rec in recs:
                r = Record(rec._idx, None)
                a.append(rec.data)
                rs.append(r)
            b._batch[fmeta] = rs

        return b, a


class DataCheckpoint(object):
    def __init__(self):
        # file_idx=>set(record_idx)
        self._restored_records = {}
        #self._file_idxs = {}
        self._restored_from = None

    def save_checkpoint(self, path, batch_datas):
        pass

    def load_checkpoint(self, path):
        pass

    def is_processed(self, file_idx, path, record_idx):
        if file_idx not in self._restored_records:
            return False

        rec_idxs = self._restored_records[file_idx]
        if record_idx not in rec_idxs:
            return False

        return True


class FileCache(object):
    def __init__(self, capcity=100):
        """
        capcity: GB
        """
        pass

    def download(self, src_path, dst_path):
        pass

    def _clean_up(self):
        pass


class DistributedDataReader(object):
    def __init__(self,
                 file_list,
                 file_splitter_cls,
                 splitted_data_field,
                 batch_size,
                 capcity=100):
        """
        file_splitter_cls is the class name of dataset.See example in dataset.py
        file_list is the input data file list and it should be get by loader.For example, all data
        splitted_data_field: the file_splitter_cls's result field name by order
        file is on local or on hdfs.
        This class:
        1. get data file list from the leader.
        2. parse records from reader_cls' object.
        3. if there's no data local, try to pull data from other dataserver or raise StopIteration.

        capcity: cached batch num

        __next__: return meta, (splitted_data_field data)
        """

        self._id = str(uuid.uuid1())
        self._name = unique_name.generate("_datareader_")

        #BatchData
        self._data_queue = Queue(capcity)

        self._lock = Lock()
        self._file_list = file_list
        self._splitter_cls = file_splitter_cls
        self._leader = get_data_reader_leader()

        self._data_checkpoint = DataCheckpoint()
        self._data_checkpoint.load_checkpoint(checkpoint_path)
        self._reach_end = False
        self._cache = {}
        self._file_cache = FileCache()
        self._b_id = 0
        self._env = TrainerEnv()

        assert type(file_splitter_cls) == FileSplitter

        self._register = DataReaderRegister(
            etcd_endoints=self._etcd_endpoints,
            job_id=self._job_id,
            rank=self._env.rank,
            reader=self)

        self._t_read_data = Thread(target=self._read_data)
        self._t_read_data.start()

        self._start_data_server()

    def get_port(self):
        return self._data_server.port()

    def _start_data_server(self):
        """
        start and register the data server
        """
        self._data_server = data_server.DataServer(self._file_list,
                                                   self._env.world_rank)

    def _shut_down(self):
        self._data_server.wait()
        pass

    def __iter__(self):
        """
        get data from queue
        """
        self._local_file_list = self._data_client._get_file_list()
        self._reach_end = False
        if self._t_read_data is None:
            self._t_read_data = Thread(target=self._read_data)
            self._t_read_data.start()

    def __next__(self):
        while True:
            b = self._data_queue.pop()
            if b is None:
                break
            yield b.split_meta_and_data()  # meta, data

        self._t_read_data.join()
        self._t_read_data = None
        self._reach_end = True
        raise StopIteration

    def _set_batch_data(self, meta):
        """
        get batch data meta
        """
        # reach end
        if meta is None:
            self._data_queue.put(None)
            return False

        if meta.is_local():
            b = self._cache.pop(meta._id)
        else:
            b = self._data_client.get_batch_data(meta)
        self._data_queue.put(b)
        if b is None:
            return False
        return True

    def _process_one_file(self, idx, file_path):
        path = self._file_cache.download(file_path)
        for r in enumerate(self._splitter_cls(path)):
            rec_no = r[0]
            data = r[1:]
            if self._data_checkpoint.is_processed(idx, path, rec_no):
                logger.debug(
                    "idx:{} file:{} rec_no:{} data_len:{} already processed".
                    format(idx, path, rec_no, len(data)))
                continue

            logger.debug("read idx:{} file:{} rec_no:{} data_len:{}".format(
                idx, path, rec_no, len(data)))

            yield Record(rec_no, (data))

    def _new_batch_data(self):
        self._b_id += 1
        b = BatchData(self._id, self._b_id)
        return b

    def _process_file_list(self, metas):
        rec_map = {}
        b = self._new_batch_data()
        size = 0
        for m in metas:
            for rec in self._process_one_file(m._idx, m._path):
                if m not in b._batch:
                    b._batch[m] = []
                b._batch[m].append(rec)
                size += 1
                if size >= self._batch_size:
                    b._batch._size = size
                    yield b
                    size = 0
                else:
                    continue

        if size > 0:
            yield b

    def _read_data(self):
        """
        read data into queue
        """
        while True:
            for batch_data in self._process_file_list(self._local_file_list):
                self._cache[batch_data._id] = batch_data
                # report and then get
                meta = self._data_client.get_batch_data_meta()
                if not self._set_batch_data(meta):
                    break
                continue

            break

        logger.info("local data process completed.")
        while True:
            meta = self._data_client.get_batch_data_meta()
            if not self._set_batch_data(meta):
                break

    @property
    def endpoint(self):
        return "{}:{}".format(utils.get_extern_ip(), self._serer.port)

    def get_id(self):
        return self._id

    @property
    def name(self):
        return self._name
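
For orientation, here is a minimal usage sketch of how DistributedDataReader appears intended to be consumed, going by its docstring (each step yields a meta object plus the splitted_data_field payloads). This is not part of the commit: the reader's module path, the file names, and the field list below are illustrative assumptions only.

    # Hypothetical usage sketch; module path and data paths are assumptions.
    from edl.collective.dataset import TxtFileSplitter
    # The new reader's file name is not shown in this diff; adjust the import.
    from edl.collective.distribute_reader import DistributedDataReader

    reader = DistributedDataReader(
        file_list=["./data/part-0.txt", "./data/part-1.txt"],  # example inputs
        file_splitter_cls=TxtFileSplitter,
        splitted_data_field=["line"],
        batch_size=32)

    # Per the docstring, iteration yields (meta, data): meta keeps per-file and
    # per-record indices, data carries the record payloads.
    for meta, data in reader:
        print(meta, len(data))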

python/paddle_edl/collective/dataset.py renamed to python/edl/collective/dataset.py

Lines changed: 9 additions & 4 deletions
@@ -16,7 +16,7 @@
 import multiprocessing


-class DataReader():
+class FileSplitter(object):
     """
     This the interface user should inherit. It will let's the framework knows the data file it's
     processing.
@@ -27,17 +27,22 @@ def __init__(self, data_file):
         self._data_file = data_file

     def __iter__(self):
+        """
+        yield idx, record data
+        """
         raise NotImplementedError()


-class TxtDataReader(DataReader):
+class TxtFileSplitter(FileSplitter):
     def __init__(self, data_file):
-        super(self, TxtDataSet).__init__(data_file)
+        super(TxtFileSplitter, self).__init__(data_file)

     def __iter__(self):
+        idx = 0
         with open(self._data_file, "rb") as f:
             for line in f:
                 line = line.strip()
                 if len(line) <= 0:
                     continue
-                yield line
+                idx += 1
+                yield idx, line
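
FileSplitter is the interface users are meant to inherit, so a small hypothetical example of a custom splitter may help; it follows the same (idx, ...) record contract as TxtFileSplitter above. The TSV format, class name, and import path are illustrative assumptions, not part of this commit.

    # Hypothetical user-defined splitter; not part of this commit.
    from edl.collective.dataset import FileSplitter  # path follows the rename above


    class TsvFileSplitter(FileSplitter):
        """Yield (idx, label, text) records from a tab-separated file."""

        def __init__(self, data_file):
            super(TsvFileSplitter, self).__init__(data_file)

        def __iter__(self):
            idx = 0
            with open(self._data_file, "rb") as f:
                for line in f:
                    line = line.strip()
                    if len(line) == 0:
                        continue
                    label, text = line.split(b"\t", 1)
                    idx += 1
                    yield idx, label, text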
