@@ -43,6 +43,14 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
4343 return TStringBuilder () << settings.items (itemIdx).source_prefix () << " /permissions.pb" ;
4444 }
4545
46+ static TString ChangefeedDescriptionKey (const TString& changefeedPrefix) {
47+ return TStringBuilder () << changefeedPrefix << " /changefeed_description.pb" ;
48+ }
49+
50+ static TString TopicDescriptionKey (const TString& changefeedPrefix) {
51+ return TStringBuilder () << changefeedPrefix << " /topic_description.pb" ;
52+ }
53+
4654 static bool IsView (TStringBuf schemeKey) {
4755 return schemeKey.EndsWith (NYdb::NDump::NFiles::CreateView ().FileName );
4856 }
@@ -51,6 +59,41 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5159 return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
5260 }
5361
62+ void ListObjects (const TString& prefix) {
63+ auto request = Model::ListObjectsRequest ()
64+ .WithPrefix (prefix);
65+
66+ Send (Client, new TEvExternalStorage::TEvListObjectsRequest (request));
67+ }
68+
69+ void HandleChangefeeds (TEvExternalStorage::TEvListObjectsResponse::TPtr& ev) {
70+ const auto & result = ev.Get ()->Get ()->Result ;
71+
72+ LOG_D (" HandleChangefeeds TEvExternalStorage::TEvListObjectResponse"
73+ << " : self# " << SelfId ()
74+ << " , result# " << result);
75+
76+ if (!CheckResult (result, " ListObject" )) {
77+ return ;
78+ }
79+
80+ const auto & objects = result.GetResult ().GetContents ();
81+ ChangefeedsKeys.reserve (objects.size ());
82+
83+ for (const auto & obj : objects) {
84+ const TFsPath& path = obj.GetKey ();
85+ if (path.GetName () == " changefeed_description.pb" ) {
86+ ChangefeedsKeys.push_back (path.Dirname ());
87+ }
88+ }
89+
90+ if (!ChangefeedsKeys.empty ()) {
91+ HeadObject (ChangefeedDescriptionKey (ChangefeedsKeys[0 ]));
92+ } else {
93+ Reply ();
94+ }
95+ }
96+
5497 void HeadObject (const TString& key) {
5598 auto request = Model::HeadObjectRequest ()
5699 .WithKey (key);
@@ -128,6 +171,36 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
128171 GetObject (ChecksumKey, std::make_pair (0 , contentLength - 1 ));
129172 }
130173
174+ void HandleChangefeed (TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
175+ const auto & result = ev->Get ()->Result ;
176+
177+ LOG_D (" HandleChangefeeds TEvExternalStorage::TEvHeadObjectResponse"
178+ << " : self# " << SelfId ()
179+ << " , result# " << result);
180+
181+ if (!CheckResult (result, " HeadObject" )) {
182+ return ;
183+ }
184+
185+ const auto contentLength = result.GetResult ().GetContentLength ();
186+ GetObject (ChangefeedDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair (0 , contentLength - 1 ));
187+ }
188+
189+ void HandleTopic (TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
190+ const auto & result = ev->Get ()->Result ;
191+
192+ LOG_D (" HandleTopic TEvExternalStorage::TEvHeadObjectResponse"
193+ << " : self# " << SelfId ()
194+ << " , result# " << result);
195+
196+ if (!CheckResult (result, " HeadObject" )) {
197+ return ;
198+ }
199+
200+ const auto contentLength = result.GetResult ().GetContentLength ();
201+ GetObject (TopicDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair (0 , contentLength - 1 ));
202+ }
203+
131204 void GetObject (const TString& key, const std::pair<ui64, ui64>& range) {
132205 auto request = Model::GetObjectRequest ()
133206 .WithKey (key)
@@ -205,7 +278,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
205278 if (NeedDownloadPermissions) {
206279 StartDownloadingPermissions ();
207280 } else {
208- Reply ();
281+ StartDownloadingChangefeeds ();
209282 }
210283 };
211284
@@ -242,7 +315,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
242315 item.Permissions = std::move (permissions);
243316
244317 auto nextStep = [this ]() {
245- Reply ();
318+ StartDownloadingChangefeeds ();
246319 };
247320
248321 if (NeedValidateChecksums) {
@@ -274,6 +347,82 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
274347 ChecksumValidatedCallback ();
275348 }
276349
350+ void HandleChangefeed (TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
351+ const auto & msg = *ev->Get ();
352+ const auto & result = msg.Result ;
353+
354+ LOG_D (" HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
355+ << " : self# " << SelfId ()
356+ << " , result# " << result);
357+
358+ if (!CheckResult (result, " GetObject" )) {
359+ return ;
360+ }
361+
362+ Y_ABORT_UNLESS (ItemIdx < ImportInfo->Items .size ());
363+ auto & item = ImportInfo->Items .at (ItemIdx);
364+
365+ LOG_T (" Trying to parse changefeed"
366+ << " : self# " << SelfId ()
367+ << " , body# " << SubstGlobalCopy (msg.Body , " \n " , " \\ n" ));
368+
369+ Ydb::Table::ChangefeedDescription changefeed;
370+ if (!google::protobuf::TextFormat::ParseFromString (msg.Body , &changefeed)) {
371+ return Reply (false , " Cannot parse permissions" );
372+ }
373+ item.Changefeeds [IndexDownloadedChangefeed].ChangefeedDescription = std::move (changefeed);
374+
375+ auto nextStep = [this ]() {
376+ HeadObject (TopicDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]));
377+ };
378+
379+ if (NeedValidateChecksums) {
380+ StartValidatingChecksum (ChangefeedDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body , nextStep);
381+ } else {
382+ nextStep ();
383+ }
384+ }
385+
386+ void HandleTopic (TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
387+ const auto & msg = *ev->Get ();
388+ const auto & result = msg.Result ;
389+
390+ LOG_D (" HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
391+ << " : self# " << SelfId ()
392+ << " , result# " << result);
393+
394+ if (!CheckResult (result, " GetObject" )) {
395+ return ;
396+ }
397+
398+ Y_ABORT_UNLESS (ItemIdx < ImportInfo->Items .size ());
399+ auto & item = ImportInfo->Items .at (ItemIdx);
400+
401+ LOG_T (" Trying to parse changefeed"
402+ << " : self# " << SelfId ()
403+ << " , body# " << SubstGlobalCopy (msg.Body , " \n " , " \\ n" ));
404+
405+ Ydb::Topic::DescribeTopicResult topic;
406+ if (!google::protobuf::TextFormat::ParseFromString (msg.Body , &topic)) {
407+ return Reply (false , " Cannot parse permissions" );
408+ }
409+ item.Changefeeds [IndexDownloadedChangefeed].Topic = std::move (topic);
410+
411+ auto nextStep = [this ]() {
412+ if (++IndexDownloadedChangefeed == ChangefeedsKeys.size ()) {
413+ Reply ();
414+ } else {
415+ HeadObject (ChangefeedDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]));
416+ }
417+ };
418+
419+ if (NeedValidateChecksums) {
420+ StartValidatingChecksum (TopicDescriptionKey (ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body , nextStep);
421+ } else {
422+ nextStep ();
423+ }
424+ }
425+
277426 template <typename TResult>
278427 bool CheckResult (const TResult& result, const TStringBuf marker) {
279428 if (result.IsSuccess ()) {
@@ -312,12 +461,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
312461 TActor::PassAway ();
313462 }
314463
315- void Download ( const TString& key ) {
464+ void DownloadCommon ( ) {
316465 if (Client) {
317466 Send (Client, new TEvents::TEvPoisonPill ());
318467 }
319468 Client = RegisterWithSameMailbox (CreateS3Wrapper (ExternalStorageConfig->ConstructStorageOperator ()));
469+ }
470+
471+ void DownloadWithoutKey () {
472+ DownloadCommon ();
473+ ListObjects (ImportInfo->Settings .items (ItemIdx).source_prefix ());
474+ }
320475
476+ void Download (const TString& key) {
477+ DownloadCommon ();
321478 HeadObject (key);
322479 }
323480
@@ -337,6 +494,10 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
337494 Download (ChecksumKey);
338495 }
339496
497+ void DownloadChangefeeds () {
498+ DownloadWithoutKey ();
499+ }
500+
340501 void ResetRetries () {
341502 Attempt = 0 ;
342503 }
@@ -353,6 +514,12 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
353514 Become (&TThis::StateDownloadPermissions);
354515 }
355516
517+ void StartDownloadingChangefeeds () {
518+ ResetRetries ();
519+ DownloadChangefeeds ();
520+ Become (&TThis::StateDownloadChangefeeds);
521+ }
522+
356523 void StartValidatingChecksum (const TString& key, const TString& object, std::function<void ()> checksumValidatedCallback) {
357524 ChecksumKey = NBackup::ChecksumKey (key);
358525 Checksum = NBackup::ComputeChecksum (object);
@@ -413,6 +580,17 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
413580 }
414581 }
415582
583+ STATEFN (StateDownloadChangefeeds) {
584+ switch (ev->GetTypeRewrite ()) {
585+ hFunc (TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds);
586+ hFunc (TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeed);
587+ hFunc (TEvExternalStorage::TEvGetObjectResponse, HandleChangefeed);
588+
589+ sFunc (TEvents::TEvWakeup, DownloadChangefeeds);
590+ sFunc (TEvents::TEvPoisonPill, PassAway);
591+ }
592+ }
593+
416594 STATEFN (StateDownloadChecksum) {
417595 switch (ev->GetTypeRewrite ()) {
418596 hFunc (TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);
@@ -432,6 +610,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
432610 const TString MetadataKey;
433611 TString SchemeKey;
434612 const TString PermissionsKey;
613+ TVector<TString> ChangefeedsKeys;
614+ ui64 IndexDownloadedChangefeed = 0 ;
435615
436616 const ui32 Retries;
437617 ui32 Attempt = 0 ;
0 commit comments