Conversation

@typhoonzero (Contributor)

This is part of the work for #6508.

After this PR, users can define highly extensible distributed programs in Python.

cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op listen_and_serv_op sum_op executor)
else()
set(DEPS_OPS ${DEPS_OPS} send_op recv_op)
set(DEPS_OPS ${DEPS_OPS} send_op recv_op listen_and_serv_op)

Contributor

Is listen_and_serv_op defined in the !WITH_DISTRIBUTE case?

Contributor Author

No. listen_and_serv_op is the new name for recv_op. In this PR, send_op makes the AsyncSendVariable RPC call on the client side, and recv_op calls AsyncGetVariable.

RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker)
RecvOpMaker(OpProto* proto, OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "(Tensor) Input tensor to be sent").AsDuplicable();

Contributor

"X: Input tensor to be sent" is a little confusing, do you mean X is the Vars that the send OP will fetch? Btw, X is not used in Run. Do we need this Input?

Contributor Author

No, we don't; I'll remove it.

AddAttr<std::vector<std::string>>(
"ParamList", "type list of string",
"grad->param name mapping to find which parameters to optimize.")
AddAttr<std::vector<std::string>>("endpoints",

Contributor

Do we still need this attribute? It seems unused.

Contributor Author

Removed.

auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
bool do_get = Attr<bool>("DoGet");
std::vector<std::string> endpoints =

Contributor

endpoints seems to duplicate epmap; can we remove it and generate the endpoints vector while iterating over epmap?

Contributor Author

Generating a new vector on every Run might hurt performance; I'll keep it unchanged in this PR and add a TODO comment.

Contributor

Thanks for adding the comment! In terms of performance, I think we should try to avoid premature optimization. If it's really a performance issue, we can cache the vector, but I doubt creating a vector every time will have any noticeable impact.
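
For illustration, a rough self-contained sketch (hypothetical helper name, not code from this PR) of how the unique endpoint list could be built directly from epmap inside Run, instead of keeping a separate endpoints attribute:

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical helper: derive the list of unique endpoints from the
// per-variable endpoint map ("epmap"), which may list the same
// endpoint for several variables.
std::vector<std::string> UniqueEndpoints(const std::vector<std::string>& epmap) {
  std::vector<std::string> endpoints(epmap);
  std::sort(endpoints.begin(), endpoints.end());
  endpoints.erase(std::unique(endpoints.begin(), endpoints.end()),
                  endpoints.end());
  return endpoints;
}

int main() {
  // epmap holds one endpoint per sent variable; duplicates are expected.
  std::vector<std::string> epmap = {"127.0.0.1:6174", "127.0.0.1:6175",
                                    "127.0.0.1:6174"};
  for (const auto& ep : UniqueEndpoints(epmap)) {
    std::cout << ep << std::endl;  // prints each endpoint exactly once
  }
  return 0;
}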

p = Process(target=self.init_serv, args=(place, ))
p.daemon = True
p.start()
time.sleep(5)

Contributor

Maybe change the 5s to something lower (1s?). 5s per unit test is a lot of time considering how many unit tests we run every day.

auto ins = Inputs("X");
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
bool do_get = Attr<bool>("DoGet");

Contributor

Can we infer do_get from outs.size() > 0, so we don't need an additional attribute?

Contributor Author

That's a great idea! Thanks.
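
For illustration only (variable names are hypothetical, not taken from the PR), inferring the flag from the declared outputs could look roughly like this:

#include <iostream>
#include <string>
#include <vector>

int main() {
  // "outs" stands in for the operator's output variable names, i.e. what
  // Outputs("Out") would return in the real operator.
  std::vector<std::string> outs = {"Param@GRAD"};

  // Instead of reading a separate "DoGet" attribute, derive the flag from
  // whether any outputs were declared: no outputs means there is nothing
  // to fetch back from the server.
  bool do_get = !outs.empty();

  std::cout << std::boolalpha << do_get << std::endl;  // prints "true"
  return 0;
}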

set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS send_op recv_op sum_op executor)
op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})

Contributor

I haven't seen listen_and_serv_op.cc; did you forget to check it in?

Contributor

I couldn't find it yesterday either.

@helinwang (Contributor) commented Jan 29, 2018

I think there is a file missing from this PR. Maybe we can turn on the distributed flag in the TeamCity CI, so that CI can check this for us?

@typhoonzero (Contributor Author)

@helinwang Thanks for reviewing. Sorry, I forgot to add listen_and_serv_op.

// FIXME(Yancey1989): initialize rpc server with lazy mode.
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
auto param_list = Attr<std::vector<std::string>>("ParamList");

Contributor

Maybe we should remove the ParamList and GradList attributes. They are only used for printing a log:

VLOG(3) << "received grad: " << grad_var_name << " updating param: " << param_var_name; 

I think the listen-and-serve op should be something more general: accept any input specified by the send op, run the predefined block, and serve any variable requested by the recv op. So we probably don't need this gradient-related log anymore.

<< " updating param: " << param_var_name;

if (fan_in > 1) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);

@helinwang (Contributor) Jan 31, 2018

Not a blocker for merging this PR:

I think the transpiler should handle this naming logic (e.g., the gradient for variable W will be W.trainer.0 and W.trainer.1 on trainer 0 and trainer 1 respectively), rather than having it baked into the listen-and-serve operator.
If the transpiler handles this naming logic, we don't need the fan_in variable anymore: the operator can just wait for all the vars to be received and then run the optimize block. Retries could also be handled better this way: any retry will simply overwrite the previously sent variable. A rough sketch of the naming scheme follows below.
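
For illustration, a minimal sketch of the naming scheme described above (a hypothetical helper, not the transpiler's or the operator's actual code):

#include <iostream>
#include <string>

// Hypothetical helper: suffix a variable name with the trainer id, so the
// gradient for W sent by trainer 0 becomes "W.trainer.0" and the one sent
// by trainer 1 becomes "W.trainer.1". The pserver can then tell the copies
// apart by name alone, without per-request fan-in bookkeeping.
std::string VarNameForTrainer(const std::string& var_name, int trainer_id) {
  return var_name + ".trainer." + std::to_string(trainer_id);
}

int main() {
  std::cout << VarNameForTrainer("W", 0) << std::endl;  // W.trainer.0
  std::cout << VarNameForTrainer("W", 1) << std::endl;  // W.trainer.1
  return 0;
}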

@gongweibao (Contributor) Jan 31, 2018

I think naming the ProgramDesc may be better than naming the variable.

  • Naming the variable requires holding the mapping between the original name and the expanded one, and it has to be held on both the trainer and the pserver, which means more memory is needed.

What we want is to distinguish where the variables come from, and a unique name attribute on send_op can do this: send_op can send a variable together with a ProgramDesc name.

And fan_in is useful when scaling down.

  • A Kubernetes Job is a set, and we don't know which trainer will be killed when we set a smaller parallelism for the Job. fan_in is a count and doesn't care which trainer it is.

Contributor

A Kubernetes Job is a set, and we don't know which trainer will be killed when we set a smaller parallelism for the Job. fan_in is a count and doesn't care which trainer it is.

We can achieve scaling down without fan_in as well. If we don't change fan_in, then the trainer will block because of the scale-down. If we do change fan_in, that means a new ProgramDesc is running on the pserver, and in the new ProgramDesc we can do whatever we want, not just change fan_in.

I think naming the ProgramDesc may be better than naming the variable.

Sorry, what do you mean by "name the ProgramDesc"? And why does naming the variable need more memory?

Contributor Author

Thanks @helinwang

I think the transpiler should handle this naming logic (e.g., the gradient for variable W will be W.trainer.0 and W.trainer.1 on trainer 0 and trainer 1 respectively), rather than having it baked into the listen-and-serve operator.

That's exactly what I was thinking about for how to remove the executor's create_var argument. I'll create an issue and do that in another PR.

Contributor Author

@helinwang Can we do this (including removing the ParamList and GradList attributes) in another PR, to keep the review simpler? I'm currently writing it on another branch.

@helinwang (Contributor) left a comment

LGTM!
