Skip to content

Commit da5cb37

Browse files
authored
YQ-4253 added CPU limit per process in CS (#17804)
1 parent 8703fb9 commit da5cb37

File tree

27 files changed

+424
-97
lines changed

27 files changed

+424
-97
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,20 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK
126126
maxInFlight = ProtoConfig.GetScanLimit();
127127
}
128128
}
129+
130+
// Fills the CPU limits from a "start KQP tasks" request.
//
// Reads the pool CPU share from `config` and rejects it unless it lies in (0, 1].
// On success, CPUGroupThreadsLimit is set to the share multiplied by the max thread
// count of the executor pool the current actor runs in (clamped to at least 1), and
// CPUGroupName is set to the request's scheduler group.
//
// Returns TConclusionStatus::Fail(...) for an out-of-range (or NaN) share; in that
// case no member is modified. Returns TConclusionStatus::Success() otherwise.
//
// NOTE(review): uses TActivationContext, so this presumably has to be called from
// inside an actor handler - confirm against callers.
TConclusionStatus TCPULimits::DeserializeFromProto(const NKikimrKqp::TEvStartKqpTasksRequest& config) {
131+
const auto share = config.GetPoolMaxCpuShare();
132+
// The accepted range is tested positively and then negated so that NaN, for which
// every ordered comparison is false, is rejected as well (assumes the share is a
// floating-point proto field - TODO confirm).
if (!(share > 0 && share <= 1)) {
133+
return TConclusionStatus::Fail("cpu share have to be in (0, 1] interval");
134+
}
135+
NActors::TExecutorPoolStats poolStats;
136+
// threadsStats is required by the GetPoolStats signature; only poolStats is used below.
TVector<NActors::TExecutorThreadStats> threadsStats;
137+
TActivationContext::ActorSystem()->GetPoolStats(TActivationContext::AsActorContext().SelfID.PoolID(), poolStats, threadsStats);
138+
// Clamp the pool size to at least one thread so that the resulting limit is never zero.
CPUGroupThreadsLimit = Max<ui64>(poolStats.MaxThreadCount, 1) * share;
139+
CPUGroupName = config.GetSchedulerGroup();
140+
return TConclusionStatus::Success();
141+
}
142+
129143
}
130144
} // namespace NKikimr
131145

@@ -146,8 +160,10 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
146160

147161
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
148162
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
149-
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
150-
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId));
163+
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode,
164+
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
165+
const TCPULimits& cpuLimits) {
166+
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, shardsScanningPolicy, counters, std::move(traceId), cpuLimits);
151167
}
152168

153169
}

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ class TShardsScanningPolicy {
4444

4545
};
4646

47+
// CPU limit settings that are built from a TEvStartKqpTasksRequest and handed to
// scan fetcher actors (see the cpuLimits parameter of CreateKqpScanFetcher below).
class TCPULimits {
48+
// Optional limit, in threads, for the CPU group. Callers read it through
// GetCPUGroupThreadsLimitOptional(); in the visible code it is only assigned by
// DeserializeFromProto, so it stays unset when no limit was deserialized.
YDB_OPT(double, CPUGroupThreadsLimit);
49+
// Name of the CPU group, read through GetCPUGroupName(); DeserializeFromProto
// takes it from the request's scheduler group.
YDB_READONLY_DEF(TString, CPUGroupName);
50+
public:
51+
// Validates the pool CPU share in `config` and fills both fields from it.
// Returns a failed status when the share is outside the (0, 1] interval.
TConclusionStatus DeserializeFromProto(const NKikimrKqp::TEvStartKqpTasksRequest& config);
52+
};
53+
4754
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
4855
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
4956
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
@@ -62,7 +69,8 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::N
6269
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
6370
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
6471
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode,
65-
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
72+
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
73+
const TCPULimits& cpuLimits);
6674

6775
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
6876
TIntrusivePtr<TKqpCounters> counters,

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;
2626
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings,
2727
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
2828
const TMaybe<NKikimrDataEvents::ELockMode> lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
29-
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
29+
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
30+
const TCPULimits& cpuLimits)
3031
: Meta(meta)
3132
, ScanDataMeta(Meta)
3233
, RuntimeSettings(settings)
3334
, TxId(txId)
3435
, LockTxId(lockTxId)
3536
, LockNodeId(lockNodeId)
3637
, LockMode(lockMode)
38+
, CPULimits(cpuLimits)
3739
, ComputeActorIds(std::move(computeActors))
3840
, Snapshot(snapshot)
3941
, ShardsScanningPolicy(shardsScanningPolicy)
@@ -498,6 +500,11 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
498500
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
499501
}
500502

503+
if (const auto cpuGroupThreadsLimit = CPULimits.GetCPUGroupThreadsLimitOptional()) {
504+
ev->Record.SetCpuGroupThreadsLimit(*cpuGroupThreadsLimit);
505+
ev->Record.SetCpuGroupName(CPULimits.GetCPUGroupName());
506+
}
507+
501508
ev->Record.SetDataFormat(Meta.GetDataFormat());
502509
return ev;
503510
}

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
5353
const TMaybe<ui64> LockTxId;
5454
const ui32 LockNodeId;
5555
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
56+
const TCPULimits CPULimits;
5657

5758
public:
5859
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -63,7 +64,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
6364
std::vector<NActors::TActorId>&& computeActors,
6465
const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId, const TMaybe<NKikimrDataEvents::ELockMode> lockMode,
6566
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
66-
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
67+
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
68+
const TCPULimits& cpuLimits);
6769

6870
static TVector<TSerializedTableRange> BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData);
6971

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,11 +314,17 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
314314
ActorIdToProto(taskCtx.ComputeActorId, startedTask->MutableActorId());
315315
}
316316

317+
TCPULimits cpuLimits;
318+
if (msg.GetPoolMaxCpuShare() > 0) {
319+
// Share <= 0 means disabled limit
320+
cpuLimits.DeserializeFromProto(msg).Validate();
321+
}
322+
317323
for (auto&& i : computesByStage) {
318324
for (auto&& m : i.second.MutableMetaInfo()) {
319325
Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()),
320326
m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, lockMode,
321-
scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)));
327+
scanPolicy, Counters, NWilson::TTraceId(ev->TraceId), cpuLimits));
322328
}
323329
}
324330

ydb/core/protos/tx_datashard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1762,6 +1762,8 @@ message TEvKqpScan {
17621762
optional string CSScanPolicy = 26;
17631763
optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27;
17641764
optional NKikimrDataEvents.ELockMode LockMode = 28;
1765+
optional double CpuGroupThreadsLimit = 29;
1766+
optional string CpuGroupName = 30;
17651767
}
17661768

17671769
message TEvCompactTable {

ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
1313
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
1414
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
1515
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
16-
const ui64 scanId)
16+
const ui64 scanId, const NConveyor::TCPULimitsConfig& cpuLimits)
1717
: StoragesManager(storagesManager)
1818
, DataAccessorsManager(dataAccessorsManager)
1919
, Counters(counters)
@@ -24,7 +24,7 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
2424
, ResourceSubscribeActorId(resourceSubscribeActorId)
2525
, ReadCoordinatorActorId(readCoordinatorActorId)
2626
, ComputeShardingPolicy(computeShardingPolicy)
27-
, ConveyorProcessGuard(NConveyor::TScanServiceOperator::StartProcess(ScanId)) {
27+
, ConveyorProcessGuard(NConveyor::TScanServiceOperator::StartProcess(ScanId, cpuLimits)) {
2828
Y_ABORT_UNLESS(ReadMetadata);
2929
if (ReadMetadata->HasResultSchema()) {
3030
Resolver = std::make_shared<NCommon::TIndexColumnResolver>(ReadMetadata->GetResultSchema()->GetIndexInfo());

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/tx/columnshard/data_accessor/manager.h>
88
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
99
#include <ydb/core/tx/conveyor/usage/abstract.h>
10+
#include <ydb/core/tx/conveyor/usage/config.h>
1011

1112
#include <ydb/library/accessor/accessor.h>
1213

@@ -148,7 +149,7 @@ class TReadContext {
148149
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
149150
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
150151
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
151-
const ui64 scanId);
152+
const ui64 scanId, const NConveyor::TCPULimitsConfig& cpuLimits);
152153
};
153154

154155
class IDataReader {

ydb/core/tx/columnshard/engines/reader/abstract/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ PEERDIR(
1313
ydb/core/tx/program
1414
ydb/core/protos
1515
ydb/core/tx/columnshard/data_sharing/protos
16+
ydb/core/tx/conveyor/usage
1617
)
1718

1819
GENERATE_ENUM_SERIALIZATION(read_metadata.h)

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
2121
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
2222
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId,
2323
TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
24-
const NColumnShard::TScanCounters& scanCountersPool)
24+
const NColumnShard::TScanCounters& scanCountersPool, const NConveyor::TCPULimitsConfig& cpuLimits)
2525
: StoragesManager(storagesManager)
2626
, DataAccessorsManager(dataAccessorsManager)
2727
, ColumnShardActorId(columnShardActorId)
@@ -33,6 +33,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
3333
, RequestCookie(requestCookie)
3434
, DataFormat(dataFormat)
3535
, TabletId(tabletId)
36+
, CPULimits(cpuLimits)
3637
, ReadMetadataRange(readMetadataRange)
3738
, Timeout(timeout ? timeout : COMPUTE_HARD_TIMEOUT)
3839
, ScanCountersPool(scanCountersPool, TValidator::CheckNotNull(ReadMetadataRange)->GetProgram().GetGraphOptional())
@@ -53,7 +54,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
5354
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
5455

5556
std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, DataAccessorsManager, ScanCountersPool,
56-
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId);
57+
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits);
5758
ScanIterator = ReadMetadataRange->StartScan(context);
5859
auto startResult = ScanIterator->Start();
5960
StartInstant = TMonotonic::Now();

0 commit comments

Comments
 (0)