Skip to content

Commit e95d266

Browse files
authored
[feat](snapshot) calculate snapshot data size (#59168)
1 parent 72a6c12 commit e95d266

File tree

9 files changed

+753
-20
lines changed

9 files changed

+753
-20
lines changed

cloud/src/meta-store/keys.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,40 @@ bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id, Versionsta
11131113
return true;
11141114
}
11151115

1116+
// Decode tablet inverted index key
1117+
// Return true if decode successfully, otherwise false
1118+
bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id, int64_t* table_id,
1119+
int64_t* index_id, int64_t* partition_id,
1120+
int64_t* tablet_id) {
1121+
// 0x03 "index" ${instance_id} "tablet_inverted" ${db_id} ${table_id} ${index_id} ${partition} ${tablet}
1122+
if (in->empty() || static_cast<uint8_t>((*in)[0]) != CLOUD_VERSIONED_KEY_SPACE03) {
1123+
return false;
1124+
}
1125+
in->remove_prefix(1);
1126+
1127+
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1128+
auto res = decode_key(in, &out);
1129+
if (res != 0 || out.size() != 8) {
1130+
return false;
1131+
}
1132+
1133+
try {
1134+
if (std::get<std::string>(std::get<0>(out[0])) != INDEX_KEY_PREFIX ||
1135+
std::get<std::string>(std::get<0>(out[2])) != TABLET_INVERTED_INDEX_KEY_INFIX) {
1136+
return false;
1137+
}
1138+
*db_id = std::get<int64_t>(std::get<0>(out[3]));
1139+
*table_id = std::get<int64_t>(std::get<0>(out[4]));
1140+
*index_id = std::get<int64_t>(std::get<0>(out[5]));
1141+
*partition_id = std::get<int64_t>(std::get<0>(out[6]));
1142+
*tablet_id = std::get<int64_t>(std::get<0>(out[7]));
1143+
} catch (const std::bad_variant_access& e) {
1144+
return false;
1145+
}
1146+
1147+
return true;
1148+
}
1149+
11161150
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,
11171151
Versionstamp* timestamp, std::string* ref_instance_id) {
11181152
// Key format: 0x03 + encode_bytes("snapshot") + encode_bytes(instance_id) +

cloud/src/meta-store/keys.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,11 @@ bool decode_meta_schema_key(std::string_view* in, int64_t* index_id, int64_t* sc
597597
// Return true if decode successfully, otherwise false
598598
bool decode_meta_tablet_key(std::string_view* in, int64_t* tablet_id, Versionstamp* timestamp);
599599

600+
// Decode tablet inverted index key
601+
// Return true if decode successfully, otherwise false
602+
bool decode_tablet_inverted_index_key(std::string_view* in, int64_t* db_id, int64_t* table_id,
603+
int64_t* index_id, int64_t* partition_id, int64_t* tablet_id);
604+
600605
// Decode snapshot reference key
601606
// Return true if decode successfully, otherwise false
602607
bool decode_snapshot_ref_key(std::string_view* in, std::string* instance_id,

cloud/src/recycler/recycler.h

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,12 @@ class InstanceRecycler {
552552
SegmentRecyclerMetricsContext segment_metrics_context_;
553553
};
554554

555+
struct OperationLogReferenceInfo {
556+
bool referenced_by_instance = false;
557+
bool referenced_by_snapshot = false;
558+
Versionstamp referenced_snapshot_timestamp;
559+
};
560+
555561
// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
556562
class OperationLogRecycleChecker {
557563
public:
@@ -563,10 +569,15 @@ class OperationLogRecycleChecker {
563569
int init();
564570

565571
// Check if an operation log can be recycled
566-
bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp) const;
572+
bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp,
573+
OperationLogReferenceInfo* reference_info) const;
567574

568575
Versionstamp max_versionstamp() const { return max_versionstamp_; }
569576

577+
const std::vector<std::pair<SnapshotPB, Versionstamp>>& get_snapshots() const {
578+
return snapshots_;
579+
}
580+
570581
private:
571582
std::string_view instance_id_;
572583
TxnKv* txn_kv_;
@@ -577,4 +588,33 @@ class OperationLogRecycleChecker {
577588
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
578589
};
579590

591+
class SnapshotDataSizeCalculator {
592+
public:
593+
SnapshotDataSizeCalculator(std::string_view instance_id, std::shared_ptr<TxnKv> txn_kv)
594+
: instance_id_(instance_id), txn_kv_(std::move(txn_kv)) {}
595+
596+
void init(const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots);
597+
598+
int calculate_operation_log_data_size(const std::string_view& log_key,
599+
OperationLogPB& operation_log,
600+
OperationLogReferenceInfo& reference_info);
601+
602+
int save_snapshot_data_size_with_retry();
603+
604+
private:
605+
int get_all_index_partitions(int64_t db_id, int64_t table_id, int64_t index_id,
606+
std::vector<int64_t>* partition_ids);
607+
int get_index_partition_data_size(int64_t db_id, int64_t table_id, int64_t index_id,
608+
int64_t partition_id, int64_t* data_size);
609+
int save_operation_log(const std::string_view& log_key, OperationLogPB& operation_log);
610+
int save_snapshot_data_size();
611+
612+
std::string_view instance_id_;
613+
std::shared_ptr<TxnKv> txn_kv_;
614+
615+
int64_t instance_retained_data_size_ = 0;
616+
std::map<Versionstamp, int64_t> retained_data_size_;
617+
std::set<std::string> calculated_partitions_;
618+
};
619+
580620
} // namespace doris::cloud

cloud/src/recycler/recycler_operation_log.cpp

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ int OperationLogRecycleChecker::init() {
8282
snapshots_.clear();
8383
snapshot_indexes_.clear();
8484
MetaReader reader(instance_id_);
85-
err = reader.get_snapshots(txn.get(), &snapshots_);
85+
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
86+
err = reader.get_snapshots(txn.get(), &snapshots);
8687
if (err != TxnErrorCode::TXN_OK) {
8788
LOG_WARNING("failed to get snapshots").tag("err", err);
8889
return -1;
@@ -96,16 +97,22 @@ int OperationLogRecycleChecker::init() {
9697
}
9798

9899
max_versionstamp_ = Versionstamp(read_version, 0);
99-
for (size_t i = 0; i < snapshots_.size(); ++i) {
100-
auto&& [snapshot, versionstamp] = snapshots_[i];
101-
snapshot_indexes_.insert(std::make_pair(versionstamp, i));
100+
for (size_t i = 0; i < snapshots.size(); ++i) {
101+
auto&& [snapshot, versionstamp] = snapshots[i];
102+
if (snapshot.status() == SnapshotStatus::SNAPSHOT_ABORTED ||
103+
snapshot.status() == SnapshotStatus::SNAPSHOT_RECYCLED) {
104+
continue;
105+
}
106+
snapshot_indexes_.insert(std::make_pair(versionstamp, snapshots_.size()));
107+
snapshots_.push_back(std::make_pair(std::move(snapshot), versionstamp));
102108
}
103109

104110
return 0;
105111
}
106112

107113
bool OperationLogRecycleChecker::can_recycle(const Versionstamp& log_versionstamp,
108-
int64_t log_min_timestamp) const {
114+
int64_t log_min_timestamp,
115+
OperationLogReferenceInfo* reference_info) const {
109116
Versionstamp log_min_read_timestamp(log_min_timestamp, 0);
110117
if (log_versionstamp > max_versionstamp_) {
111118
// Not recycleable.
@@ -114,12 +121,15 @@ bool OperationLogRecycleChecker::can_recycle(const Versionstamp& log_versionstam
114121

115122
// Do not recycle operation logs referenced by active snapshots.
116123
if (log_min_read_timestamp < source_snapshot_versionstamp_) {
124+
reference_info->referenced_by_instance = true;
117125
return false;
118126
}
119127

120128
auto it = snapshot_indexes_.lower_bound(log_min_read_timestamp);
121129
if (it != snapshot_indexes_.end() && snapshots_[it->second].second < log_versionstamp) {
122130
// in [log_min_read_timestmap, log_versionstamp)
131+
reference_info->referenced_by_snapshot = true;
132+
reference_info->referenced_snapshot_timestamp = snapshots_[it->second].second;
123133
return false;
124134
}
125135

@@ -677,6 +687,8 @@ int InstanceRecycler::recycle_operation_logs() {
677687
LOG_WARNING("failed to initialize recycle checker").tag("error_code", init_res);
678688
return init_res;
679689
}
690+
SnapshotDataSizeCalculator calculator(instance_id_, txn_kv_);
691+
calculator.init(recycle_checker.get_snapshots());
680692

681693
auto scan_and_recycle_operation_log = [&](const std::string_view& key,
682694
const std::vector<std::string>& raw_keys,
@@ -690,7 +702,9 @@ int InstanceRecycler::recycle_operation_logs() {
690702
}
691703

692704
size_t value_size = operation_log.ByteSizeLong();
693-
if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp())) {
705+
OperationLogReferenceInfo reference_info;
706+
if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp(),
707+
&reference_info)) {
694708
AnnotateTag tag("log_key", hex(key));
695709
int res = recycle_operation_log(log_versionstamp, raw_keys, std::move(operation_log));
696710
if (res != 0) {
@@ -700,6 +714,13 @@ int InstanceRecycler::recycle_operation_logs() {
700714

701715
recycled_operation_logs++;
702716
recycled_operation_log_data_size += value_size;
717+
} else {
718+
int res = calculator.calculate_operation_log_data_size(key, operation_log,
719+
reference_info);
720+
if (res != 0) {
721+
LOG_WARNING("failed to calculate operation log data size").tag("error_code", res);
722+
return res;
723+
}
703724
}
704725

705726
total_operation_logs++;
@@ -769,6 +790,11 @@ int InstanceRecycler::recycle_operation_logs() {
769790
.tag("error_code", iter->error_code());
770791
return -1;
771792
}
793+
int res = calculator.save_snapshot_data_size_with_retry();
794+
if (res != 0) {
795+
LOG_WARNING("failed to save snapshot data size").tag("error_code", res);
796+
return res;
797+
}
772798
return 0;
773799
}
774800

0 commit comments

Comments
 (0)