Skip to content
72 changes: 65 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
}

void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
item.SubState = ESubState::Proposed;

LOG_I("TImport::TTxProgress: CreateConsumers propose"
<< ": info# " << importInfo->ToString()
<< ", item# " << item.ToString(itemIdx)
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);

Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item));
}

void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return path->LastTxId;
}

TTxId GetActiveCreateConsumerTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);

Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers);
Y_ABORT_UNLESS(item.StreamImplPathId);

if (!Self->PathsById.contains(item.StreamImplPathId)) {
return InvalidTxId;
}

auto path = Self->PathsById.at(item.StreamImplPathId);
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
return InvalidTxId;
}

return path->LastTxId;
}

static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -816,10 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TTxId txId = InvalidTxId;

switch (item.State) {
case EState::CreateChangefeed:
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
break;

case EState::Transferring:
if (!CancelTransferring(importInfo, itemIdx)) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
Expand Down Expand Up @@ -1045,7 +1076,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CreateChangefeed:
CreateChangefeed(importInfo, i, txId);
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
CreateChangefeed(importInfo, i, txId);
} else {
CreateConsumers(importInfo, i, txId);
}
itemIdx = i;
break;

Expand Down Expand Up @@ -1109,11 +1144,30 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else if (item.State == EState::Transferring) {
txId = GetActiveRestoreTxId(importInfo, itemIdx);
} else if (item.State == EState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
} else {
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
}

}
}

if (txId == InvalidTxId) {

if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хорошо было бы на эту логику тест добавить

if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
}
return;
}

return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose");
}

Expand Down Expand Up @@ -1290,7 +1344,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CreateChangefeed:
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
AllocateTxId(importInfo, itemIdx);
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
AllocateTxId(importInfo, itemIdx);
} else {
item.State = EState::Done;
Expand Down
53 changes: 53 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,58 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
return propose;
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
TImportInfo::TItem& item
) {
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());

const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
const auto& topic = importChangefeedTopic.GetTopic();

auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;
auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();

const TPath dstPath = TPath::Init(item.DstPathId, ss);
const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name();
modifyScheme.SetWorkingDir(changefeedPath);
modifyScheme.SetInternal(true);

pqGroup.SetName("streamImpl");

NKikimrSchemeOp::TDescribeOptions opts;
opts.SetReturnPartitioningInfo(false);
opts.SetReturnPartitionConfig(true);
opts.SetReturnBoundaries(true);
opts.SetReturnIndexTableBoundaries(true);
opts.SetShowPrivateTable(true);
auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl", opts);

const auto& response = describeSchemeResult->GetRecord().GetPathDescription();
item.StreamImplPathId = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()};
pqGroup.CopyFrom(response.GetPersQueueGroup());

pqGroup.ClearTotalGroupCount();
pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema();

auto* tabletConfig = pqGroup.MutablePQTabletConfig();
const auto& pqConfig = AppData()->PQConfig;

for (const auto& consumer : topic.consumers()) {
auto& addedConsumer = *tabletConfig->AddConsumers();
auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig);
addedConsumer.SetName(consumerName);
if (consumer.important()) {
addedConsumer.SetImportant(true);
}
}

return propose;
}

} // NSchemeShard
} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
const TImportInfo::TItem& item
);

THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
TSchemeShard* ss,
TTxId txId,
TImportInfo::TItem& item
);

} // NSchemeShard
} // NKikimr
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
Subscribed,
};

enum class EChangefeedState: ui8 {
CreateChangefeed = 0,
CreateConsumers,
};

TString DstPathName;
TPathId DstPathId;
Ydb::Table::CreateTableRequest Scheme;
Expand All @@ -2857,13 +2862,15 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {

EState State = EState::GetScheme;
ESubState SubState = ESubState::AllocateTxId;
EChangefeedState ChangefeedState = EChangefeedState::CreateChangefeed;
TTxId WaitTxId = InvalidTxId;
TActorId SchemeGetter;
TActorId SchemeQueryExecutor;
int NextIndexIdx = 0;
int NextChangefeedIdx = 0;
TString Issue;
int ViewCreationRetries = 0;
TPathId StreamImplPathId;

TItem() = default;

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,29 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
return DescribePath(self, ctx, pathId, options);
}

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path,
const NKikimrSchemeOp::TDescribeOptions& opts
) {
NKikimrSchemeOp::TDescribePath params;
params.SetPath(path);
params.MutableOptions()->CopyFrom(opts);

return TPathDescriber(self, std::move(params)).Describe(ctx);
}

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path
) {
NKikimrSchemeOp::TDescribeOptions options;
options.SetShowPrivateTable(true);
return DescribePath(self, ctx, path, options);
}

void TSchemeShard::DescribeTable(
const TTableInfo& tableInfo,
const NScheme::TTypeRegistry* typeRegistry,
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,18 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TPathId pathId
);

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path,
const NKikimrSchemeOp::TDescribeOptions& opts
);

THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
TSchemeShard* self,
const TActorContext& ctx,
const TString& path
);

} // NSchemeShard
} // NKikimr
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,19 @@ TCheckFunc RetentionPeriod(const TDuration& value) {
};
}

TCheckFunc ConsumerExist(const TString& name) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
bool isExist = false;
for (const auto& consumer : record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetConsumers()) {
if (consumer.GetName() == name) {
isExist = true;
break;
}
}
UNIT_ASSERT(isExist);
};
}

void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
ChildrenCount(0)(record);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ namespace NLs {
TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
TCheckFunc RetentionPeriod(const TDuration& value);
TCheckFunc ConsumerExist(const TString& name);

TCheckFunc HasBackupInFly(ui64 txId);
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5067,7 +5067,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
NLs::PathExist
NLs::PathExist,
});
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
NLs::ConsumerExist("my_consumer")
});
}
};
Expand Down
Loading