Commit 4bcc0b6
Authored by Yang Yang (Tony)
[WIP] feature/parallel_gpu (#7293)
feature/parallel_gpu
1 parent df92776 commit 4bcc0b6

File tree: 6 files changed (+89, -71 lines)

paddle/framework/lod_tensor.cc

Lines changed: 29 additions & 32 deletions

@@ -44,9 +44,19 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
 }
 
 std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
-  PADDLE_ENFORCE(platform::is_cpu_place(t.place()));
   PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());
 
+  if (!platform::is_cpu_place(t.place())) {
+    LoDTensor tt;
+    framework::Copy(t, platform::CPUPlace(), &tt);
+    platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+    auto &dev_ctx = *pool.Get(t.place());
+    dev_ctx.Wait();
+
+    os << tt;
+    return os;
+  }
+
   os << "dim: " << t.dims() << "\n";
   os << "lod: " << t.lod() << "\n";

@@ -211,67 +221,54 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
   DeserializeFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
 }
 
+// TODO(tonyyang-svail): make this function support LoD
 std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
     const std::vector<platform::Place> places) const {
   check_memory_size();
-  // PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
-  //                , "Disable parallel lod for now");
   PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
   PADDLE_ENFORCE(dims()[0] % places.size() == 0,
                  "Batch size should be divided by places size");
 
   std::vector<LoDTensor> lods;
   for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-    size_t begin = place_idx * dims()[0] / places.size();
-    size_t end = (place_idx + 1) * dims()[0] / places.size();
-    auto src = Slice(static_cast<int>(begin), static_cast<int>(end));
+    int begin = place_idx * dims()[0] / places.size();
+    int end = (place_idx + 1) * dims()[0] / places.size();
 
-    LoDTensor dst;
-    dst.Resize(src.dims());
+    auto src = Slice(begin, end);
     auto &dst_place = places[place_idx];
-    auto dst_ptr = dst.mutable_data(dst_place, src.type());
-
-    // TODO(tonyyang-svail):
-    //    change the following to framework::Copy
-    auto src_place = src.place();
-    auto src_ptr = src.data<void>();
-    auto size = src.numel() * SizeOfType(src.type());
-    if (platform::is_cpu_place(src_place) &&
-        platform::is_cpu_place(dst_place)) {
-      memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
-                   boost::get<platform::CPUPlace>(src_place), src_ptr, size);
-    } else {
-      PADDLE_THROW("Not Implemented");
-    }
+    LoDTensor dst;
+    framework::Copy(src, dst_place, &dst);
 
     lods.emplace_back(dst);
   }
 
   return lods;
 }
 
+// TODO(tonyyang-svail): make this function support LoD
 void LoDTensor::MergeLoDTensor(
-    const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
-  PADDLE_ENFORCE(platform::is_cpu_place(place));
+    const std::vector<const LoDTensor *> &lod_tensors,
+    platform::Place dst_place) {
   PADDLE_ENFORCE(!lod_tensors.empty());
-
   framework::DDim new_dim = lod_tensors[0]->dims();
   std::type_index new_type = lod_tensors[0]->type();
+  auto new_layout = lod_tensors[0]->layout();
   for (auto *lod : lod_tensors) {
     PADDLE_ENFORCE(new_dim == lod->dims());
     PADDLE_ENFORCE(new_type == lod->type());
-    PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
+    PADDLE_ENFORCE(new_layout == lod->layout());
   }
   new_dim[0] *= lod_tensors.size();
   Resize(new_dim);
+  set_layout(new_layout);
 
-  auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
+  mutable_data(dst_place, new_type);
+  int begin = 0;
   for (auto *src : lod_tensors) {
-    auto size = src->numel() * SizeOfType(src->type());
-    memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
-                 boost::get<platform::CPUPlace>(src->place()),
-                 src->data<void>(), size);
-    dst_ptr += size;
+    int end = begin + src->dims()[0];
+    auto dst = Slice(begin, end);
+    framework::Copy(*src, dst_place, &dst);
+    begin = end;
   }
 }
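
Taken together, SplitLoDTensor and MergeLoDTensor now delegate all data movement to framework::Copy, so the per-place slices can live on GPU as well as CPU memory. Below is a minimal sketch, not part of this commit, of how the pair is meant to round-trip a batch; SplitMergeRoundTrip is a hypothetical helper, and the two CPUPlace entries stand in for whatever places the caller actually targets (e.g. CUDAPlace on a CUDA build).

#include <vector>

#include "paddle/framework/lod_tensor.h"

// Illustrative only: round-trip a batch through SplitLoDTensor/MergeLoDTensor.
void SplitMergeRoundTrip(const paddle::framework::LoDTensor &batch) {
  namespace fw = paddle::framework;
  namespace pf = paddle::platform;

  // Two destination places; batch.dims()[0] must be divisible by places.size().
  std::vector<pf::Place> places = {pf::CPUPlace(), pf::CPUPlace()};

  // Each slice holds dims()[0] / places.size() rows, copied via framework::Copy.
  std::vector<fw::LoDTensor> pieces = batch.SplitLoDTensor(places);

  // Merge the slices back into one tensor on the chosen destination place.
  std::vector<const fw::LoDTensor *> ptrs;
  for (auto &piece : pieces) ptrs.push_back(&piece);

  fw::LoDTensor merged;
  merged.MergeLoDTensor(ptrs, pf::CPUPlace());
}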

paddle/framework/tensor_util.h

Lines changed: 18 additions & 18 deletions

@@ -31,9 +31,10 @@ namespace framework {
  *
  * @note Copy supports CPU <-> GPU, GPU <-> GPU.
  */
-
 inline void Copy(const Tensor& src, const platform::Place& dst_place,
                  const platform::DeviceContext& ctx, Tensor* dst) {
+  VLOG(3) << "Copy " << src.dims() << " from " << src.place() << " to "
+          << dst_place;
   src.check_memory_size();
 
   dst->Resize(src.dims());

@@ -88,26 +89,25 @@ inline void Copy(const Tensor& src, const platform::Place& dst_place,
 }
 
 /**
- * @brief Copy supports CPU <-> CPU
+ * @brief Wrapper on
+ *     Copy(const Tensor& src, const platform::Place& dst_place,
+ *          const platform::DeviceContext& ctx, Tensor* dst);
+ *
+ * @param[in] src        The external tensor.
+ * @param[in] dst_place  The dst place.
+ *
+ * @note Copy supports CPU <-> GPU, GPU <-> GPU.
  */
 inline void Copy(const Tensor& src, const platform::Place& dst_place,
                  Tensor* dst) {
-  src.check_memory_size();
-  dst->Resize(src.dims());
-  dst->set_layout(src.layout());
-
-  auto src_place = src.place();
-  auto src_ptr = src.data<void>();
-
-  auto dst_ptr = dst->mutable_data(dst_place, src.type());
-
-  auto size = src.numel() * SizeOfType(src.type());
-
-  PADDLE_ENFORCE(platform::is_cpu_place(src_place) &&
-                 platform::is_cpu_place(dst_place));
-
-  memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
-               boost::get<platform::CPUPlace>(src_place), src_ptr, size);
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext* dev_ctx;
+  if (platform::is_gpu_place(src.place())) {
+    dev_ctx = pool.Get(src.place());
+  } else {
+    dev_ctx = pool.Get(dst_place);
+  }
+  Copy(src, dst_place, *dev_ctx, dst);
 }
 
 /**
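
With this change the two-argument Copy is a thin wrapper that fetches a DeviceContext from the global pool, preferring the GPU side's context when either end is a GPU, so callers that do not hold a context can still issue cross-device copies. A hedged usage sketch follows; CopyToDevice is a hypothetical helper, and CUDAPlace assumes a CUDA-enabled build.

#include "paddle/framework/tensor_util.h"

// Hypothetical helper, not part of this commit: host-to-device copy using the
// new two-argument Copy, then an explicit wait on the destination's context.
void CopyToDevice(const paddle::framework::Tensor &cpu_src,
                  paddle::framework::Tensor *gpu_dst) {
  namespace fw = paddle::framework;
  namespace pf = paddle::platform;

  pf::CUDAPlace gpu(0);  // assumes a CUDA-enabled build

  // Resolves a DeviceContext from the pool internally and enqueues the copy.
  fw::Copy(cpu_src, gpu, gpu_dst);

  // GPU copies run on a stream; wait before the host reads the result.
  pf::DeviceContextPool::Instance().Get(gpu)->Wait();
}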

paddle/framework/var_desc.cc

Lines changed: 1 addition & 1 deletion

@@ -74,7 +74,7 @@ const proto::TensorDesc &VarDesc::tensor_desc() const {
     case proto::VarDesc::LOD_TENSOR_ARRAY:
       return desc_.tensor_array().tensor();
     default:
-      PADDLE_THROW("The type of var '", this->Name(), "' is unsupported.");
+      PADDLE_THROW("The type of var %s is unsupported.", this->Name());
   }
 }
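
Note on the one-line fix: PADDLE_THROW takes a printf-style format string, so the old comma-separated form never interpolated this->Name() into the message; the %s placeholder restores the variable name in the error text.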

paddle/operators/get_places_op.cc

Lines changed: 2 additions & 1 deletion

@@ -111,4 +111,5 @@ class GetPlacesInferShape : public framework::InferShapeBase {
 namespace ops = paddle::operators;
 
 REGISTER_OPERATOR(get_places, ops::GetPlacesOp, ops::GetPlacesOpProtoMaker,
-                  ops::GetPlacesInferVarType, ops::GetPlacesInferShape);
+                  ops::GetPlacesInferVarType, ops::GetPlacesInferShape,
+                  paddle::framework::EmptyGradOpMaker);
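
Registering paddle::framework::EmptyGradOpMaker marks get_places as having no gradient op, so backward construction can pass over the places variable instead of trying to build a grad op for it.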

paddle/operators/parallel_do_op.cc

Lines changed: 38 additions & 18 deletions

@@ -39,6 +39,7 @@ void SplitTensorAndMoveTensorToScopes(
     const std::vector<framework::Scope *> &sub_scopes,
     const std::vector<platform::Place> &places,
     const std::vector<std::string> &names) {
+  PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size());
   for (auto &argu : names) {
     auto *var = scope.FindVar(argu);
     const auto &tensor = var->Get<LoDTensor>();

@@ -54,6 +55,15 @@ void SplitTensorAndMoveTensorToScopes(
   }
 }
 
+void WaitOnPlaces(const std::vector<platform::Place> places) {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+
+  for (auto &place : places) {
+    auto &dev_ctx = *pool.Get(place);
+    dev_ctx.Wait();
+  }
+}
+
 class ParallelDoOp : public framework::OperatorBase {
  public:
   ParallelDoOp(const std::string &type,

@@ -71,19 +81,30 @@ class ParallelDoOp : public framework::OperatorBase {
     auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
     auto *program = block->Program();
 
-    // TODO(tonyyang-svail): get places from input
-    std::vector<platform::Place> places;
-    places.emplace_back(platform::CPUPlace());
-    places.emplace_back(platform::CPUPlace());
+    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
 
     auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                             ->GetMutable<std::vector<framework::Scope *>>();
     for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
       sub_scopes.push_back(&scope.NewScope());
     }
 
+    // split input
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(kInputs));
+    // copy parameter
+    for (auto &param : Inputs(kParameters)) {
+      PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
+                     "Only support parameter type as LoDTensor");
+      auto &src = scope.FindVar(param)->Get<LoDTensor>();
+      for (size_t i = 0; i < places.size(); ++i) {
+        auto &place = places[i];
+        auto *sub_scope = sub_scopes[i];
+        auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
+        framework::Copy(src, place, dst);
+      }
+    }
+    WaitOnPlaces(places);
 
     std::vector<std::future<void>> workers;
     workers.reserve(places.size());

@@ -93,12 +114,6 @@ class ParallelDoOp : public framework::OperatorBase {
       auto &place = places[place_idx];
       auto *cur_scope = sub_scopes[place_idx];
 
-      // copy parameter
-      // some version of boost lacks != for boost::variant
-      if (!(dev_ctx.GetPlace() == place)) {
-        PADDLE_THROW("Not Implemented");
-      }
-
       workers.emplace_back(framework::Async([program, cur_scope, place, block] {
         framework::Executor executor(place);
         executor.Run(*program, cur_scope, block->ID(),

@@ -108,6 +123,7 @@ class ParallelDoOp : public framework::OperatorBase {
     for (auto &worker : workers) {
       worker.wait();
     }
+    WaitOnPlaces(places);
 
     // merge output
     for (auto &o_name : Outputs(kOutputs)) {

@@ -121,6 +137,7 @@ class ParallelDoOp : public framework::OperatorBase {
           scope.FindVar(o_name)->GetMutable<LoDTensor>();
       lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
     }
+    WaitOnPlaces(places);
   }
 };
 

@@ -161,15 +178,14 @@ class ParallelDoGradOp : public OperatorBase {
     auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                            ->Get<std::vector<framework::Scope *>>();
 
-    // TODO(tonyyang-svail): get places from input
-    std::vector<platform::Place> places;
-    places.emplace_back(platform::CPUPlace());
-    places.emplace_back(platform::CPUPlace());
+    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
 
     // feed output@grad
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(framework::GradVarName(kOutputs)));
+    WaitOnPlaces(places);
 
+    // for debugging
     for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
       VLOG(3) << s;
       VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();

@@ -196,10 +212,11 @@ class ParallelDoGradOp : public OperatorBase {
     for (auto &worker : workers) {
       worker.wait();
     }
+    WaitOnPlaces(places);
 
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
-      VLOG(3) << s;
+      VLOG(3) << "merge grad " << s;
 
       auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
       VLOG(3) << t;

@@ -216,7 +233,8 @@ class ParallelDoGradOp : public OperatorBase {
       auto sum_op = framework::OpRegistry::CreateOp(
           "sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
          framework::AttributeMap{});
-      sum_op->Run(*sub_scopes[0], place);
+      sum_op->Run(*sub_scopes[0], places[0]);
+      WaitOnPlaces(places);
     }
 
     VLOG(3) << t;

@@ -236,8 +254,10 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
     for (auto &input_param : this->InputNames()) {
       VLOG(3) << input_param;
       grad->SetInput(input_param, this->Input(input_param));
-      grad->SetOutput(framework::GradVarName(input_param),
-                      this->InputGrad(input_param, false));
+      if (input_param != kPlaces) {
+        grad->SetOutput(framework::GradVarName(input_param),
+                        this->InputGrad(input_param, false));
+      }
     }
 
     for (auto &output_param : this->OutputNames()) {
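
The repeated WaitOnPlaces calls are the synchronization points this change introduces: framework::Copy and the per-place workers run asynchronously on their device streams, so the op waits after parameters are copied into the sub-scopes, after the workers finish, and after outputs or summed gradients are merged, ensuring no later step reads a tensor whose transfer is still in flight. Reading the place list from the kPlaces input (produced by get_places) replaces the previously hard-coded pair of CPUPlaces and is what allows the block to run on GPUs.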

python/paddle/v2/fluid/tests/test_parallel_op.py

Lines changed: 1 addition & 1 deletion

@@ -18,7 +18,7 @@ def setUp(self):
             append_batch_size=False,
             stop_gradient=False)
 
-        places = fluid.default_main_program().global_block().create_var()
+        places = layers.get_places(device_count=4)
         pd = layers.ParallelDo(places=places)
 
         with pd.do():
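
On the Python side, the test now obtains its places from layers.get_places(device_count=4) rather than from an empty placeholder variable, so ParallelDo receives a real place list that the operator reads through its kPlaces input.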

0 commit comments