Skip to content

Commit 6d8d374

Browse files
Merge 61db074 into 6df1995
2 parents 6df1995 + 61db074 commit 6d8d374

File tree

6 files changed

+59
-1
lines changed

6 files changed

+59
-1
lines changed

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
541541
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
542542
}
543543

544+
void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
545+
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
546+
auto& item = importInfo->Items.at(itemIdx);
547+
item.SubState = ESubState::Proposed;
548+
549+
LOG_I("TImport::TTxProgress: CreateConsumers propose"
550+
<< ": info# " << importInfo->ToString()
551+
<< ", item# " << item.ToString(itemIdx)
552+
<< ", txId# " << txId);
553+
554+
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
555+
556+
Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item));
557+
}
558+
544559
void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
545560
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
546561
auto& item = importInfo->Items.at(itemIdx);
@@ -1046,6 +1061,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10461061

10471062
case EState::CreateChangefeed:
10481063
CreateChangefeed(importInfo, i, txId);
1064+
CreateConsumers(importInfo, i, txId);
10491065
itemIdx = i;
10501066
break;
10511067

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,5 +291,33 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
291291
return propose;
292292
}
293293

294+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
295+
TSchemeShard* ss,
296+
TTxId txId,
297+
const TImportInfo::TItem& item
298+
) {
299+
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());
300+
301+
const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
302+
const auto& topic = importChangefeedTopic.GetTopic();
303+
304+
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
305+
auto& record = propose->Record;
306+
auto& modifyScheme = *record.AddTransaction();
307+
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
308+
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();
309+
310+
const TPath dstPath = TPath::Init(item.DstPathId, ss);
311+
modifyScheme.SetWorkingDir(dstPath.PathString());
312+
pqGroup.SetName(importChangefeedTopic.GetChangefeed().name());
313+
314+
for (const auto& consumer : topic.consumers()) {
315+
auto pqConsumer = *pqGroup.MutablePQTabletConfig()->AddConsumers();
316+
*pqConsumer.MutableName() = consumer.name();
317+
}
318+
319+
return propose;
320+
}
321+
294322
} // NSchemeShard
295323
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
5252
const TImportInfo::TItem& item
5353
);
5454

55+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
56+
TSchemeShard* ss,
57+
TTxId txId,
58+
const TImportInfo::TItem& item
59+
);
60+
5561
} // NSchemeShard
5662
} // NKikimr

ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,12 @@ TCheckFunc RetentionPeriod(const TDuration& value) {
968968
};
969969
}
970970

971+
TCheckFunc ConsumerExist(const TString& name) {
972+
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
973+
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetConsumers(0).GetName(), name);
974+
};
975+
}
976+
971977
void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
972978
ChildrenCount(0)(record);
973979
}

ydb/core/tx/schemeshard/ut_helpers/ls_checks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ namespace NLs {
169169
TCheckFunc StreamAwsRegion(const TString& value);
170170
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
171171
TCheckFunc RetentionPeriod(const TDuration& value);
172+
TCheckFunc ConsumerExist(const TString& name);
172173

173174
TCheckFunc HasBackupInFly(ui64 txId);
174175
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4897,7 +4897,8 @@ Y_UNIT_TEST_SUITE(TImportTests) {
48974897
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
48984898
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
48994899
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
4900-
NLs::PathExist
4900+
NLs::PathExist,
4901+
NLs::ConsumerExist("my_consumer")
49014902
});
49024903
}
49034904
};

0 commit comments

Comments
 (0)