Skip to content

Commit 6b1f6a8

Browse files
Merge 462116f into 0a7a679
2 parents 0a7a679 + 462116f commit 6b1f6a8

File tree

6 files changed

+173
-0
lines changed

6 files changed

+173
-0
lines changed

ydb/library/backup/backup.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
#include "util.h"
44

55
#include <ydb/public/api/protos/ydb_table.pb.h>
6+
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
67
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
8+
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
79
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
810
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
11+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
912
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
1013
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
1114
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -34,6 +37,8 @@ namespace NYdb::NBackup {
3437

3538
static constexpr const char *SCHEME_FILE_NAME = "scheme.pb";
3639
static constexpr const char *PERMISSIONS_FILE_NAME = "permissions.pb";
40+
static constexpr const char *CHANGEFEED_DESCRIPTION_FILE_NAME = "changefeed_description.pb";
41+
static constexpr const char *TOPIC_DESCRIPTION_FILE_NAME = "topic_description.pb";
3742
static constexpr const char *INCOMPLETE_DATA_FILE_NAME = "incomplete.csv";
3843
static constexpr const char *INCOMPLETE_FILE_NAME = "incomplete";
3944
static constexpr const char *EMPTY_FILE_NAME = "empty_dir";
@@ -471,6 +476,53 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p
471476
outFile.Write(permissionsStr.data(), permissionsStr.size());
472477
}
473478

479+
TFsPath CreateDirectory(const TFsPath& folderPath, const TString& name) {
480+
TFsPath childFolderPath = folderPath.Child(name);
481+
LOG_D("Process " << childFolderPath.GetPath().Quote());
482+
childFolderPath.MkDir();
483+
return childFolderPath;
484+
}
485+
486+
Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) {
487+
Ydb::Table::ChangefeedDescription protoChangeFeedDesc;
488+
changefeedDesc.SerializeTo(protoChangeFeedDesc);
489+
return protoChangeFeedDesc;
490+
}
491+
492+
NTopic::TDescribeTopicResult GetTopicDescription(TDriver driver, const TString& path) {
493+
NYdb::NTopic::TTopicClient client(driver);
494+
return NConsoleClient::RetryFunction([&]() {
495+
return client.DescribeTopic(path).GetValueSync();
496+
});
497+
}
498+
499+
void WriteProtoToFile(const google::protobuf::Message& proto, const TFsPath& folderPath, const TString& fileName) {
500+
TString changefeedStr;
501+
google::protobuf::TextFormat::PrintToString(proto, &changefeedStr);
502+
LOG_D("Write changefeed into " << folderPath.Child(fileName).GetPath().Quote());
503+
TFile outFile(folderPath.Child(fileName), CreateAlways | WrOnly);
504+
outFile.Write(changefeedStr.data(), changefeedStr.size());
505+
}
506+
507+
void BackupChangefeeds(TDriver driver, const TString& dbPrefix, const TString& path, const TFsPath& folderPath) {
508+
509+
const auto dirPath = JoinDatabasePath(dbPrefix, path);
510+
auto desc = DescribeTable(driver, dirPath);
511+
512+
for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) {
513+
TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName());
514+
515+
auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc);
516+
const auto descTopicResult = GetTopicDescription(driver, JoinDatabasePath(dirPath, changefeedDesc.GetName()));
517+
NConsoleClient::ThrowOnError(descTopicResult);
518+
const auto& topicDescription = descTopicResult.GetTopicDescription();
519+
const auto protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription);
520+
521+
WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, CHANGEFEED_DESCRIPTION_FILE_NAME);
522+
WriteProtoToFile(protoTopicDescription, changefeedDirPath, TOPIC_DESCRIPTION_FILE_NAME);
523+
}
524+
}
525+
474526
void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path,
475527
const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) {
476528
Y_ENSURE(!path.empty());
@@ -489,6 +541,7 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP
489541
TFile outFile(folderPath.Child(SCHEME_FILE_NAME), CreateAlways | WrOnly);
490542
outFile.Write(schemaStr.data(), schemaStr.size());
491543

544+
BackupChangefeeds(driver, dbPrefix, path, folderPath);
492545
BackupPermissions(driver, dbPrefix, path, folderPath);
493546

494547
if (!schemaOnly) {

ydb/public/lib/ydb_cli/dump/dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
100100
FLUENT_SETTING_DEFAULT(bool, DryRun, false);
101101
FLUENT_SETTING_DEFAULT(bool, RestoreData, true);
102102
FLUENT_SETTING_DEFAULT(bool, RestoreIndexes, true);
103+
FLUENT_SETTING_DEFAULT(bool, RestoreChangefeed, true);
103104
FLUENT_SETTING_DEFAULT(bool, RestoreACL, true);
104105
FLUENT_SETTING_DEFAULT(bool, SkipDocumentTables, false);
105106
FLUENT_SETTING_DEFAULT(bool, SavePartialResult, false);

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,15 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
363363
LOG_D("Skip restoring indexes of " << dbPath.Quote());
364364
}
365365

366+
if (settings.RestoreChangefeed_) {
367+
auto result = RestoreChangefeeds(dbPath, dumpedDesc);
368+
if (!result.IsSuccess()) {
369+
return result;
370+
}
371+
} else if (!dumpedDesc.GetChangefeedDescriptions().empty()) {
372+
LOG_D("Skip restoring changefeeds of " << dbPath.Quote());
373+
}
374+
366375
return RestorePermissions(fsPath, dbPath, settings, oldEntries);
367376
}
368377

@@ -626,6 +635,52 @@ TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTabl
626635
return Result<TRestoreResult>();
627636
}
628637

638+
TRestoreResult TRestoreClient::RestoreChangefeeds(const TString& dbPath, const TTableDescription& desc) {
639+
TMaybe<TTableDescription> actualDesc;
640+
auto descResult = DescribeTable(TableClient, dbPath, actualDesc);
641+
if (!descResult.IsSuccess()) {
642+
return Result<TRestoreResult>(dbPath, std::move(descResult));
643+
}
644+
645+
for (const auto& changefeed : desc.GetChangefeedDescriptions()) {
646+
if (FindPtr(actualDesc->GetChangefeedDescriptions(), changefeed)) {
647+
continue;
648+
}
649+
LOG_D("Restore changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
650+
651+
TOperation::TOperationId buildChangefeedId;
652+
auto buildChangefeedStatus = TableClient.RetryOperationSync([&, &outId = buildChangefeedId](TSession session) {
653+
auto settings = TAlterTableSettings().AppendAddChangefeeds(changefeed);
654+
auto result = session.AlterTableLong(dbPath, settings).GetValueSync();
655+
if (IsOperationStarted(result.Status())) {
656+
outId = result.Id();
657+
}
658+
return result.Status();
659+
});
660+
661+
if (!IsOperationStarted(buildChangefeedStatus)) {
662+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
663+
return Result<TRestoreResult>(dbPath, std::move(buildChangefeedStatus));
664+
}
665+
666+
auto waitForIndexBuildStatus = WaitForIndexBuild(OperationClient, buildChangefeedId);
667+
if (!waitForIndexBuildStatus.IsSuccess()) {
668+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
669+
return Result<TRestoreResult>(dbPath, std::move(waitForIndexBuildStatus));
670+
}
671+
672+
auto forgetStatus = NConsoleClient::RetryFunction([&]() {
673+
return OperationClient.Forget(buildChangefeedId).GetValueSync();
674+
});
675+
if (!forgetStatus.IsSuccess()) {
676+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
677+
return Result<TRestoreResult>(dbPath, std::move(forgetStatus));
678+
}
679+
}
680+
681+
return Result<TRestoreResult>();
682+
}
683+
629684
TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const TString& dbPath,
630685
const TRestoreSettings& settings, const THashSet<TString>& oldEntries)
631686
{

ydb/public/lib/ydb_cli/dump/restore_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class TRestoreClient {
129129
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
130130
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
131131
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
132+
TRestoreResult RestoreChangefeeds(const TString& dbPath, const NTable::TTableDescription& desc);
132133
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);
133134

134135
THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,

ydb/public/sdk/cpp/client/ydb_table/table.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2869,6 +2869,68 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
28692869
}
28702870
}
28712871

2872+
void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const {
2873+
proto.set_name(Name_);
2874+
proto.set_virtual_timestamps(VirtualTimestamps_);
2875+
proto.set_aws_region(AwsRegion_);
2876+
2877+
switch (State_) {
2878+
case EChangefeedState::Enabled:
2879+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_ENABLED);
2880+
break;
2881+
case EChangefeedState::Disabled:
2882+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_DISABLED);
2883+
break;
2884+
case EChangefeedState::InitialScan:
2885+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_INITIAL_SCAN);
2886+
break;
2887+
case EChangefeedState::Unknown:
2888+
break;
2889+
}
2890+
2891+
switch (Mode_) {
2892+
case EChangefeedMode::KeysOnly:
2893+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY);
2894+
break;
2895+
case EChangefeedMode::Updates:
2896+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_UPDATES);
2897+
break;
2898+
case EChangefeedMode::NewImage:
2899+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_IMAGE);
2900+
break;
2901+
case EChangefeedMode::OldImage:
2902+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_OLD_IMAGE);
2903+
break;
2904+
case EChangefeedMode::NewAndOldImages:
2905+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_AND_OLD_IMAGES);
2906+
break;
2907+
case EChangefeedMode::Unknown:
2908+
break;
2909+
}
2910+
2911+
switch (Format_) {
2912+
case EChangefeedFormat::Json:
2913+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
2914+
break;
2915+
case EChangefeedFormat::DynamoDBStreamsJson:
2916+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
2917+
break;
2918+
case EChangefeedFormat::DebeziumJson:
2919+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
2920+
break;
2921+
case EChangefeedFormat::Unknown:
2922+
break;
2923+
}
2924+
2925+
if (ResolvedTimestamps_) {
2926+
SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
2927+
}
2928+
2929+
for (const auto& [key, value] : Attributes_) {
2930+
(*proto.mutable_attributes())[key] = value;
2931+
}
2932+
}
2933+
28722934
TString TChangefeedDescription::ToString() const {
28732935
TString result;
28742936
TStringOutput out(result);

ydb/public/sdk/cpp/client/ydb_table/table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ class TChangefeedDescription {
389389
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;
390390

391391
void SerializeTo(Ydb::Table::Changefeed& proto) const;
392+
void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const;
392393
TString ToString() const;
393394
void Out(IOutputStream& o) const;
394395

0 commit comments

Comments
 (0)