Skip to content

Conversation

@LiYuRio
Copy link
Contributor

@LiYuRio LiYuRio commented Dec 28, 2021

PR types

Others

PR changes

Others

Describe

  • 支持一个FleetExecutor管理多个carrier

截屏2021-12-28 下午4 20 14

@paddle-bot-old
Copy link

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

@LiYuRio LiYuRio changed the title Support multi carrier [Fleet Executor] Support multi carrier Dec 28, 2021
namespace paddle {
namespace distributed {

template <typename KeyT, typename ValueT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这样的话,一个type到另一个type的组合含义是固定的?
如果以后需要interceptor id到thread id,carrier id到stream id的等其他int64到int64的映射,如何复用这个类呢?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有todo,下一个pr会改

Copy link
Contributor

@FeixLiu FeixLiu Dec 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你是说用InterceptorId的structure� 来代替int 64那个todo?这样如果同时存在interceptor id到thread id,interceptor id到carrier id的映射怎么处理?还是说未来会更新这个global map类,把TODO放在这个类里?

Copy link
Contributor Author

@LiYuRio LiYuRio Dec 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有这个场景,而且要是有的话也可以加ThreadID和CarrierID

@FeixLiu FeixLiu requested a review from wangxicoding December 30, 2021 01:07
<< " to interceptor " << dst_id << ", which are in the same ranks.";
return EnqueueInterceptorMessage(msg);
int64_t carrier_id = *GlobalMap<int64_t, int64_t>::Get(dst_id);
return GlobalMap<int64_t, Carrier>::Get(carrier_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不同carrier没有互通的必要吧

Copy link
Contributor Author

@LiYuRio LiYuRio Dec 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这是因为如果src interceptor和dst interceptor对应不同的carrier,但这两个carrier在相同的rank下,src interceptor里调用的是自己carrier的send,这里不处理的话会出问题。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,不过我觉得不同carrier没有互通的必要,你这个场景肯定是src和dst有关联,那么应该属于一个carrier。不同carrier可以对应不同program,比如训练和预测program,跑一个epoch训练再预测一下,时间和关系上是相互独立的。

msg_bus_->IsInit(), true,
platform::errors::Unavailable("MessageBus has not been init yet."));
GetCarrier()->Start();
for (const auto& item : runtime_graph_->carrier_id_to_interceptor_ids()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同时跑多个carrier吗

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cpu carrier与gpu carrier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是想同时启动起来的,不能运行可以等在那

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一个时刻应该只跑一个carrier吧,同时跑应该是通过graph的拓扑依赖,两个拓扑依赖的节点可以同时跑

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该跑一个carrier,然后不同Program对应不同的carrier,通过外面的接口选择具体是跑哪个carrier


private:
static std::unique_ptr<ValueT>* GetPPtr(KeyT id) {
static std::mutex mutex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前carrier使用场景都是先初始化固定好的,没有读写并发的情况下,可以不用加锁。当然也不排除之后carrier也是动态的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个global map除了用在多carrier,interceptor id 到 carrier id也有一个map,这里会有并发的情况,就统一了一下,不过现在没有动态增删interceptor也可以把这个map的建立放到初始化里。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

后面再增加一个没有锁的类吧,这样都加锁确实开销很大

if (request->ctrl_message()) {
carrier_id = 0;
} else {
carrier_id = *GlobalMap<int64_t, int64_t>::Get(request->dst_id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里获取了carrier,前面Carrier中不用重新获取了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个和前面carrier的那个场景不一样,这是不同rank的消息收发,前面是相同rank不同carrier的消息收发

// TODO(liyurui): Remove this hard code.
int64_t carrier_id;
if (request->ctrl_message()) {
carrier_id = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

barrier那个逻辑应该可以直接放到这来了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

把message bus的instance传下来?把message bus也放global map里好像也行

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,不过这样也有点麻烦了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

barrier那个逻辑应该可以直接放到这来了

是的

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用global map的话是不是carrier也不用存message bus的instance了,直接存id就行。

// NOTE: need Init msg_bus after carrier SetMsgBus
carrier->Init(0, interceptor_id_to_rank, {0});
msg_bus->Init(0, {{0, ip0}, {1, ip1}}, ip0);
carrier->SetMsgBus(msg_bus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行重复了

carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f));
carrier->SetInterceptor(0, InterceptorFactory::Create("Compute", 0, node_a));
carrier->SetInterceptor(1,
InterceptorFactory::Create("Amplifier", 1, node_b));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

换行不整齐了🥺

Copy link
Contributor

@wangxicoding wangxicoding left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wangxicoding wangxicoding merged commit 3658405 into PaddlePaddle:develop Dec 30, 2021
@LiYuRio LiYuRio deleted the dev_multi_carriers branch December 31, 2021 09:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants