Skip to content

Commit e2d9db7

Browse files
authored
YQ-2892 support emulate YT in KqpRun (#2562)
1 parent 9d0310a commit e2d9db7

33 files changed

+154
-52
lines changed

ydb/core/fq/libs/common/cache_ut.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
#include <ydb/services/ydb/ydb_common_ut.h>
44

5-
using namespace NFq;
5+
namespace NFq {
66

77
using TCache = TTtlCache<int,int>;
88

@@ -66,3 +66,5 @@ Y_UNIT_TEST_SUITE(Cache) {
6666
UNIT_ASSERT_VALUES_EQUAL(cache.Size(), 0);
6767
}
6868
}
69+
70+
} // namespace NFq

ydb/core/grpc_services/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ PEERDIR(
106106
ydb/core/io_formats/ydb_dump
107107
ydb/core/kesus/tablet
108108
ydb/core/kqp/common
109+
ydb/core/kqp/federated_query
109110
ydb/core/protos
110111
ydb/core/scheme
111112
ydb/core/sys_view

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
1313
#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
1414
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
15-
#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
1615
#include <ydb/core/protos/ssa.pb.h>
1716
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
1817

@@ -24,22 +23,20 @@ using TCallableActorBuilderFunc = std::function<
2423
IComputationNode*(
2524
TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpScanComputeContext& computeCtx)>;
2625

27-
TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* computeCtx) {
26+
TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* computeCtx, const std::optional<NKqp::TKqpFederatedQuerySetup>& federatedQuerySetup) {
2827
MKQL_ENSURE_S(computeCtx);
2928

30-
auto computeFactory = GetKqpBaseComputeFactory(computeCtx);
31-
auto ytComputeFactory = NYql::GetDqYtFactory();
29+
auto computeFactory = NKqp::MakeKqpFederatedQueryComputeFactory(
30+
GetKqpBaseComputeFactory(computeCtx),
31+
federatedQuerySetup
32+
);
3233

33-
return [computeFactory, ytComputeFactory, computeCtx]
34+
return [computeFactory, computeCtx]
3435
(TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
3536
if (auto compute = computeFactory(callable, ctx)) {
3637
return compute;
3738
}
3839

39-
if (auto ytCompute = ytComputeFactory(callable, ctx)) {
40-
return ytCompute;
41-
}
42-
4340
auto name = callable.GetType()->GetName();
4441

4542
if (name == "KqpWideReadTable"sv) {

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace NMiniKQL {
1313

1414
class TKqpScanComputeContext;
1515

16-
TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* computeCtx);
16+
TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* computeCtx, const std::optional<NKqp::TKqpFederatedQuerySetup>& federatedQuerySetup);
1717

1818
} // namespace NMiniKQL
1919

@@ -47,7 +47,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
4747
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
4848
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
4949
NWilson::TTraceId traceId,
50-
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
50+
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
51+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
5152

5253
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
5354
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1414
IDqAsyncIoFactory::TPtr asyncIoFactory,
1515
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
1616
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
17-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena)
17+
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
18+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
1819
: TBase(executerId, txId, task, std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
1920
, ComputeCtx(settings.StatsMode)
21+
, FederatedQuerySetup(federatedQuerySetup)
2022
{
2123
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
2224
Meta.ConstructInPlace();
@@ -44,7 +46,7 @@ void TKqpComputeActor::DoBootstrap() {
4446
execCtx.RandomProvider = TAppData::RandomProvider.Get();
4547
execCtx.TimeProvider = TAppData::TimeProvider.Get();
4648
execCtx.ComputeCtx = &ComputeCtx;
47-
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx);
49+
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx, FederatedQuerySetup);
4850
execCtx.ApplyCtx = nullptr;
4951
execCtx.TypeEnv = nullptr;
5052
execCtx.PatternCache = GetKqpResourceManager()->GetPatternCache();
@@ -278,10 +280,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
278280
IDqAsyncIoFactory::TPtr asyncIoFactory,
279281
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
280282
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
281-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena)
283+
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
284+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
282285
{
283286
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
284-
functionRegistry, settings, memoryLimits, std::move(traceId), std::move(arena));
287+
functionRegistry, settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup);
285288
}
286289

287290
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
2929
IDqAsyncIoFactory::TPtr asyncIoFactory,
3030
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
3131
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
32-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena);
32+
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
33+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
3334

3435
void DoBootstrap();
3536

@@ -61,13 +62,15 @@ class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
6162
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
6263
TActorId SysViewActorId;
6364
const TDqTaskRunnerParameterProvider ParameterProvider;
65+
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
6466
};
6567

6668
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
6769
IDqAsyncIoFactory::TPtr asyncIoFactory,
6870
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
6971
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
70-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena);
72+
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
73+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
7174

7275
} // namespace NKqp
7376
} // namespace NKikimr

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void TKqpScanComputeActor::DoBootstrap() {
181181
NDq::TDqTaskRunnerContext execCtx;
182182
execCtx.FuncRegistry = AppData()->FunctionRegistry;
183183
execCtx.ComputeCtx = &ComputeCtx;
184-
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx);
184+
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx, std::nullopt);
185185
execCtx.RandomProvider = TAppData::RandomProvider.Get();
186186
execCtx.TimeProvider = TAppData::TimeProvider.Get();
187187
execCtx.ApplyCtx = nullptr;

ydb/core/kqp/compute_actor/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ PEERDIR(
2424
ydb/library/yql/dq/actors/compute
2525
ydb/library/yql/providers/generic/actors
2626
ydb/library/yql/providers/s3/actors
27-
ydb/library/yql/providers/yt/comp_nodes/dq
2827
ydb/library/yql/public/issue
2928
)
3029

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
128128
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
129129
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
130130
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
131-
const bool enableOlapSink, ui32 statementResultIndex)
131+
const bool enableOlapSink, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
132132
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
133133
maximalSecretsSnapshotWaitTime, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
134134
)
135135
, AsyncIoFactory(std::move(asyncIoFactory))
136136
, EnableOlapSink(enableOlapSink)
137+
, FederatedQuerySetup(federatedQuerySetup)
137138
{
138139
Target = creator;
139140

@@ -2206,7 +2207,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22062207

22072208
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
22082209
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
2209-
ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, useDataQueryPool, localComputeTasks, Request.MkqlMemoryLimit, AsyncIoFactory, singlePartitionOptAllowed, GetUserRequestContext());
2210+
ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, useDataQueryPool, localComputeTasks, Request.MkqlMemoryLimit, AsyncIoFactory, singlePartitionOptAllowed, GetUserRequestContext(), FederatedQuerySetup);
22102211

22112212
auto err = Planner->PlanExecution();
22122213
if (err) {
@@ -2404,6 +2405,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24042405
private:
24052406
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
24062407
bool EnableOlapSink = false;
2408+
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
24072409

24082410
bool HasExternalSources = false;
24092411
bool SecretSnapshotRequired = false;
@@ -2445,10 +2447,11 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
24452447
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
24462448
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
24472449
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
2448-
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink, ui32 statementResultIndex)
2450+
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink, ui32 statementResultIndex,
2451+
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
24492452
{
24502453
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
2451-
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex);
2454+
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink, statementResultIndex, federatedQuerySetup);
24522455
}
24532456

24542457
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/kqp/query_data/kqp_query_data.h>
77
#include <ydb/core/kqp/gateway/kqp_gateway.h>
88
#include <ydb/core/kqp/counters/kqp_counters.h>
9+
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
910
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
1011
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
1112
#include <ydb/core/protos/table_service_config.pb.h>
@@ -93,7 +94,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
9394
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
9495
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
9596
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
96-
const bool enableOlapSink, ui32 statementResultIndex);
97+
const bool enableOlapSink, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
9798

9899
IActor* CreateKqpSchemeExecuter(
99100
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,

0 commit comments

Comments
 (0)