Skip to content

Commit d242d53

Browse files
Merge 381d9f1 into 751da8a
2 parents 751da8a + 381d9f1 commit d242d53

File tree

6 files changed

+156
-0
lines changed

6 files changed

+156
-0
lines changed

ydb/library/backup/backup.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
77
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
88
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
9+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
910
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
1011
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
1112
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -34,6 +35,8 @@ namespace NYdb::NBackup {
3435

3536
static constexpr const char *SCHEME_FILE_NAME = "scheme.pb";
3637
static constexpr const char *PERMISSIONS_FILE_NAME = "permissions.pb";
38+
static constexpr const char *CHANGEFEED_DESCRIPTION_FILE_NAME = "changefeed_description.pb";
39+
static constexpr const char *TOPIC_DESCRIPTION_FILE_NAME = "topic_description.pb";
3740
static constexpr const char *INCOMPLETE_DATA_FILE_NAME = "incomplete.csv";
3841
static constexpr const char *INCOMPLETE_FILE_NAME = "incomplete";
3942
static constexpr const char *EMPTY_FILE_NAME = "empty_dir";
@@ -471,6 +474,52 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p
471474
outFile.Write(permissionsStr.data(), permissionsStr.size());
472475
}
473476

477+
TFsPath CreateDirectory(const TFsPath& folderPath, const TString& name) {
478+
TFsPath childFolderPath = folderPath.Child(name);
479+
LOG_D("Process " << childFolderPath.GetPath().Quote());
480+
childFolderPath.MkDir();
481+
return childFolderPath;
482+
}
483+
484+
Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) {
485+
Ydb::Table::ChangefeedDescription protoChangeFeedDesc;
486+
changefeedDesc.SerializeTo(protoChangeFeedDesc);
487+
return protoChangeFeedDesc;
488+
}
489+
490+
NTopic::TTopicDescription GetTopicDescription(TDriver driver, const TString& path) {
491+
NYdb::NTopic::TTopicClient client(driver);
492+
auto result =
493+
client.DescribeTopic(path).GetValueSync();
494+
return result.GetTopicDescription();
495+
}
496+
497+
void WriteProtoToFile(const google::protobuf::Message& proto, const TFsPath& folderPath, const TString& fileName) {
498+
TString changefeedStr;
499+
google::protobuf::TextFormat::PrintToString(proto, &changefeedStr);
500+
LOG_D("Write changefeed into " << folderPath.Child(fileName).GetPath().Quote());
501+
TFile outFile(folderPath.Child(fileName), CreateAlways | WrOnly);
502+
outFile.Write(changefeedStr.data(), changefeedStr.size());
503+
}
504+
505+
void BackupChangefeeds(TDriver driver, const TString& dbPrefix, const TString& path, const TFsPath& folderPath) {
506+
507+
const auto dirPath = JoinDatabasePath(dbPrefix, path);
508+
auto desc = DescribeTable(driver, dirPath);
509+
510+
for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) {
511+
TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName());
512+
513+
auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc);
514+
auto a = JoinDatabasePath(dirPath, changefeedDesc.GetName());
515+
const auto topicDescription = GetTopicDescription(driver, JoinDatabasePath(dirPath, changefeedDesc.GetName()));
516+
const auto& protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription);
517+
518+
WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, CHANGEFEED_DESCRIPTION_FILE_NAME);
519+
WriteProtoToFile(protoTopicDescription, changefeedDirPath, TOPIC_DESCRIPTION_FILE_NAME);
520+
}
521+
}
522+
474523
void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path,
475524
const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) {
476525
Y_ENSURE(!path.empty());
@@ -489,6 +538,7 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP
489538
TFile outFile(folderPath.Child(SCHEME_FILE_NAME), CreateAlways | WrOnly);
490539
outFile.Write(schemaStr.data(), schemaStr.size());
491540

541+
BackupChangefeeds(driver, dbPrefix, path, folderPath);
492542
BackupPermissions(driver, dbPrefix, path, folderPath);
493543

494544
if (!schemaOnly) {

ydb/public/lib/ydb_cli/dump/dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ struct TRestoreSettings: public TOperationRequestSettings<TRestoreSettings> {
100100
FLUENT_SETTING_DEFAULT(bool, DryRun, false);
101101
FLUENT_SETTING_DEFAULT(bool, RestoreData, true);
102102
FLUENT_SETTING_DEFAULT(bool, RestoreIndexes, true);
103+
FLUENT_SETTING_DEFAULT(bool, RestoreChangefeed, true);
103104
FLUENT_SETTING_DEFAULT(bool, RestoreACL, true);
104105
FLUENT_SETTING_DEFAULT(bool, SkipDocumentTables, false);
105106
FLUENT_SETTING_DEFAULT(bool, SavePartialResult, false);

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,15 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
363363
LOG_D("Skip restoring indexes of " << dbPath.Quote());
364364
}
365365

366+
if (settings.RestoreChangefeed_) {
367+
auto result = RestoreChangefeeds(dbPath, dumpedDesc);
368+
if (!result.IsSuccess()) {
369+
return result;
370+
}
371+
} else if (!dumpedDesc.GetChangefeedDescriptions().empty()) {
372+
LOG_D("Skip restoring changefeeds of " << dbPath.Quote());
373+
}
374+
366375
return RestorePermissions(fsPath, dbPath, settings, oldEntries);
367376
}
368377

@@ -626,6 +635,52 @@ TRestoreResult TRestoreClient::RestoreIndexes(const TString& dbPath, const TTabl
626635
return Result<TRestoreResult>();
627636
}
628637

638+
TRestoreResult TRestoreClient::RestoreChangefeeds(const TString& dbPath, const TTableDescription& desc) {
639+
TMaybe<TTableDescription> actualDesc;
640+
auto descResult = DescribeTable(TableClient, dbPath, actualDesc);
641+
if (!descResult.IsSuccess()) {
642+
return Result<TRestoreResult>(dbPath, std::move(descResult));
643+
}
644+
645+
for (const auto& changefeed : desc.GetChangefeedDescriptions()) {
646+
if (FindPtr(actualDesc->GetChangefeedDescriptions(), changefeed)) {
647+
continue;
648+
}
649+
LOG_D("Restore changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
650+
651+
TOperation::TOperationId buildChangefeedId;
652+
auto buildChangefeedStatus = TableClient.RetryOperationSync([&, &outId = buildChangefeedId](TSession session) {
653+
auto settings = TAlterTableSettings().AppendAddChangefeeds(changefeed);
654+
auto result = session.AlterTableLong(dbPath, settings).GetValueSync();
655+
if (IsOperationStarted(result.Status())) {
656+
outId = result.Id();
657+
}
658+
return result.Status();
659+
});
660+
661+
if (!IsOperationStarted(buildChangefeedStatus)) {
662+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
663+
return Result<TRestoreResult>(dbPath, std::move(buildChangefeedStatus));
664+
}
665+
666+
auto waitForIndexBuildStatus = WaitForIndexBuild(OperationClient, buildChangefeedId);
667+
if (!waitForIndexBuildStatus.IsSuccess()) {
668+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
669+
return Result<TRestoreResult>(dbPath, std::move(waitForIndexBuildStatus));
670+
}
671+
672+
auto forgetStatus = NConsoleClient::RetryFunction([&]() {
673+
return OperationClient.Forget(buildChangefeedId).GetValueSync();
674+
});
675+
if (!forgetStatus.IsSuccess()) {
676+
LOG_E("Error building changefeed " << changefeed.GetName().Quote() << " on " << dbPath.Quote());
677+
return Result<TRestoreResult>(dbPath, std::move(forgetStatus));
678+
}
679+
}
680+
681+
return Result<TRestoreResult>();
682+
}
683+
629684
TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const TString& dbPath,
630685
const TRestoreSettings& settings, const THashSet<TString>& oldEntries)
631686
{

ydb/public/lib/ydb_cli/dump/restore_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class TRestoreClient {
129129
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
130130
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
131131
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
132+
TRestoreResult RestoreChangefeeds(const TString& dbPath, const NTable::TTableDescription& desc);
132133
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);
133134

134135
THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,

ydb/public/sdk/cpp/client/ydb_table/table.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2869,6 +2869,54 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
28692869
}
28702870
}
28712871

2872+
void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const {
2873+
proto.set_name(Name_);
2874+
proto.set_virtual_timestamps(VirtualTimestamps_);
2875+
proto.set_aws_region(AwsRegion_);
2876+
2877+
switch (Mode_) {
2878+
case EChangefeedMode::KeysOnly:
2879+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY);
2880+
break;
2881+
case EChangefeedMode::Updates:
2882+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_UPDATES);
2883+
break;
2884+
case EChangefeedMode::NewImage:
2885+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_IMAGE);
2886+
break;
2887+
case EChangefeedMode::OldImage:
2888+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_OLD_IMAGE);
2889+
break;
2890+
case EChangefeedMode::NewAndOldImages:
2891+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_AND_OLD_IMAGES);
2892+
break;
2893+
case EChangefeedMode::Unknown:
2894+
break;
2895+
}
2896+
2897+
switch (Format_) {
2898+
case EChangefeedFormat::Json:
2899+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
2900+
break;
2901+
case EChangefeedFormat::DynamoDBStreamsJson:
2902+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
2903+
break;
2904+
case EChangefeedFormat::DebeziumJson:
2905+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
2906+
break;
2907+
case EChangefeedFormat::Unknown:
2908+
break;
2909+
}
2910+
2911+
if (ResolvedTimestamps_) {
2912+
SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
2913+
}
2914+
2915+
for (const auto& [key, value] : Attributes_) {
2916+
(*proto.mutable_attributes())[key] = value;
2917+
}
2918+
}
2919+
28722920
TString TChangefeedDescription::ToString() const {
28732921
TString result;
28742922
TStringOutput out(result);

ydb/public/sdk/cpp/client/ydb_table/table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ class TChangefeedDescription {
389389
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;
390390

391391
void SerializeTo(Ydb::Table::Changefeed& proto) const;
392+
void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const;
392393
TString ToString() const;
393394
void Out(IOutputStream& o) const;
394395

0 commit comments

Comments
 (0)