Skip to content

Commit d395eb2

Browse files
Merge fd9c415 into d78c86c
2 parents d78c86c + fd9c415 commit d395eb2

File tree

4 files changed

+96
-33
lines changed

4 files changed

+96
-33
lines changed

ydb/library/backup/backup.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
#include "util.h"
44

55
#include <ydb/public/api/protos/ydb_table.pb.h>
6+
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
67
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
8+
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
79
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
810
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
911
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
12+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
1013
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
1114
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
1215
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
16+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
1317
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
1418

1519
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -475,6 +479,36 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p
475479
WriteProtoToFile(proto, folderPath, NDump::NFiles::Permissions());
476480
}
477481

482+
Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) {
483+
Ydb::Table::ChangefeedDescription protoChangeFeedDesc;
484+
changefeedDesc.SerializeTo(protoChangeFeedDesc);
485+
return protoChangeFeedDesc;
486+
}
487+
488+
NTopic::TDescribeTopicResult DescribeTopic(TDriver driver, const TString& path) {
489+
NYdb::NTopic::TTopicClient client(driver);
490+
return NConsoleClient::RetryFunction([&]() {
491+
return client.DescribeTopic(path).GetValueSync();
492+
});
493+
}
494+
495+
void BackupChangefeeds(TDriver driver, const TString& dbPrefix, const TString& path, const TFsPath& folderPath) {
496+
const auto dirPath = JoinDatabasePath(dbPrefix, path);
497+
auto desc = DescribeTable(driver, dirPath);
498+
for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) {
499+
TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName());
500+
501+
auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc);
502+
const auto descTopicResult = DescribeTopic(driver, JoinDatabasePath(dirPath, changefeedDesc.GetName()));
503+
NConsoleClient::ThrowOnError(descTopicResult);
504+
const auto& topicDescription = descTopicResult.GetTopicDescription();
505+
const auto protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription);
506+
507+
WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, NDump::NFiles::Changefeed());
508+
WriteProtoToFile(protoTopicDescription, changefeedDirPath, NDump::NFiles::Topic());
509+
}
510+
}
511+
478512
void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path,
479513
const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) {
480514
Y_ENSURE(!path.empty());
@@ -486,8 +520,9 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP
486520

487521
auto desc = DescribeTable(driver, fullPath);
488522
auto proto = ProtoFromTableDescription(desc, preservePoolKinds);
489-
490523
WriteProtoToFile(proto, folderPath, NDump::NFiles::TableScheme());
524+
525+
BackupChangefeeds(driver, dbPrefix, path, folderPath);
491526
BackupPermissions(driver, dbPrefix, path, folderPath);
492527

493528
if (!schemaOnly) {

ydb/library/backup/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ PEERDIR(
1414
ydb/public/sdk/cpp/client/ydb_driver
1515
ydb/public/sdk/cpp/client/ydb_result
1616
ydb/public/sdk/cpp/client/ydb_table
17+
ydb/public/sdk/cpp/client/ydb_topic
1718
ydb/public/sdk/cpp/client/ydb_value
1819
)
1920

ydb/public/sdk/cpp/client/ydb_table/table.cpp

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2816,56 +2816,79 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
28162816
return ret;
28172817
}
28182818

2819-
void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
2819+
template<typename TProto>
2820+
void TChangefeedDescription::SerializeCommonFields(TProto& proto) const {
28202821
proto.set_name(Name_);
28212822
proto.set_virtual_timestamps(VirtualTimestamps_);
2822-
proto.set_initial_scan(InitialScan_);
28232823
proto.set_aws_region(AwsRegion_);
28242824

28252825
switch (Mode_) {
2826-
case EChangefeedMode::KeysOnly:
2827-
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY);
2828-
break;
2829-
case EChangefeedMode::Updates:
2830-
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_UPDATES);
2831-
break;
2832-
case EChangefeedMode::NewImage:
2833-
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_IMAGE);
2834-
break;
2835-
case EChangefeedMode::OldImage:
2836-
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_OLD_IMAGE);
2837-
break;
2838-
case EChangefeedMode::NewAndOldImages:
2839-
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_AND_OLD_IMAGES);
2840-
break;
2841-
case EChangefeedMode::Unknown:
2842-
break;
2826+
case EChangefeedMode::KeysOnly:
2827+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY);
2828+
break;
2829+
case EChangefeedMode::Updates:
2830+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_UPDATES);
2831+
break;
2832+
case EChangefeedMode::NewImage:
2833+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_IMAGE);
2834+
break;
2835+
case EChangefeedMode::OldImage:
2836+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_OLD_IMAGE);
2837+
break;
2838+
case EChangefeedMode::NewAndOldImages:
2839+
proto.set_mode(Ydb::Table::ChangefeedMode::MODE_NEW_AND_OLD_IMAGES);
2840+
break;
2841+
case EChangefeedMode::Unknown:
2842+
break;
28432843
}
28442844

28452845
switch (Format_) {
2846-
case EChangefeedFormat::Json:
2847-
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
2848-
break;
2849-
case EChangefeedFormat::DynamoDBStreamsJson:
2850-
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
2851-
break;
2852-
case EChangefeedFormat::DebeziumJson:
2853-
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
2854-
break;
2855-
case EChangefeedFormat::Unknown:
2856-
break;
2846+
case EChangefeedFormat::Json:
2847+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
2848+
break;
2849+
case EChangefeedFormat::DynamoDBStreamsJson:
2850+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
2851+
break;
2852+
case EChangefeedFormat::DebeziumJson:
2853+
proto.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
2854+
break;
2855+
case EChangefeedFormat::Unknown:
2856+
break;
28572857
}
28582858

28592859
if (ResolvedTimestamps_) {
28602860
SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
28612861
}
28622862

2863+
for (const auto& [key, value] : Attributes_) {
2864+
(*proto.mutable_attributes())[key] = value;
2865+
}
2866+
}
2867+
2868+
void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
2869+
SerializeCommonFields(proto);
2870+
proto.set_initial_scan(InitialScan_);
2871+
28632872
if (RetentionPeriod_) {
28642873
SetDuration(*RetentionPeriod_, *proto.mutable_retention_period());
28652874
}
2875+
}
28662876

2867-
for (const auto& [key, value] : Attributes_) {
2868-
(*proto.mutable_attributes())[key] = value;
2877+
void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const {
2878+
SerializeCommonFields(proto);
2879+
2880+
switch (State_) {
2881+
case EChangefeedState::Enabled:
2882+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_ENABLED);
2883+
break;
2884+
case EChangefeedState::Disabled:
2885+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_DISABLED);
2886+
break;
2887+
case EChangefeedState::InitialScan:
2888+
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_INITIAL_SCAN);
2889+
break;
2890+
case EChangefeedState::Unknown:
2891+
break;
28692892
}
28702893
}
28712894

ydb/public/sdk/cpp/client/ydb_table/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ class TChangefeedDescription {
389389
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;
390390

391391
void SerializeTo(Ydb::Table::Changefeed& proto) const;
392+
void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const;
392393
TString ToString() const;
393394
void Out(IOutputStream& o) const;
394395

@@ -399,6 +400,9 @@ class TChangefeedDescription {
399400
template <typename TProto>
400401
static TChangefeedDescription FromProto(const TProto& proto);
401402

403+
template<typename TProto>
404+
void SerializeCommonFields(TProto& proto) const;
405+
402406
private:
403407
TString Name_;
404408
EChangefeedMode Mode_;

0 commit comments

Comments
 (0)