@@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
541541 Send (Self->SelfId (), CreateChangefeedPropose (Self, txId, item));
542542 }
543543
544+ void CreateConsumers (TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
545+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
546+ auto & item = importInfo->Items .at (itemIdx);
547+ item.SubState = ESubState::Proposed;
548+
549+ LOG_I (" TImport::TTxProgress: CreateConsumers propose"
550+ << " : info# " << importInfo->ToString ()
551+ << " , item# " << item.ToString (itemIdx)
552+ << " , txId# " << txId);
553+
554+ Y_ABORT_UNLESS (item.WaitTxId == InvalidTxId);
555+
556+ Send (Self->SelfId (), CreateConsumersPropose (Self, txId, item));
557+ }
558+
544559 void AllocateTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
545560 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
546561 auto & item = importInfo->Items .at (itemIdx);
@@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
622637 return path->LastTxId ;
623638 }
624639
640+ TTxId GetActiveCreateConsumerTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
641+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
642+ const auto & item = importInfo->Items .at (itemIdx);
643+
644+ Y_ABORT_UNLESS (item.State == EState::CreateChangefeed);
645+ Y_ABORT_UNLESS (item.DstPathId );
646+ Y_ABORT_UNLESS (item.StreamImplPath );
647+
648+ if (!Self->PathsById .contains (item.StreamImplPath )) {
649+ return InvalidTxId;
650+ }
651+
652+ auto path = Self->PathsById .at (item.StreamImplPath );
653+ if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
654+ return InvalidTxId;
655+ }
656+
657+ return path->LastTxId ;
658+ }
659+
625660 static TString MakeIndexBuildUid (TImportInfo::TPtr importInfo, ui32 itemIdx) {
626661 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
627662 const auto & item = importInfo->Items .at (itemIdx);
@@ -1045,7 +1080,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10451080 break ;
10461081
10471082 case EState::CreateChangefeed:
1048- CreateChangefeed (importInfo, i, txId);
1083+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1084+ CreateChangefeed (importInfo, i, txId);
1085+ } else {
1086+ CreateConsumers (importInfo, i, txId);
1087+ }
10491088 itemIdx = i;
10501089 break ;
10511090
@@ -1109,8 +1148,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11091148 } else if (item.State == EState::Transferring) {
11101149 txId = GetActiveRestoreTxId (importInfo, itemIdx);
11111150 } else if (item.State == EState::CreateChangefeed) {
1112- txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
1151+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1152+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
1153+ } else {
1154+ txId = GetActiveCreateConsumerTxId (importInfo, itemIdx);
1155+ }
1156+
1157+ }
1158+ }
1159+
1160+ if (record.GetStatus () == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
1161+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1162+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1163+ AllocateTxId (importInfo, itemIdx);
1164+ } else if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1165+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
1166+ AllocateTxId (importInfo, itemIdx);
1167+ } else {
1168+ item.State = EState::Done;
11131169 }
1170+ return ;
11141171 }
11151172
11161173 if (txId == InvalidTxId) {
@@ -1290,7 +1347,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12901347 break ;
12911348
12921349 case EState::CreateChangefeed:
1293- if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1350+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1351+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1352+ AllocateTxId (importInfo, itemIdx);
1353+ } else if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1354+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
12941355 AllocateTxId (importInfo, itemIdx);
12951356 } else {
12961357 item.State = EState::Done;
0 commit comments