Conversation

@wangxicoding
Contributor

@wangxicoding wangxicoding commented Nov 19, 2021

PR types

Others

PR changes

Others

Describe

Add demo compute interceptor

@paddle-bot-old
Thanks for your contribution!
Please wait for the CI results first. See the Paddle CI Manual for details.

@wangxicoding wangxicoding force-pushed the add_compute_interceptor branch from 6782f96 to 2e75832 Compare November 19, 2021 08:35
@wangxicoding wangxicoding reopened this Nov 19, 2021
@wangxicoding wangxicoding force-pushed the add_compute_interceptor branch from acd19ea to 3793ee5 Compare November 19, 2021 09:46
@wangxicoding wangxicoding changed the title Add compute interceptor [fleet_executor] Add compute interceptor Nov 19, 2021
@wangxicoding wangxicoding force-pushed the add_compute_interceptor branch from 3793ee5 to d6a4958 Compare November 19, 2021 11:25
void ComputeInterceptor::Compute(const InterceptorMessage& msg) {
  if (msg.message_type() == DATA_IS_READY) {
    auto src_id = msg.src_id();
    upstream_deps_.erase(src_id);
Contributor

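Using erase here doesn't feel right. Compute will be called many times; if its upstreams and downstreams are fixed, the set is emptied during one compute and then has to be refilled for the next one.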

Contributor Author

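Yes, I don't think it's great either. In particular, with multiple upstreams, if one upstream produces output more than once, this will also go wrong. But this is a demo for now, so it isn't that rigorous.
It would probably be better to create a new empty set, fill it, and compare it against the full set, but that still leaves the problem of an upstream producing multiple times, which we need to figure out how to solve.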

Contributor

@FeixLiu FeixLiu Nov 22, 2021

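I think ignoring the micro scope entirely may cause problems 🤨

For example, suppose C depends on both A and B, A runs faster than B, and two micro steps need to run.

| time | event | C's upstream_deps() |
|---|---|---|
| 0 | C is initialized | (A, B) |
| 1 | A finishes micro step 0, A->C: DATA_IS_READY | (B) |
| 2 | A finishes micro step 1, A->C: DATA_IS_READY | (B) |
| 3 | B finishes micro step 0, B->C: DATA_IS_READY; C starts running micro step 0 | () |
| 4 | C rebuilds upstream_deps | (A, B) |
| 5 | B finishes micro step 1, B->C: DATA_IS_READY | (A) |

In this case, how should the DATA_IS_READY for micro step 1 that A sends to C at time 2 be handled? And at the end, both A and B have finished their two micro steps, but C will wait forever for the DATA_IS_READY of A's second micro step.

For now our upstream/downstream dependencies are simple; they should all be single dependencies.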
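
The timeline in this comment can be replayed with a small standalone sketch. It is not the PR's code: `EraseBasedTracker`, `OnDataIsReady`, and `SimulateEraseBased` are hypothetical names, integer ids 0 and 1 stand in for A and B, and only the erase-then-rebuild bookkeeping around `upstream_deps_` is modeled.

```cpp
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Hypothetical stand-in for the demo's erase-based dependency tracking.
struct EraseBasedTracker {
  std::set<int64_t> all_upstreams;
  std::set<int64_t> pending;  // plays the role of upstream_deps_
  int runs = 0;               // number of micro steps C has executed

  explicit EraseBasedTracker(std::set<int64_t> ups)
      : all_upstreams(std::move(ups)), pending(all_upstreams) {}

  void OnDataIsReady(int64_t src_id) {
    // A repeated message from a source that was already erased is a no-op,
    // so the information "A is ready a second time" is lost.
    pending.erase(src_id);
    if (pending.empty()) {
      ++runs;                   // C runs one micro step
      pending = all_upstreams;  // rebuild the dependency set
    }
  }
};

// Replays the message order from the table: A, A, B, B (0 = A, 1 = B).
int SimulateEraseBased() {
  std::set<int64_t> upstreams{0, 1};
  EraseBasedTracker c(upstreams);
  std::vector<int64_t> arrivals{0, 0, 1, 1};
  for (int64_t src : arrivals) c.OnDataIsReady(src);
  return c.runs;
}
```

With this arrival order `SimulateEraseBased()` returns 1: C runs only one of the two micro steps and is left waiting on A, which matches the final row of the table.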

Contributor Author

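Right, what's written here is just a demo compute. Later we'll need buffers for flow control: buffers are written one at a time, and a computation can run only once a buffer is full.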
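
One possible shape for such bookkeeping, sketched here purely as an assumption and not as the buffer design planned for fleet_executor, is a per-upstream counter of ready messages. `CountingTracker`, `OnDataIsReady`, and `SimulateCounting` are hypothetical names, and the counters are unbounded, so the flow-control limit a real buffer would impose is left out.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical counter-based tracker: one "ready" count per upstream.
class CountingTracker {
 public:
  explicit CountingTracker(const std::vector<int64_t>& upstream_ids) {
    for (int64_t id : upstream_ids) ready_[id] = 0;
  }

  // Records one DATA_IS_READY from src_id and returns how many micro steps
  // became runnable as a result.
  int OnDataIsReady(int64_t src_id) {
    auto it = ready_.find(src_id);
    if (it == ready_.end()) return 0;  // unknown sender: ignored in this sketch
    ++it->second;
    int started = 0;
    while (AllReady()) {
      for (auto& kv : ready_) --kv.second;  // consume one input per upstream
      ++started;
    }
    return started;
  }

 private:
  bool AllReady() const {
    return std::all_of(
        ready_.begin(), ready_.end(),
        [](const std::pair<const int64_t, int>& kv) { return kv.second > 0; });
  }

  std::map<int64_t, int> ready_;
};

// Same arrival order as the review comment's table: A, A, B, B (0 = A, 1 = B).
int SimulateCounting() {
  std::vector<int64_t> upstreams{0, 1};
  CountingTracker c(upstreams);
  std::vector<int64_t> arrivals{0, 0, 1, 1};
  int runs = 0;
  for (int64_t src : arrivals) runs += c.OnDataIsReady(src);
  return runs;
}
```

Here `SimulateCounting()` returns 2: the early second message from A is remembered instead of dropped, so both micro steps run.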

void Interceptor::PoolTheMailbox() {
// pool the local mailbox, parse the Message
while (true) {
for (;;) {
Contributor

😶‍🌫️

int64_t max_run_times_;
int64_t max_slot_nums_;

std::string type_;
Contributor

What is this for? Is it used to distinguish interceptor types?

Contributor Author

Yes, it's used when building the carrier; that logic hasn't been added yet.


InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
a->Send(1, msg);
Contributor

Could we add a finish flag to StopInterceptor and wait on that flag here? Then the three pointers created with new could be deleted.

Contributor Author

Let's discuss this later; I think it should be handled through the destructor.
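
The two options in this exchange can be shown side by side in a generic sketch. Nothing here is Paddle's API: `FinishFlag`, `Worker`, and `RunAndWait` are hypothetical names, and the worker thread only stands in for an interceptor that signals when it has stopped.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical finish flag: one side sets it, the other blocks until it is set.
class FinishFlag {
 public:
  void Set() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
    }
    cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return done_; });
  }

  bool IsSet() {
    std::lock_guard<std::mutex> lock(mu_);
    return done_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
};

// Hypothetical worker that owns a thread and joins it in its destructor,
// which is the "clean up through destruction" option.
class Worker {
 public:
  explicit Worker(FinishFlag* flag) : thread_([flag] { flag->Set(); }) {}
  ~Worker() {
    if (thread_.joinable()) thread_.join();
  }

 private:
  std::thread thread_;
};

bool RunAndWait() {
  FinishFlag flag;
  auto worker = std::make_unique<Worker>(&flag);
  flag.Wait();     // the "wait on a finish flag" option
  worker.reset();  // destructor joins the thread, then the object is freed
  return flag.IsSet();
}
```

In this sketch the flag tells the test when it is safe to tear things down, while the destructor still does the actual join, so the two suggestions are not mutually exclusive.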

Contributor

@FeixLiu FeixLiu left a comment

LGTM

TODO: add buffer

@wangxicoding wangxicoding merged commit 964e20e into PaddlePaddle:develop Nov 22, 2021
@wangxicoding wangxicoding deleted the add_compute_interceptor branch November 22, 2021 03:41
Labels

None yet

3 participants