Cpp parallel executor #9080
Conversation
doc/design/parallel_executor.md Outdated
```python
opt = fluid.optimizer.SGDOptimizer()
opt.minimize(avg_cost)

# change Executor -> ParallelExecutor
```
Maybe we can let the user still use the Executor interface and add an optional argument "gpu_list"; under the hood, if there are multiple GPUs available (either len(gpu_list) > 0, or gpu_list == None and multiple GPUs initialized), create and return the parallel executor instance.
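A rough sketch of what this suggestion could look like; `Executor`, `ParallelExecutor`, `gpu_list`, and `available_gpus` are hypothetical names for illustration, not the actual fluid API:

```python
# Hypothetical sketch: a single Executor entry point that transparently
# dispatches to a parallel implementation when several GPUs are requested
# or detected.
def available_gpus():
    # Placeholder for querying initialized devices; assume 4 GPUs here.
    return [0, 1, 2, 3]

class Executor:
    def __new__(cls, gpu_list=None):
        # An explicit gpu_list wins; otherwise fall back to whatever
        # devices are initialized.
        devices = gpu_list if gpu_list else available_gpus()
        if len(devices) > 1:
            # Multiple GPUs -> return the parallel variant transparently.
            inst = object.__new__(ParallelExecutor)
        else:
            inst = object.__new__(cls)
        inst.devices = devices
        return inst

class ParallelExecutor(Executor):
    pass

exe = Executor(gpu_list=[0, 1])
assert isinstance(exe, ParallelExecutor)   # multi-GPU -> parallel variant
single = Executor(gpu_list=[0])
assert type(single) is Executor            # single GPU -> plain executor
```

The point of the sketch is only the dispatch decision: user code keeps constructing `Executor` and never needs to know which implementation it got back.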
doc/design/parallel_executor.md Outdated
```cpp
// e.g. sgd should wait for allreduce to be finished
CallBack->BeforeOp(op);

op->Run(*local_scope, place_);
```
From my understanding, the reason we need the callback is that ParallelExecutor calls Executor::Run but needs to be notified before and after each Op::Run. Do we even need the separate Executor implementation anymore? Maybe we can consolidate them into a single executor, so that we don't need the callback.
It would also be easier for the Python side: Python would always create the same executor.
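One way to read this consolidation idea is an executor that takes optional before/after hooks directly, instead of a separate executor plus a callback object. A toy sketch (all names here are illustrative, not the actual framework classes):

```python
# Hypothetical sketch: one executor with optional before/after hooks,
# so no separate callback interface is needed.
class UnifiedExecutor:
    def __init__(self, before_op=None, after_op=None):
        self.before_op = before_op or (lambda op: None)
        self.after_op = after_op or (lambda op: None)

    def run(self, ops):
        for op in ops:
            self.before_op(op)   # e.g. wait for allreduce to finish
            # ... op->Run(*local_scope, place_) would happen here ...
            self.after_op(op)    # e.g. mark the op's outputs as ready

events = []
exe = UnifiedExecutor(before_op=lambda op: events.append("before:" + op),
                      after_op=lambda op: events.append("after:" + op))
exe.run(["allreduce", "sgd"])
assert events == ["before:allreduce", "after:allreduce",
                  "before:sgd", "after:sgd"]
```

A plain single-device run would simply pass no hooks and get the old behaviour.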
```cpp
std::vector<OpHandle *> to_run;
for (auto *var : to_remove) {
  for (auto *op : var->pending_ops_) {
    if (var->name_ == "mean_0.tmp_0@GRAD") {
```
what is the purpose of this special case?
Just debug code... Sorry
```cpp
struct OpHandle {
  std::vector<VarHandle *> inputs_;
  std::vector<VarHandle *> outputs_;
  platform::DeviceContext *dev_ctx_;
```
add framework::Scope* scope_?
```cpp
}

std::vector<LoDTensor> ParallelExecutor::Run(
    const std::vector<std::string> &fetch_tensors) {
```
Instantiate Variables here?
Paddle/paddle/fluid/framework/executor.cc
Lines 276 to 305 in 41894da
```cpp
Scope* local_scope = scope;
if (create_vars) {
  if (create_local_scope) {
    local_scope = &scope->NewScope();
    for (auto& var : block.AllVars()) {
      if (var->Name() == framework::kEmptyVarName) {
        continue;
      }
      if (var->Persistable()) {
        auto* ptr = scope->Var(var->Name());
        CreateTensor(ptr, var->GetType());
        VLOG(3) << "Create Variable " << var->Name()
                << " global, which pointer is " << ptr;
      } else {
        auto* ptr = local_scope->Var(var->Name());
        CreateTensor(ptr, var->GetType());
        VLOG(3) << "Create Variable " << var->Name()
                << " locally, which pointer is " << ptr;
      }
    }
  } else {
    for (auto& var : block.AllVars()) {
      auto* ptr = local_scope->Var(var->Name());
      CreateTensor(ptr, var->GetType());
      VLOG(3) << "Create variable " << var->Name() << ", which pointer is "
              << ptr;
    }
  }  // if (create_local_scope)
}  // if (create_vars)
```
Yes. We need to instantiate variables here. We might extract this routine into a global function.
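The quoted routine's core logic (persistable variables go into the outer scope, everything else into a fresh local scope, with lookup falling back to the parent) can be mimicked in a small sketch; `Scope` and the variable names here are simplified stand-ins, not the actual framework classes:

```python
# Illustrative sketch of the scope logic in the quoted executor code:
# persistable variables live in the parent scope, temporaries in a
# per-run local scope that can be dropped afterwards.
class Scope:
    def __init__(self, parent=None):
        self.vars = {}
        self.parent = parent

    def new_scope(self):
        return Scope(parent=self)

    def var(self, name):
        # Create the variable in this scope if absent (like Scope::Var).
        return self.vars.setdefault(name, object())

    def find(self, name):
        # Lookup walks up through parent scopes (like Scope::FindVar).
        if name in self.vars:
            return self.vars[name]
        return self.parent.find(name) if self.parent else None

def create_vars(scope, block_vars):
    local_scope = scope.new_scope()
    for name, persistable in block_vars:
        target = scope if persistable else local_scope
        target.var(name)
    return local_scope

global_scope = Scope()
local = create_vars(global_scope, [("fc_0.w", True), ("fc_0.tmp", False)])
assert "fc_0.w" in global_scope.vars    # persistable -> parent scope
assert "fc_0.tmp" in local.vars         # temporary -> local scope
assert local.find("fc_0.w") is not None # visible through parent lookup
```

Dropping the local scope after a run discards the temporaries while the parameters in the parent scope survive.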
```cpp
  }
};

member_->pool_.Run(op_run);
```
Actually, we should add a callback after we push the operator-run job to the thread pool. In this callback, we change the pending_var state.
The pending_var state has already been changed at L404-L406.
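The pattern under discussion — push each op to a thread pool and flip its output variables from "pending" to "ready" when the job completes — can be sketched as follows (a toy model with assumed names, not the PR's C++ code):

```python
# Minimal sketch: each op is pushed to a thread pool, and a completion
# callback marks the op's output variables as ready so dependent ops
# can be scheduled.
from concurrent.futures import ThreadPoolExecutor
import threading

ready = set()
lock = threading.Lock()

def run_op(pool, op_name, outputs, on_ready):
    def job():
        # ... op->Run(*local_scope, place_) would happen here ...
        with lock:
            for var in outputs:
                ready.add(var)  # pending_var -> ready
        on_ready(outputs)
    return pool.submit(job)

done = threading.Event()
with ThreadPoolExecutor(max_workers=2) as pool:
    f = run_op(pool, "elementwise_add", ["sum_out"], lambda outs: done.set())
    f.result()  # wait for the job to finish

assert done.is_set()
assert "sum_out" in ready
```

The lock matters because multiple pool workers may finish concurrently and update the shared ready-set.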
```cpp
    member_->local_scopes_.size() != 1) {  // Is CUDA
  BuildNCCLCommunicator();
  BCastParamsToGPUs(startup_program);
}
```
Why not initialize the parameters on the respective devices?
Because the random initialization results might not be the same across devices when seed = 0.
```cpp
void RunOp(std::unordered_map<VarHandleBase *, bool> &pending_vars,
           OpHandle *op) const;

void PolishGraphToSupportDataHarzaeds() const;
```
DataHarzaeds --> DataHazards
Yes. Since the operators in a sub-block will be executed by a control flow operator (e.g., While), the behaviour of control flow operators and computational operators should be the same.
I have finished my review. @chengduoZH Let's verify the correctness and speed of Transformer and ResNeXt. If they are OK, let's merge it soon so that everyone can start improving it.
```cpp
namespace details {

struct FetchOpHandle : public OpHandleBase {
  FeedFetchList *data_;
```
Let's add a "private:" then?
I see. Can you add your reply as comments in the code?
```cpp
// if size is 0. We just make sure it does.
if (size <= 0) return nullptr;
void *p;
int prev_id;
```
I would suggest making this a PADDLE_ENFORCE, if the current behavior works. Otherwise, readers will think the Allocator currently works on multiple GPUs.
```cpp
  PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
}

void FetchOpHandle::WaitAndMergeCPUTensors() const {
```
Does this actually wait on anything? Maybe just name it MergeCPUTensors?
```cpp
// FIXME: Currently ScaleLossGradOp only use device_count as scale
// factor. So it does not depend on any other operators.
// VarHandle *loss = GetVarHandle(loss_var_name, place);
// loss->pending_ops_.emplace_back(op_handle);
```
if this op doesn't depend on anything, when will it be scheduled?
It will be run at the very beginning, since an op with no pending inputs is immediately ready.
```cpp
 *
 * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR)
 */
static void PolishGraphToSupportDataHazards(SSAGraph *graph);
```
Add more comments describing when data hazards happen?
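For context: a write-after-read (WAR) hazard arises when one op reads a variable and a later op overwrites the same variable in-place; the write must not start before the read finishes. The dummy-dependency trick used in this PR can be sketched like this (the classes below are simplified stand-ins for the PR's VarHandle/OpHandle):

```python
# Sketch of resolving a WAR hazard: insert a dummy variable so the
# writing op depends on the reading op having finished.
class Var:
    def __init__(self, name):
        self.name = name
        self.pending_ops = []   # ops that consume this var

class Op:
    def __init__(self, name):
        self.name = name
        self.inputs, self.outputs = [], []

    def add_input(self, var):
        self.inputs.append(var)
        var.pending_ops.append(self)

    def add_output(self, var):
        self.outputs.append(var)

def add_war_dependency(read_op, write_op):
    # read_op reads x; write_op overwrites x in place.  A dummy var
    # forces write_op to wait until read_op completes.
    dep = Var("dummy_dep")
    read_op.add_output(dep)
    write_op.add_input(dep)
    return dep

reader, writer = Op("sum"), Op("sgd")
dep = add_war_dependency(reader, writer)
assert dep in reader.outputs     # reader produces the dummy var
assert dep in writer.inputs      # writer now waits on it
assert writer in dep.pending_ops
```

The dummy variable carries no data; it exists purely to order the two ops in the dependency graph.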
```cpp
std::unordered_set<OpHandleBase *> ready_ops;

auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
  pending_vars.insert(&var);
```
should this be skipped if generated_op_ is nullptr?
```cpp
PADDLE_ENFORCE_EQ(actual.size(), expect.size());
for (int j = 0; j < actual.size(); ++j) {
  PADDLE_ENFORCE(actual[i] == expect[i] || expect[i] == -1);
  // PADDLE_ENFORCE(actual[i] == expect[i] || expect[i] == -1);
```
update this line?
```cpp
// Create local scopes.
for (auto &scope : local_scopes_) {
  auto &local_scope = scope->NewScope();
  *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>() = &local_scope;
```
Why are all the scopes using the same var name?
When Op::Run executes, some temporary variables are created in the local scopes. So the variable @TMP_SCOPE@ is just used to hold these local scopes; the temporaries will be destroyed after a period.
```cpp
auto *dep_var = new DummyVarHandle();
read_op->AddOutput(dep_var);
write_op->AddInput(dep_var);
graph->dep_vars_.emplace(dep_var);
```
should this be called data_hazard_vars_?
Comments have been added.
I just added a dependency engine to parse the dependencies of operators. There are still a lot of jobs that need to be done:

- Complete broadcasting of parameters.
- Use a thread pool to invoke operators in parallel.
- Complete the NCCL AllReduce OpHandle.

In this implementation, I just use `VarHandle` and `OpHandle` to parse the `Program` as an SSA-form graph. A variable is assigned by only one `OpHandle`. When all inputs of an `OpHandle` are ready, the `OpHandle` can be run. The speed of ResNeXt152 is
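The execution model described here — every variable is produced by exactly one op, and an op becomes runnable once all of its input variables are ready — can be sketched as a simple worklist loop (a toy model with made-up op names, not the PR's actual C++ scheduler):

```python
# Toy worklist scheduler over an SSA-style op graph: an op runs once all
# of its input variables are ready; running it makes its outputs ready.
def schedule(ops, ready_vars):
    """ops: list of (name, inputs, outputs); returns execution order."""
    ready_vars = set(ready_vars)
    order, pending = [], list(ops)
    while pending:
        progressed = False
        for op in list(pending):
            name, ins, outs = op
            if all(v in ready_vars for v in ins):
                order.append(name)
                ready_vars.update(outs)  # outputs become ready
                pending.remove(op)
                progressed = True
        if not progressed:
            raise RuntimeError("cycle or unproduced input in graph")
    return order

# A tiny training step: parameters ("w") are ready at the start.
ops = [
    ("sgd", ["w", "w@GRAD"], ["w_new"]),          # waits for allreduce
    ("allreduce", ["w@GRAD.local"], ["w@GRAD"]),
    ("backward", ["loss"], ["w@GRAD.local"]),
    ("forward", ["w"], ["loss"]),
]
order = schedule(ops, ready_vars={"w"})
assert order == ["forward", "backward", "allreduce", "sgd"]
```

Note how the declaration order of the ops does not matter; only the variable dependencies drive the schedule, which is exactly what makes the SSA form convenient for parallel execution.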