Skip to content

Commit ce9623e

Browse files
authored
Merge branch 'develop' into cpu_context
2 parents b98addd + 8784ec6 commit ce9623e

File tree

70 files changed

+2896
-840
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+2896
-840
lines changed

paddle/fluid/distributed/fleet_executor/dist_model.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <glog/logging.h>
1616

1717
#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
18+
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
19+
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
1820
#include "paddle/fluid/framework/block_desc.h"
1921
#include "paddle/fluid/framework/naive_executor.h"
2022
#include "paddle/fluid/framework/op_proto_maker.h"
@@ -68,9 +70,15 @@ bool DistModel::Init() {
6870
program_.reset(config_.program_desc);
6971
scope_.reset(config_.scope);
7072
}
73+
if (!PrepareFeedAndFetch()) {
74+
return false;
75+
}
7176
if (!CommInit()) {
7277
return false;
7378
}
79+
if (!PrepareFleetExe()) {
80+
return false;
81+
}
7482
return true;
7583
}
7684

@@ -298,6 +306,55 @@ bool DistModel::LoadParameters() {
298306
return true;
299307
}
300308

309+
bool DistModel::PrepareFleetExe() {
310+
task_node_.reset(new TaskNode(program_.get(), config_.local_rank));
311+
if (config_.local_rank - config_.mp_degree >= 0) {
312+
task_node_->AddUpstreamTask(config_.local_rank - config_.mp_degree);
313+
}
314+
if (config_.local_rank + config_.mp_degree < config_.nranks) {
315+
task_node_->AddDownstreamTask(config_.local_rank + config_.mp_degree);
316+
}
317+
task_node_->SetType("Compute");
318+
task_node_->Init();
319+
executor_desc_ = FleetExecutorDesc();
320+
executor_desc_.set_cur_rank(config_.local_rank);
321+
std::unordered_map<int64_t, int64_t> id_to_rank;
322+
for (int i = 0; i < config_.nranks; ++i) {
323+
RankInfo *rank_info = executor_desc_.add_cluster_info();
324+
rank_info->set_rank(i);
325+
rank_info->set_ip_port(config_.trainer_endpoints[i]);
326+
id_to_rank.insert({i, i});
327+
}
328+
fleet_exe.reset(new FleetExecutor(executor_desc_));
329+
fleet_exe->Init("inference", *(program_.get()), scope_.get(), place_, 1,
330+
{task_node_.get()}, id_to_rank);
331+
return true;
332+
}
333+
334+
bool DistModel::PrepareFeedAndFetch() {
335+
for (auto *op : program_->Block(0).AllOps()) {
336+
if (op->Type() == "feed") {
337+
VLOG(3) << "feed op with feed var: " << op->Output("Out")[0];
338+
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
339+
if (feeds_.size() <= static_cast<size_t>(idx)) {
340+
feeds_.resize(idx + 1);
341+
}
342+
feeds_[idx] = op;
343+
feed_names_[op->Output("Out")[0]] = idx;
344+
idx_to_feeds_[idx] = op->Output("Out")[0];
345+
} else if (op->Type() == "fetch") {
346+
VLOG(3) << "fetch op with fetch var: " << op->Input("X")[0];
347+
int idx = BOOST_GET_CONST(int, op->GetAttr("col"));
348+
if (fetches_.size() <= static_cast<size_t>(idx)) {
349+
fetches_.resize(idx + 1);
350+
}
351+
fetches_[idx] = op;
352+
id_to_fetches_[idx] = op->Input("X")[0];
353+
}
354+
}
355+
return true;
356+
}
357+
301358
void DistModel::Run(const std::vector<paddle::framework::Tensor> &input_data,
302359
std::vector<paddle::framework::Tensor> *output_data) {
303360
/* TODO(fleet exe dev): implement this function */

paddle/fluid/distributed/fleet_executor/dist_model.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class BlockDesc;
3232

3333
namespace distributed {
3434

35+
class TaskNode;
36+
class FleetExecutor;
37+
3538
struct DistModelConfig {
3639
std::string model_dir{};
3740
framework::ProgramDesc* program_desc{nullptr};
@@ -66,12 +69,21 @@ class DistModel {
6669
bool LoadParameters();
6770
bool PreparePlace();
6871
bool CommInit();
72+
bool PrepareFeedAndFetch();
73+
bool PrepareFleetExe();
6974
void InsertCommOp(std::string tmp_var_name, int nranks, int rank,
7075
const std::vector<std::string>& peer_endpoints,
7176
framework::BlockDesc* block, int ring_id);
7277

78+
std::vector<framework::OpDesc*> feeds_;
79+
std::map<std::string, int64_t> feed_names_;
80+
std::map<int64_t, std::string> idx_to_feeds_;
81+
std::vector<framework::OpDesc*> fetches_;
82+
std::map<int64_t, std::string> id_to_fetches_;
7383
DistModelConfig config_;
7484
FleetExecutorDesc executor_desc_;
85+
std::shared_ptr<FleetExecutor> fleet_exe;
86+
std::shared_ptr<TaskNode> task_node_;
7587
std::shared_ptr<framework::Scope> scope_;
7688
paddle::platform::Place place_;
7789
std::shared_ptr<framework::ProgramDesc> program_;

paddle/fluid/distributed/fleet_executor/fleet_executor.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
3535
InitMessageBus();
3636
}
3737

38+
FleetExecutor::FleetExecutor(const FleetExecutorDesc& exe_desc)
    : exe_desc_(exe_desc) {
  // The message bus is a process-wide singleton: it is created exactly once
  // here, then this executor is wired into it.
  GlobalVal<MessageBus>::Create();
  InitMessageBus();
}
44+
3845
FleetExecutor::~FleetExecutor() {
3946
for (const auto& carrier_id : carrier_ids_) {
4047
GlobalMap<std::string, Carrier>::Get(carrier_id)->Release();

paddle/fluid/distributed/fleet_executor/fleet_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class FleetExecutor final {
3636
public:
3737
FleetExecutor() = delete;
3838
explicit FleetExecutor(const std::string& exe_desc_str);
39+
explicit FleetExecutor(const FleetExecutorDesc& exe_desc);
3940
~FleetExecutor();
4041
void Init(const std::string& carrier_id,
4142
const framework::ProgramDesc& program_desc, framework::Scope* scope,

paddle/fluid/distributed/fleet_executor/task_node.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ TaskNode::TaskNode(paddle::framework::ProgramDesc* program, int64_t rank,
3838
task_id_ = task_node_cnt++;
3939
}
4040

41+
TaskNode::TaskNode(paddle::framework::ProgramDesc* program, int64_t rank)
    : program_(program), rank_(rank), task_id_(rank) {
  // DistModel inference runs each task exactly once, so both the run-time
  // budget and the slot budget are pinned to 1.
  max_run_times_ = 1;
  max_slot_nums_ = 1;
  LOG(INFO)
      << "Constructing TaskNode for DistModelInf. The TaskNode's id is: "
      << rank
      << ". And the TaskNode's max_run_time and max_slot_num will be set to 1.";
}
50+
4151
void TaskNode::SetProgram(paddle::framework::ProgramDesc* program) {
4252
program_ = program;
4353
}

paddle/fluid/distributed/fleet_executor/task_node.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class TaskNode final {
4242
int64_t max_slot_nums);
4343
TaskNode(paddle::framework::ProgramDesc* program, int64_t rank,
4444
int64_t max_run_times, int64_t max_slot_nums);
45+
TaskNode(paddle::framework::ProgramDesc* program, int64_t rank);
4546
~TaskNode() = default;
4647

4748
void SetProgram(paddle::framework::ProgramDesc* program);

paddle/fluid/framework/operator.cc

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,9 +1192,11 @@ void OperatorWithKernel::RunImpl(const Scope& scope,
11921192
platform::EventRole::kInnerOp);
11931193
if (run_pten_kernel_) {
11941194
pten::KernelContext pt_kernel_context;
1195+
// Do data transform before building KernelContext
1196+
PreparePtenData(exec_scope, *pt_kernel_, *pt_kernel_signature_,
1197+
runtime_ctx);
11951198
BuildPtenKernelContext(*runtime_ctx, dev_ctx, &pt_kernel_context);
11961199
(*pt_kernel_)(&pt_kernel_context);
1197-
WriteBackToOutputs(runtime_ctx, &pt_kernel_context);
11981200
} else {
11991201
(*kernel_func_)(
12001202
ExecutionContext(*this, exec_scope, *dev_ctx, *runtime_ctx));
@@ -1786,6 +1788,62 @@ KernelSignature OperatorWithKernel::GetExpectedPtenKernelArgs(
17861788
pten::TransToPtenKernelName(Type()));
17871789
}
17881790

1791+
Scope* OperatorWithKernel::PreparePtenData(
    const Scope& scope, const pten::Kernel& pt_kernel,
    const KernelSignature& pt_kernel_signature, RuntimeContext* ctx) const {
  // Checks each kernel input against the place the pten kernel expects.
  // The actual transfer is currently disabled (see the TODO below), so this
  // always returns nullptr; the scaffolding is kept for future kernels that
  // need cross-device input transform.
  auto& input_names = std::get<0>(pt_kernel_signature.args);
  auto input_defs = pt_kernel.args_def().input_defs();
  PADDLE_ENFORCE_EQ(input_names.size(), input_defs.size(),
                    platform::errors::InvalidArgument(
                        "The size of inputs_args names (%d) must be equal to "
                        "the size of kernel input_defs (%d).",
                        input_names.size(), input_defs.size()));
  Scope* new_scope = nullptr;
  for (size_t i = 0; i < input_defs.size(); ++i) {
    auto& in_def = input_defs.at(i);
    auto& ins_vector = ctx->inputs.at(input_names[i]);
    for (size_t offset = 0; offset < ins_vector.size(); ++offset) {
      auto* var = ins_vector[offset];
      // Only tensors can be transferred to another device.
      if (var == nullptr || !VarIsTensor(*var)) {
        continue;
      }

      auto* tensor_in = GetLoDTensorOrSelectedRowsValueFromVar(*var);
      // Uninitialized tensors carry no data, so there is nothing to move.
      if (!tensor_in->IsInitialized()) {
        continue;
      }

      // Already on the place the kernel expects: no transform needed.
      auto expected_place = pten::TransToFluidPlace(in_def.backend);
      if (platform::is_same_place(tensor_in->place(), expected_place)) {
        continue;
      }

      // TODO(zyfncg): Now there is no kernel which need to transform input
      // data, so we commented out following code temporarily,
      // and it will be used in the future.
      // NOTE(review): when re-enabling, `ins_vector[i] = trans_var;` below
      // looks like it should index by `offset`, not `i` — confirm.

      // VLOG(3) << "PTen Transform Variable " << input_names[i] << " from "
      //         << tensor_in->place() << " to " << expected_place;

      // if (!new_scope) {
      //   new_scope = &scope.NewScope();
      // }

      // // Create new var with the same name in transfer scopes
      // auto* trans_var = new_scope->Var(input_names[i]);
      // ins_vector[i] = trans_var;

      // // Do transfer
      // Tensor out;
      // framework::TensorCopySync(*tensor_in, expected_place, &out);
      // SetTensorToVariable(*var, out, trans_var);
    }
  }

  return new_scope;
}
1846+
17891847
void OperatorWithKernel::BuildPtenKernelContext(
17901848
const RuntimeContext& ctx, platform::DeviceContext* dev_ctx,
17911849
pten::KernelContext* pt_kernel_context) const {
@@ -1818,7 +1876,6 @@ void OperatorWithKernel::BuildPtenKernelContext(
18181876
attr_names.size(), attr_defs.size()));
18191877

18201878
for (size_t i = 0; i < input_names.size(); ++i) {
1821-
auto& in_def = input_defs.at(i);
18221879
auto& ins_vector = ctx.inputs.at(input_names[i]);
18231880

18241881
// calculate the start and end index of the input tensors
@@ -1827,24 +1884,44 @@ void OperatorWithKernel::BuildPtenKernelContext(
18271884
size_t end_idx = start_idx + ins_vector.size();
18281885

18291886
for (size_t offset = 0; offset < ins_vector.size(); ++offset) {
1830-
pt_kernel_context->EmplaceBackInputWithoutSetRange(
1831-
experimental::MakePtenTensorBaseFromVar(*ins_vector[offset], in_def));
1887+
const framework::Tensor* tensor_in = nullptr;
1888+
auto* var = ins_vector[offset];
1889+
if (var->IsType<framework::LoDTensor>()) {
1890+
tensor_in = &(var->Get<framework::LoDTensor>());
1891+
} else {
1892+
PADDLE_THROW(platform::errors::Unimplemented(
1893+
"Unsupported input `%s` type when call pt kernel.",
1894+
framework::ToTypeName(var->Type())));
1895+
} // TODO(zyfncg): Add support for SelectedRows
1896+
1897+
pt_kernel_context->EmplaceBackInputWithoutSetRange(tensor_in);
18321898
}
18331899
pt_kernel_context->AssignInputRange(std::make_pair(start_idx, end_idx), i);
18341900
}
18351901

18361902
for (size_t i = 0; i < output_names.size(); ++i) {
1837-
auto& out_def = output_defs.at(i);
18381903
auto& outs_vector = ctx.outputs.at(output_names[i]);
18391904

18401905
size_t start_idx =
18411906
(i == 0 ? 0 : pt_kernel_context->OutputRangeAt(i - 1).second);
18421907
size_t end_idx = start_idx + outs_vector.size();
18431908

18441909
for (size_t offset = 0; offset < outs_vector.size(); ++offset) {
1845-
pt_kernel_context->EmplaceBackOutputWithoutSetRange(
1846-
experimental::MakePtenTensorBaseFromVar(outs_vector[offset],
1847-
out_def));
1910+
framework::Tensor* tensor_out = nullptr;
1911+
auto* var = outs_vector[offset];
1912+
if (var->template IsType<framework::LoDTensor>()) {
1913+
tensor_out = var->template GetMutable<framework::LoDTensor>();
1914+
} else {
1915+
PADDLE_THROW(platform::errors::Unimplemented(
1916+
"Unsupported output `%s` type when call pt kernel.",
1917+
framework::ToTypeName(var->Type())));
1918+
} // TODO(zyfncg): Add support for SelectedRows
1919+
1920+
experimental::ResetTensorByArgDef(tensor_out, output_defs.at(i));
1921+
SetAllocationForOutputTenosr(
1922+
tensor_out, pten::TransToFluidPlace(output_defs.at(i).backend));
1923+
1924+
pt_kernel_context->EmplaceBackOutputWithoutSetRange(tensor_out);
18481925
}
18491926

18501927
// Deal with the case that some outputs are NULL when run the kernel.

paddle/fluid/framework/operator.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,14 @@ class OperatorWithKernel : public OperatorBase {
588588
/* member functions for adapting to pten lib */
589589
void ChoosePtenKernel(const ExecutionContext& ctx) const;
590590

591+
/**
592+
* Transfer data place for pten kernel
593+
* Is this really needed?
594+
*/
595+
Scope* PreparePtenData(const Scope& scope, const pten::Kernel& pt_kernel,
596+
const KernelSignature& pt_kernel_signature,
597+
RuntimeContext* ctx) const;
598+
591599
void BuildPtenKernelContext(const RuntimeContext& ctx,
592600
platform::DeviceContext* dev_ctx,
593601
pten::KernelContext* pt_kernel_context) const;

paddle/fluid/framework/pten_utils.cc

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,17 @@ KernelArgsNameMakerByOpProto::GetInputArgsNames() {
137137
auto& in = op_proto_->inputs()[i];
138138
auto& in_name = in.name();
139139
if ((in.has_extra() && in.extra()) || (in.has_quant() && in.quant())) {
140-
VLOG(3) << "Parse PtenKernel input: skip extra & quant input - "
140+
VLOG(6) << "Parse PtenKernel input: skip extra & quant input - "
141141
<< in_name;
142142
continue;
143143
}
144144
// If contains dispensable input, we should override the
145145
// GetExpectedPtenKernelArgs method self
146146
if (in.has_dispensable() && in.dispensable()) {
147-
VLOG(3) << "Parse PtenKernel input: skip dispensable input - " << in_name;
147+
VLOG(6) << "Parse PtenKernel input: skip dispensable input - " << in_name;
148148
continue;
149149
}
150-
VLOG(3) << "Parse PtenKernel input: " << in_name;
150+
VLOG(6) << "Parse PtenKernel input: " << in_name;
151151
input_names_.emplace_back(in_name);
152152
}
153153
return input_names_;
@@ -159,7 +159,7 @@ KernelArgsNameMakerByOpProto::GetOutputArgsNames() {
159159
auto& out = op_proto_->outputs()[i];
160160
auto& out_name = out.name();
161161
// TODO(chenweihang): outputs also need skip some cases
162-
VLOG(3) << "Parse PtenKernel output: " << out_name;
162+
VLOG(6) << "Parse PtenKernel output: " << out_name;
163163
output_names_.emplace_back(out_name);
164164
}
165165
return output_names_;
@@ -173,17 +173,17 @@ KernelArgsNameMakerByOpProto::GetAttrsArgsNames() {
173173
if (attr_name == "use_mkldnn" || attr_name == "op_role" ||
174174
attr_name == "op_role_var" || attr_name == "op_namescope" ||
175175
attr_name == "op_callstack" || attr_name == "op_device") {
176-
VLOG(3) << "Parse PtenKernel attribute: skip needless attr - "
176+
VLOG(6) << "Parse PtenKernel attribute: skip needless attr - "
177177
<< attr_name;
178178
continue;
179179
}
180180
if ((attr.has_extra() && attr.extra()) ||
181181
(attr.has_quant() && attr.quant())) {
182-
VLOG(3) << "Parse PtenKernel attribute: skip extra & quant attr - "
182+
VLOG(6) << "Parse PtenKernel attribute: skip extra & quant attr - "
183183
<< attr_name;
184184
continue;
185185
}
186-
VLOG(3) << "Parse PtenKernel attribute: " << attr_name;
186+
VLOG(6) << "Parse PtenKernel attribute: " << attr_name;
187187
attr_names_.emplace_back(attr_name);
188188
}
189189

@@ -196,5 +196,23 @@ KernelSignature KernelArgsNameMakerByOpProto::GetKernelSignature() {
196196
GetOutputArgsNames());
197197
}
198198

199+
// Ensure `tensor` owns an allocation on `place` that is large enough for its
// current dims/dtype. NOTE(review): "Tenosr" is a typo for "Tensor"; the name
// is kept because it matches the declaration and all call sites — fix them
// together in a follow-up.
void SetAllocationForOutputTenosr(pten::DenseTensor* tensor,
                                  const platform::Place& place) {
  // Nothing to do when the tensor already holds memory on the right place.
  if (tensor->IsInitialized() && tensor->place() == place) {
    return;
  }
  // UNDEFINED dtype contributes zero bytes; negative (not-yet-inferred) dims
  // are treated as an empty tensor.
  const int dtype_size = tensor->dtype() == DataType::UNDEFINED
                             ? 0
                             : experimental::SizeOf(tensor->dtype());
  int64_t numels = product(tensor->dims());
  if (numels < 0) {
    numels = 0;
  }
  auto tmp_allocation_ptr = memory::Alloc(place, numels * dtype_size);
  // Move ownership out of the unique_ptr into a shared_ptr that reuses the
  // allocator's deleter, then hand it to the tensor.
  auto& deleter = tmp_allocation_ptr.get_deleter();
  auto* allocation_ptr = tmp_allocation_ptr.release();
  auto shared_allocation =
      std::shared_ptr<pten::Allocation>(allocation_ptr, deleter);

  tensor->ResetHolder(shared_allocation);
}
216+
199217
} // namespace framework
200218
} // namespace paddle

0 commit comments

Comments
 (0)