Skip to content

Conversation

@LiYuRio
Copy link
Contributor

@LiYuRio LiYuRio commented Jan 5, 2022

PR types

Others

PR changes

Others

Describe

  • Message Bus的初始化移动到FleetExecutor构造阶段,一个FleetExecutor只有一个Message Bus,不会重复创建。
  • 和brpc远端通信相关的全部移入message bus,增加DispatchMsgToCarrier的接口将远端信息交给carrier。

截屏2022-01-06 下午2 05 31

@paddle-bot-old
Copy link

paddle-bot-old bot commented Jan 5, 2022

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

// Message bus will be created and inited only once
GlobalVal<MessageBus>::Create();
InitMessageBus();
GlobalVal<MessageBus>::Get()->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在carrier之前init message bus可能存在问题。message bus初始化之后就是可以收消息了,如果收到消息别的rank发来的消息的时候,本rank还没有创建carrier,这个消息怎么办?还是在init至少一个carrier之后再init message bus吧。

dst_interceptor->EnqueueRemoteInterceptorMessage(interceptor_message);
}
PADDLE_ENFORCE_EQ(
interceptor_message.ctrl_message(), false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要不之后把Barrier单独拎一个service出来

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得可以,正在想这个问题,多carrier之后,不仅message bus需要barrier,carrier之间也需要barrier

InitMessageBus();

// Wait for all message bus connected.
msg_bus_->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个barrier不能少吧,需要下游的Interceptor建立起来才能开始发消息。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修改

static T value;
return &value;
static std::unique_ptr<T>* GetPPtr() {
static std::unique_ptr<T> ptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥换成了unique_ptr,而且返回unique_ptr的指针...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

换成指针是因为不想MessageBus乱复制,把它的拷贝复制那些禁掉了。换成unique_ptr是不想手动管理内存,要不还得有个Release接口。

GlobalVal<std::string>::Set(new std::string(carrier_id));
MessageBus* msg_bus = GlobalVal<MessageBus>::Create();
msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
msg_bus->Barrier();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个Barrier得在Interceptor建立好后加吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

啊,单测忘看了

GlobalVal<std::string>::Set(carrier_id);
Carrier* carrier = GlobalMap<std::string, Carrier>::Get(carrier_id);
// Set current running carrier
if (*GlobalVal<std::string>::Get() != carrier_id) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这么写好像有个假设条件,所有Carrier都会同时切换/不切换,要不就会hang在那里。

Copy link
Contributor

@wangxicoding wangxicoding left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wangxicoding wangxicoding merged commit 769e5bc into PaddlePaddle:develop Jan 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants