@@ -1150,25 +1150,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11501150 } else if (item.State == EState::CreateChangefeed) {
11511151 if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
11521152 txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
1153- if (record.GetStatus () == NKikimrScheme::StatusAlreadyExists) {
1154- item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1155- AllocateTxId (importInfo, itemIdx);
1156- // return;
1157- }
11581153 } else {
11591154 txId = GetActiveCreateConsumerTxId (importInfo, itemIdx);
1160- if (record.GetStatus () == NKikimrScheme::StatusAlreadyExists) {
1161- if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1162- item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
1163- AllocateTxId (importInfo, itemIdx);
1164- }
1165- // return;
1166- }
11671155 }
11681156
11691157 }
11701158 }
11711159
1160+ if (record.GetStatus () == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
1161+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1162+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1163+ AllocateTxId (importInfo, itemIdx);
1164+ } else if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1165+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
1166+ AllocateTxId (importInfo, itemIdx);
1167+ } else {
1168+ item.State = EState::Done;
1169+ }
1170+ return ;
1171+ }
1172+
11721173 if (txId == InvalidTxId) {
11731174 return CancelAndPersist (db, importInfo, itemIdx, record.GetReason (), " unhappy propose" );
11741175 }
0 commit comments