@@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
526526 return true ;
527527 }
528528
529+ void CreateChangefeed (TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
530+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
531+ auto & item = importInfo->Items .at (itemIdx);
532+ item.SubState = ESubState::Proposed;
533+
534+ LOG_I (" TImport::TTxProgress: CreateChangefeed propose"
535+ << " : info# " << importInfo->ToString ()
536+ << " , item# " << item.ToString (itemIdx)
537+ << " , txId# " << txId);
538+
539+ Y_ABORT_UNLESS (item.WaitTxId == InvalidTxId);
540+
541+ Send (Self->SelfId (), CreateChangefeedPropose (Self, txId, item));
542+ }
543+
529544 void AllocateTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
530545 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
531546 auto & item = importInfo->Items .at (itemIdx);
@@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
588603 return TTxId (ui64 ((*infoPtr)->Id ));
589604 }
590605
606+ TTxId GetActiveCreateChangefeedTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
607+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
608+ const auto & item = importInfo->Items .at (itemIdx);
609+
610+ Y_ABORT_UNLESS (item.State == EState::CreateChangefeed);
611+ Y_ABORT_UNLESS (item.DstPathId );
612+
613+ if (!Self->PathsById .contains (item.DstPathId )) {
614+ return InvalidTxId;
615+ }
616+
617+ auto path = Self->PathsById .at (item.DstPathId );
618+ if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
619+ return InvalidTxId;
620+ }
621+
622+ return path->LastTxId ;
623+ }
624+
591625 static TString MakeIndexBuildUid (TImportInfo::TPtr importInfo, ui32 itemIdx) {
592626 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
593627 const auto & item = importInfo->Items .at (itemIdx);
@@ -756,17 +790,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
756790 case EState::CreateSchemeObject:
757791 case EState::Transferring:
758792 case EState::BuildIndexes:
793+ case EState::CreateChangefeed:
759794 if (item.WaitTxId == InvalidTxId) {
760795 if (!IsCreatedByQuery (item) || item.PreparedCreationQuery ) {
796+ Cerr << " AllocateTxIdCreateChfg: " << IsCreatedByQuery (item) << " " << item.PreparedCreationQuery << Endl;
761797 AllocateTxId (importInfo, itemIdx);
762798 } else {
799+ Cerr << " NoAllocateTxIdCreateChfg SchemeQueryExecutor: " << IsCreatedByQuery (item) << " " << item.PreparedCreationQuery << Endl;
763800 const auto database = GetDatabase (*Self);
764801 item.SchemeQueryExecutor = ctx.Register (CreateSchemeQueryExecutor (
765802 Self->SelfId (), importInfo->Id , itemIdx, item.CreationQuery , database
766803 ));
767804 Self->RunningImportSchemeQueryExecutors .emplace (item.SchemeQueryExecutor );
768805 }
769806 } else {
807+ Cerr << " SubscribeTxCreateChfg: " << IsCreatedByQuery (item) << " " << item.PreparedCreationQuery << Endl;
770808 SubscribeTx (importInfo, itemIdx);
771809 }
772810 break ;
@@ -781,6 +819,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
781819 TTxId txId = InvalidTxId;
782820
783821 switch (item.State ) {
822+ case EState::CreateChangefeed:
823+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
824+ break ;
825+
784826 case EState::Transferring:
785827 if (!CancelTransferring (importInfo, itemIdx)) {
786828 txId = GetActiveRestoreTxId (importInfo, itemIdx);
@@ -1004,6 +1046,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10041046 BuildIndex (importInfo, i, txId);
10051047 itemIdx = i;
10061048 break ;
1049+
1050+ case EState::CreateChangefeed:
1051+ CreateChangefeed (importInfo, i, txId);
1052+ itemIdx = i;
1053+ break ;
10071054
10081055 default :
10091056 break ;
@@ -1064,6 +1111,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10641111 txId = TTxId (record.GetPathCreateTxId ());
10651112 } else if (item.State == EState::Transferring) {
10661113 txId = GetActiveRestoreTxId (importInfo, itemIdx);
1114+ } else if (item.State == EState::CreateChangefeed) {
1115+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
10671116 }
10681117 }
10691118
@@ -1216,6 +1265,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12161265 if (item.NextIndexIdx < item.Scheme .indexes_size ()) {
12171266 item.State = EState::BuildIndexes;
12181267 AllocateTxId (importInfo, itemIdx);
1268+ } else if (item.NextChangefeedIdx < item.Changefeeds .changefeeds_size () &&
1269+ AppData ()->FeatureFlags .GetEnableChangefeedsImport ()) {
1270+ item.State = EState::CreateChangefeed;
1271+ AllocateTxId (importInfo, itemIdx);
12191272 } else {
12201273 item.State = EState::Done;
12211274 }
@@ -1229,11 +1282,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12291282 } else {
12301283 if (++item.NextIndexIdx < item.Scheme .indexes_size ()) {
12311284 AllocateTxId (importInfo, itemIdx);
1285+ } else if (item.NextChangefeedIdx < item.Changefeeds .changefeeds_size () &&
1286+ AppData ()->FeatureFlags .GetEnableChangefeedsImport ()) {
1287+ item.State = EState::CreateChangefeed;
1288+ AllocateTxId (importInfo, itemIdx);
12321289 } else {
12331290 item.State = EState::Done;
12341291 }
12351292 }
12361293 break ;
1294+
1295+ case EState::CreateChangefeed:
1296+ if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1297+ AllocateTxId (importInfo, itemIdx);
1298+ } else {
1299+ item.State = EState::Done;
1300+ }
1301+ break ;
12371302
12381303 default :
12391304 return SendNotificationsIfFinished (importInfo);
0 commit comments