1212
1313namespace NActors {
1414
15+ class TSharedExecutorPool : public ISharedExecutorPool {
16+ public:
17+ TSharedExecutorPool (const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16 > poolsWithThreads);
18+
19+ // IThreadPool
20+ void Prepare (TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override ;
21+ void Start () override ;
22+ void PrepareStop () override ;
23+ void Shutdown () override ;
24+ bool Cleanup () override ;
25+
26+ TSharedExecutorThreadCtx *GetSharedThread (i16 poolId) override ;
27+ void GetSharedStats (i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override ;
28+ void GetSharedStatsForHarmonizer (i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override ;
29+ TCpuConsumption GetThreadCpuConsumption (i16 poolId, i16 threadIdx) override ;
30+ std::vector<TCpuConsumption> GetThreadsCpuConsumption (i16 poolId) override ;
31+
32+ i16 ReturnOwnHalfThread (i16 pool) override ;
33+ i16 ReturnBorrowedHalfThread (i16 pool) override ;
34+ void GiveHalfThread (i16 from, i16 to) override ;
35+
36+ i16 GetSharedThreadCount () const override ;
37+
38+ TSharedPoolState GetState () const override ;
39+
40+ void Init (const std::vector<IExecutorPool*>& pools, bool withThreads) override ;
41+
42+ private:
43+ TSharedPoolState State;
44+
45+ std::vector<IExecutorPool*> Pools;
46+
47+ i16 PoolCount;
48+ i16 SharedThreadCount;
49+ std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
50+
51+ std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
52+ std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
53+
54+ TDuration TimePerMailbox;
55+ ui32 EventsPerMailbox;
56+ ui64 SoftProcessingDurationTs;
57+ }; // class TSharedExecutorPool
58+
1559TSharedExecutorPool::TSharedExecutorPool (const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16 > poolsWithThreads)
1660 : State(poolCount, poolsWithThreads.size())
1761 , Pools(poolCount)
@@ -29,40 +73,47 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
2973 }
3074}
3175
32- void TSharedExecutorPool::Prepare (TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
33- // ActorSystem = actorSystem;
34-
35- ScheduleReaders.reset (new NSchedulerQueue::TReader[SharedThreadCount]);
36- ScheduleWriters.reset (new NSchedulerQueue::TWriter[SharedThreadCount]);
37-
38- std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools ();
39- std::vector<IExecutorPool*> poolByThread (SharedThreadCount);
40- for (IExecutorPool* pool : poolsBasic) {
41- Pools[pool->PoolId ] = dynamic_cast <TBasicExecutorPool*>(pool);
42- i16 threadIdx = State.ThreadByPool [pool->PoolId ];
43- if (threadIdx >= 0 ) {
44- poolByThread[threadIdx] = pool;
45- }
76+ void TSharedExecutorPool::Init (const std::vector<IExecutorPool*>& pools, bool withThreads) {
77+ std::vector<IExecutorPool*> poolByThread (SharedThreadCount);
78+ for (IExecutorPool* pool : pools) {
79+ Pools[pool->PoolId ] = pool;
80+ i16 threadIdx = State.ThreadByPool [pool->PoolId ];
81+ if (threadIdx >= 0 ) {
82+ poolByThread[threadIdx] = pool;
4683 }
84+ }
4785
48- for (i16 i = 0 ; i != SharedThreadCount; ++i) {
49- // !TODO
50- Threads[i].ExecutorPools [0 ].store (dynamic_cast <TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
86+ for (i16 i = 0 ; i != SharedThreadCount; ++i) {
87+ // !TODO
88+ Threads[i].ExecutorPools [0 ].store (dynamic_cast <TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
89+ if (withThreads) {
5190 Threads[i].Thread .reset (
5291 new TSharedExecutorThread (
5392 -1 ,
54- actorSystem ,
55- &Threads[i],
56- PoolCount,
57- " SharedThread" ,
58- SoftProcessingDurationTs,
59- TimePerMailbox,
93+ nullptr ,
94+ &Threads[i],
95+ PoolCount,
96+ " SharedThread" ,
97+ SoftProcessingDurationTs,
98+ TimePerMailbox,
6099 EventsPerMailbox));
61- ScheduleWriters[i].Init (ScheduleReaders[i]);
62100 }
101+ }
102+ }
103+
104+ void TSharedExecutorPool::Prepare (TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
105+ ScheduleReaders.reset (new NSchedulerQueue::TReader[SharedThreadCount]);
106+ ScheduleWriters.reset (new NSchedulerQueue::TWriter[SharedThreadCount]);
107+
108+ std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools ();
109+ Init (poolsBasic, true );
63110
64- *scheduleReaders = ScheduleReaders.get ();
65- *scheduleSz = SharedThreadCount;
111+ for (i16 i = 0 ; i != SharedThreadCount; ++i) {
112+ ScheduleWriters[i].Init (ScheduleReaders[i]);
113+ }
114+
115+ *scheduleReaders = ScheduleReaders.get ();
116+ *scheduleSz = SharedThreadCount;
66117}
67118
68119void TSharedExecutorPool::Start () {
@@ -99,24 +150,27 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
99150 return &Threads[threadIdx];
100151}
101152
102- void TSharedExecutorPool::ReturnOwnHalfThread (i16 pool) {
153+ i16 TSharedExecutorPool::ReturnOwnHalfThread (i16 pool) {
103154 i16 threadIdx = State.ThreadByPool [pool];
104- TBasicExecutorPool * borrowingPool = Threads[threadIdx].ExecutorPools [1 ].exchange (nullptr , std::memory_order_acq_rel);
155+ IExecutorPool * borrowingPool = Threads[threadIdx].ExecutorPools [1 ].exchange (nullptr , std::memory_order_acq_rel);
105156 Y_ABORT_UNLESS (borrowingPool);
106- State.BorrowedThreadByPool [State.PoolByBorrowedThread [threadIdx]] = -1 ;
157+ i16 borrowedPool = State.PoolByBorrowedThread [threadIdx];
158+ State.BorrowedThreadByPool [borrowedPool] = -1 ;
107159 State.PoolByBorrowedThread [threadIdx] = -1 ;
108160 // TODO(kruall): Check on race
109161 borrowingPool->ReleaseSharedThread ();
162+ return borrowedPool;
110163}
111164
112- void TSharedExecutorPool::ReturnBorrowedHalfThread (i16 pool) {
165+ i16 TSharedExecutorPool::ReturnBorrowedHalfThread (i16 pool) {
113166 i16 threadIdx = State.BorrowedThreadByPool [pool];
114- TBasicExecutorPool * borrowingPool = Threads[threadIdx].ExecutorPools [1 ].exchange (nullptr , std::memory_order_acq_rel);
167+ IExecutorPool * borrowingPool = Threads[threadIdx].ExecutorPools [1 ].exchange (nullptr , std::memory_order_acq_rel);
115168 Y_ABORT_UNLESS (borrowingPool);
116169 State.BorrowedThreadByPool [State.PoolByBorrowedThread [threadIdx]] = -1 ;
117170 State.PoolByBorrowedThread [threadIdx] = -1 ;
118171 // TODO(kruall): Check on race
119172 borrowingPool->ReleaseSharedThread ();
173+ return State.PoolByThread [threadIdx];
120174}
121175
122176void TSharedExecutorPool::GiveHalfThread (i16 from, i16 to) {
@@ -127,14 +181,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
127181 if (borrowedThreadIdx != -1 ) {
128182 i16 originalPool = State.PoolByThread [borrowedThreadIdx];
129183 if (originalPool == to) {
130- return ReturnOwnHalfThread (to);
184+ ReturnOwnHalfThread (to);
131185 } else {
132186 ReturnOwnHalfThread (originalPool);
133187 }
134188 from = originalPool;
135189 }
136190 i16 threadIdx = State.ThreadByPool [from];
137- TBasicExecutorPool * borrowingPool = Pools[to];
191+ IExecutorPool * borrowingPool = Pools[to];
138192 Threads[threadIdx].ExecutorPools [1 ].store (borrowingPool, std::memory_order_release);
139193 State.BorrowedThreadByPool [to] = threadIdx;
140194 State.PoolByBorrowedThread [threadIdx] = to;
@@ -143,16 +197,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
143197}
144198
145199void TSharedExecutorPool::GetSharedStats (i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
146- statsCopy.resize (SharedThreadCount + 1 );
200+ statsCopy.resize (SharedThreadCount);
147201 for (i16 i = 0 ; i < SharedThreadCount; ++i) {
148- Threads[i].Thread ->GetSharedStats (poolId, statsCopy[i + 1 ]);
202+ Threads[i].Thread ->GetSharedStats (poolId, statsCopy[i]);
149203 }
150204}
151205
152206void TSharedExecutorPool::GetSharedStatsForHarmonizer (i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
153- statsCopy.resize (SharedThreadCount + 1 );
207+ statsCopy.resize (SharedThreadCount);
154208 for (i16 i = 0 ; i < SharedThreadCount; ++i) {
155- Threads[i].Thread ->GetSharedStatsForHarmonizer (poolId, statsCopy[i + 1 ]);
209+ Threads[i].Thread ->GetSharedStatsForHarmonizer (poolId, statsCopy[i]);
156210 }
157211}
158212
@@ -181,4 +235,34 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
181235 return State;
182236}
183237
238+ ISharedExecutorPool *CreateSharedExecutorPool (const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16 > poolsWithThreads) {
239+ return new TSharedExecutorPool (config, poolCount, poolsWithThreads);
240+ }
241+
242+ TString TSharedPoolState::ToString () const {
243+ TStringBuilder builder;
244+ builder << ' {' ;
245+ builder << " ThreadByPool: [" ;
246+ for (ui32 i = 0 ; i < ThreadByPool.size (); ++i) {
247+ builder << ThreadByPool[i] << (i == ThreadByPool.size () - 1 ? " " : " , " );
248+ }
249+ builder << " ], " ;
250+ builder << " PoolByThread: [" ;
251+ for (ui32 i = 0 ; i < PoolByThread.size (); ++i) {
252+ builder << PoolByThread[i] << (i == PoolByThread.size () - 1 ? " " : " , " );
253+ }
254+ builder << " ], " ;
255+ builder << " BorrowedThreadByPool: [" ;
256+ for (ui32 i = 0 ; i < BorrowedThreadByPool.size (); ++i) {
257+ builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size () - 1 ? " " : " , " );
258+ }
259+ builder << " ], " ;
260+ builder << " PoolByBorrowedThread: [" ;
261+ for (ui32 i = 0 ; i < PoolByBorrowedThread.size (); ++i) {
262+ builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size () - 1 ? " " : " , " );
263+ }
264+ builder << ' ]' ;
265+ return builder << ' }' ;
266+ }
267+
184268}
0 commit comments