Rename rpc ops #7947
Conversation
| cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op listen_and_serv_op sum_op executor) | ||
| else() | ||
| set(DEPS_OPS ${DEPS_OPS} send_op recv_op) | ||
| set(DEPS_OPS ${DEPS_OPS} send_op recv_op listen_and_serv_op) |
Is listen_and_serv_op defined in the !WITH_DISTRIBUTE case?
No. listen_and_serv_op is the new name for recv_op. In this PR, send_op makes the AsyncSendVariable RPC call on the client side, and recv_op makes the AsyncGetVariable call.
paddle/operators/recv_op.cc Outdated
| RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) | ||
| RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker) | ||
| : OpProtoAndCheckerMaker(proto, op_checker) { | ||
| AddInput("X", "(Tensor) Input tensor to be sent").AsDuplicable(); |
"X: Input tensor to be sent" is a little confusing; do you mean X is the set of Vars that the send OP will fetch? By the way, X is not used in Run. Do we need this Input?
No we don't, will remove.
paddle/operators/recv_op.cc Outdated
| AddAttr<std::vector<std::string>>( | ||
| "ParamList", "type list of string", | ||
| "grad->param name mapping to find which parameters to optimize.") | ||
| AddAttr<std::vector<std::string>>("endpoints", |
Do we still need this attribute? It seems to be unused.
Removed.
| auto outs = Outputs("Out"); | ||
| std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap"); | ||
| bool do_get = Attr<bool>("DoGet"); | ||
| std::vector<std::string> endpoints = |
endpoints seems to duplicate epmap; can we remove it and generate the endpoints vector while iterating over epmap?
Generating a new vector in each Run might hurt performance, so I'll keep it unchanged in this PR and add a TODO comment.
Thanks for adding the comment! In terms of performance, I think we should try to avoid premature optimization. If it's really a performance issue, we can cache the vector, but I doubt that creating a vector every time will have any measurable performance impact.
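The deduplication suggested above can be sketched in Python (the function name here is illustrative, not the actual Paddle API): the distinct endpoints are collected in a single pass over epmap, so no separate endpoints attribute is needed.

```python
def unique_endpoints(epmap):
    """Collect the distinct endpoints from the per-variable endpoint map,
    preserving first-seen order, in a single pass."""
    seen = set()
    endpoints = []
    for ep in epmap:
        if ep not in seen:
            seen.add(ep)
            endpoints.append(ep)
    return endpoints

# Two variables mapped to the same pserver yield one endpoint.
print(unique_endpoints(["127.0.0.1:6174", "127.0.0.1:6174", "127.0.0.1:6175"]))
# -> ['127.0.0.1:6174', '127.0.0.1:6175']
```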
| p = Process(target=self.init_serv, args=(place, )) | ||
| p.daemon = True | ||
| p.start() | ||
| time.sleep(5) |
Maybe change the 5s to something lower (1s?). 5s per unit test is a lot of time considering how many unit tests we run every day.
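One way to avoid both the flakiness of a short sleep and the cost of a long one is to poll until the server is actually accepting connections. A minimal sketch of such a helper (hypothetical, not part of the test as written):

```python
import socket
import time

def wait_for_server(host, port, timeout=5.0, interval=0.1):
    """Poll until a TCP server accepts connections instead of sleeping a
    fixed 5 seconds; returns True once connected, False on timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

The test would then call this right after `p.start()`, paying only as long as server startup actually takes.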
paddle/operators/send_op.cc Outdated
| auto ins = Inputs("X"); | ||
| auto outs = Outputs("Out"); | ||
| std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap"); | ||
| bool do_get = Attr<bool>("DoGet"); |
Can we infer do_get from outs.size() > 0, so we don't need an additional attribute?
That's a great idea! Thanks.
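The idea can be sketched with a toy stand-in for the RPC client (all names hypothetical): the old DoGet attribute disappears, and the Get calls are issued only when output variables were declared.

```python
class RecordingClient:
    """Toy stand-in for the RPC client; just records the calls made."""
    def __init__(self):
        self.calls = []

    def async_send(self, endpoint, var):
        self.calls.append(("send", endpoint, var))

    def async_get(self, endpoint, var):
        self.calls.append(("get", endpoint, var))

def run_send(ins, outs, epmap, client):
    """Sketch of send_op::Run with do_get inferred from the outputs."""
    for var, ep in zip(ins, epmap):
        client.async_send(ep, var)
    if outs:  # replaces Attr<bool>("DoGet")
        for var, ep in zip(outs, epmap):
            client.async_get(ep, var)
```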
| set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) | ||
| cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op recv_op sum_op executor) | ||
| op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS}) | ||
| set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) |
I don't see listen_and_serv_op.cc; did you forget to check it in?
I couldn't find it yesterday either.
I think there is a file missing from this PR. Maybe we can turn on the distributed flag in TeamCity CI, so that CI can check this for us?
@helinwang Thanks for reviewing. Sorry that I missed adding
| // FIXME(Yancey1989): initialize rpc server with lazy mode. | ||
| rpc_service_->SetScope(&recv_scope); | ||
| rpc_service_->SetDevCtx(&dev_ctx); | ||
| auto param_list = Attr<std::vector<std::string>>("ParamList"); |
Maybe we should remove the ParamList and GradList attributes; they are only used for printing a log:
VLOG(3) << "received grad: " << grad_var_name << " updating param: " << param_var_name;
I think the listen-and-serve OP should be something more general: accept any input specified by the send OP, run the predefined block, and serve any variable requested by the recv OP. So we probably don't need this gradient-related log anymore.
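The more general design described above could be modeled as a toy server loop (a Python sketch of the idea only, not the actual C++ operator): store whatever arrives, run the predefined block once everything expected has been received, and serve any variable on request.

```python
class ListenAndServe:
    """Toy model of a general listen-and-serve op: no gradient-specific
    names, no per-parameter log, just store / run block / serve."""
    def __init__(self, expected_vars, block):
        self.expected = set(expected_vars)  # vars the send OP will push
        self.block = block                  # the predefined (optimize) block
        self.scope = {}

    def on_send(self, name, value):
        self.scope[name] = value
        # Run the block once every expected variable has arrived;
        # a retried send simply overwrites the earlier value.
        if self.expected <= set(self.scope):
            self.block(self.scope)

    def on_get(self, name):
        return self.scope.get(name)
```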
| << " updating param: " << param_var_name; | ||
| | ||
| if (fan_in > 1) { | ||
| grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); |
Not a blocker for merging this PR:
I think the transpiler should handle this naming logic (e.g., the gradient for variable W would be W.trainer.0 and W.trainer.1 on trainer 0 and trainer 1, respectively), rather than baking it into the listen-and-serve operator.
If the transpiler handles this naming logic, we don't need the fan_in variable anymore: the operator can just wait for all the vars to be received, then run the optimize block. Retries would also be handled better this way: any retry simply overwrites the previously sent variable.
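The naming convention suggested here could live in the transpiler as something like the following (hypothetical helpers; the `W.trainer.0` format follows the example in the comment):

```python
def grad_var_name_for_trainer(var_name, trainer_id):
    """Suffix a gradient variable with its trainer id, so each trainer's
    copy (e.g. W.trainer.0, W.trainer.1) is distinct on the pserver."""
    return "%s.trainer.%d" % (var_name, trainer_id)

def split_grad_names(var_name, num_trainers):
    """All per-trainer names the pserver should wait for."""
    return [grad_var_name_for_trainer(var_name, i) for i in range(num_trainers)]
```

With this, the listen-and-serve side only needs the full list of expected names, not a fan_in counter.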
I think naming the ProgramDesc may be better than naming the variable.
- Naming the variable requires holding the relationship between the original variable and its expansion, on both the trainer and the pserver, which means more memory.
What we want is to distinguish where the variables come from, and a unique name attribute on send_op can do this: send_op can send a variable tagged with a ProgramDesc name.
And fan_in is useful when scaling down:
- A Kubernetes Job is a set, and we don't know which trainer will be killed when we lower the Job's parallelism.
fan_in is just a count and doesn't care which trainer it is.
The Kubernetes Job is a set, and we don't know which trainer will be killed when we set a smaller Job parallelism. fan_in is a count and doesn't care who it is.
We can achieve scaling down without fan_in as well. If we don't change fan_in, then the trainer will block because of the scale down. If we change fan_in, that means a new ProgramDesc is running on the pserver, we can do whatever we want in the new ProgramDesc, not just limited to changing fan_in only.
I think naming the ProgramDesc may be better than naming the variable.
Sorry, what do you mean by "name the ProgramDesc"? And why does naming the variable need more memory?
Thanks @helinwang
I think the transpiler should handle this naming logic (e.g., the gradient for variable W will be W.trainer.0, and W.trainer.1 on trainer 0 and trainer 1 respectively), rather than baked in the listen and serve operator.
That's exactly what I was thinking about when considering how to remove the executor's create_var argument. I'll create an issue to do that in another PR.
@helinwang Can we do this (including removing the ParamList and GradList attributes) in another PR, for a simpler review? I'm currently writing it on another branch.
… rename_rpc_ops
LGTM!
This is part of the work for #6508.
After this PR we can let users define very flexible distributed programs using Python.