Skip to content

Commit a443231

Browse files
Merge d7a0c4b into 93cb95f
2 parents 93cb95f + d7a0c4b commit a443231

File tree

9 files changed

+264
-12
lines changed

9 files changed

+264
-12
lines changed

ydb/core/tablet_flat/flat_cxx_database.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Date32> { typedef i32 Ty
238238
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Datetime64> { typedef i64 Type; };
239239
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Timestamp64> { typedef i64 Type; };
240240
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Interval64> { typedef i64 Type; };
241+
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Json> { typedef TVector<TString> Type; };
241242

242243
/// only for compatibility with old code
243244
template <NScheme::TTypeId ValType>

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4464,6 +4464,20 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
44644464
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
44654465
}
44664466

4467+
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>() && rowset.HaveValue<Schema::ImportItems::Topics>()) {
4468+
const ui64 count = rowset.GetValue<Schema::ImportItems::Changefeeds>().size();
4469+
TVector<TImportInfo::TChangefeedImportDescriptions> changefeeds;
4470+
changefeeds.reserve(count);
4471+
for (ui64 i = 0; i < count; ++i) {
4472+
Ydb::Table::ChangefeedDescription changefeed;
4473+
Ydb::Topic::DescribeTopicResult topic;
4474+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(changefeed, rowset.GetValue<Schema::ImportItems::Changefeeds>()[i] ));
4475+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(topic, rowset.GetValue<Schema::ImportItems::Topics>()[i] ));
4476+
changefeeds.emplace_back(changefeed, topic);
4477+
}
4478+
item.Changefeeds = std::move(changefeeds);
4479+
}
4480+
44674481
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
44684482
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
44694483
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,23 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
189189
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
190190
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
191191
);
192+
const ui64 count = item.Changefeeds.size();
193+
TVector<TString> jsonChangefeeds;
194+
TVector<TString> jsonTopics;
195+
jsonChangefeeds.reserve(count);
196+
jsonTopics.reserve(count);
197+
198+
for (const auto& [changefeed, topic] : item.Changefeeds) {
199+
jsonChangefeeds.push_back(changefeed.SerializeAsString());
200+
jsonTopics.push_back(topic.SerializeAsString());
201+
}
202+
203+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
204+
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(jsonChangefeeds)
205+
);
206+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
207+
NIceDb::TUpdate<Schema::ImportItems::Topics>(jsonTopics)
208+
);
192209
}
193210

194211
void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,23 @@
88
namespace NKikimr {
99
namespace NSchemeShard {
1010

11+
bool CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction>& propose, const TImportInfo::TItem& item, TString& error) {
12+
auto& record = propose->Record;
13+
const auto& changefeeds = item.Changefeeds;
14+
15+
for (const auto& [changefeed, topic]: changefeeds) {
16+
auto& modifyScheme = *record.AddTransaction();
17+
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();
18+
Ydb::StatusIds::StatusCode status;
19+
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
20+
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
21+
return false;
22+
}
23+
cdcStream.SetRetentionPeriodSeconds(topic.Getretention_period().seconds());
24+
}
25+
return true;
26+
}
27+
1128
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
1229
TSchemeShard* ss,
1330
TTxId txId,
@@ -73,6 +90,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
7390
return nullptr;
7491
}
7592

93+
CreateChangefeedsPropose(propose, item, error);
94+
7695
return propose;
7796
}
7897

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 184 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,50 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5151
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
5252
}
5353

54+
static TString ChangefeedDescriptionKey(const TString& changefeedPrefix) {
55+
return TStringBuilder() << changefeedPrefix << "/changefeed_description.pb";
56+
}
57+
58+
static TString TopicDescriptionKey(const TString& changefeedPrefix) {
59+
return TStringBuilder() << changefeedPrefix << "/topic_description.pb";
60+
}
61+
62+
void ListObjects(const TString& prefix) {
63+
auto request = Model::ListObjectsRequest()
64+
.WithPrefix(prefix);
65+
66+
Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request));
67+
}
68+
69+
void HandleChangefeeds(TEvExternalStorage::TEvListObjectsResponse::TPtr& ev) {
70+
const auto& result = ev.Get()->Get()->Result;
71+
72+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvListObjectResponse"
73+
<< ": self# " << SelfId()
74+
<< ", result# " << result);
75+
76+
if (!CheckResult(result, "ListObject")) {
77+
return;
78+
}
79+
80+
const auto& objects = result.GetResult().GetContents();
81+
ChangefeedsKeys.reserve(objects.size());
82+
83+
for (const auto& obj : objects) {
84+
const TFsPath& path = obj.GetKey();
85+
if (path.GetName() == "changefeed_description.pb") {
86+
ChangefeedsKeys.push_back(path.Dirname());
87+
}
88+
}
89+
90+
if (!ChangefeedsKeys.empty()) {
91+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[0]));
92+
} else {
93+
Reply();
94+
}
95+
96+
}
97+
5498
void HeadObject(const TString& key) {
5599
auto request = Model::HeadObjectRequest()
56100
.WithKey(key);
@@ -128,6 +172,36 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
128172
GetObject(ChecksumKey, std::make_pair(0, contentLength - 1));
129173
}
130174

175+
void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
176+
const auto& result = ev->Get()->Result;
177+
178+
LOG_D("HandleChangefeed TEvExternalStorage::TEvHeadObjectResponse"
179+
<< ": self# " << SelfId()
180+
<< ", result# " << result);
181+
182+
if (!CheckResult(result, "HeadObject")) {
183+
return;
184+
}
185+
186+
const auto contentLength = result.GetResult().GetContentLength();
187+
GetObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
188+
}
189+
190+
void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
191+
const auto& result = ev->Get()->Result;
192+
193+
LOG_D("HandleTopic TEvExternalStorage::TEvHeadObjectResponse"
194+
<< ": self# " << SelfId()
195+
<< ", result# " << result);
196+
197+
if (!CheckResult(result, "HeadObject")) {
198+
return;
199+
}
200+
201+
const auto contentLength = result.GetResult().GetContentLength();
202+
GetObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
203+
}
204+
131205
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
132206
auto request = Model::GetObjectRequest()
133207
.WithKey(key)
@@ -205,7 +279,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
205279
if (NeedDownloadPermissions) {
206280
StartDownloadingPermissions();
207281
} else {
208-
Reply();
282+
StartDownloadingChangefeeds();
209283
}
210284
};
211285

@@ -242,7 +316,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
242316
item.Permissions = std::move(permissions);
243317

244318
auto nextStep = [this]() {
245-
Reply();
319+
StartDownloadingChangefeeds();
246320
};
247321

248322
if (NeedValidateChecksums) {
@@ -274,6 +348,82 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
274348
ChecksumValidatedCallback();
275349
}
276350

351+
void HandleChangefeed(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
352+
const auto& msg = *ev->Get();
353+
const auto& result = msg.Result;
354+
355+
LOG_D("HandleChangefeed TEvExternalStorage::TEvGetObjectResponse"
356+
<< ": self# " << SelfId()
357+
<< ", result# " << result);
358+
359+
if (!CheckResult(result, "GetObject")) {
360+
return;
361+
}
362+
363+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
364+
auto& item = ImportInfo->Items.at(ItemIdx);
365+
366+
LOG_T("Trying to parse changefeed"
367+
<< ": self# " << SelfId()
368+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
369+
370+
Ydb::Table::ChangefeedDescription changefeed;
371+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) {
372+
return Reply(false, "Cannot parse сhangefeed");
373+
}
374+
item.Changefeeds[IndexDownloadedChangefeed].ChangefeedDescription = std::move(changefeed);
375+
376+
auto nextStep = [this]() {
377+
HeadObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
378+
};
379+
380+
if (NeedValidateChecksums) {
381+
StartValidatingChecksum(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
382+
} else {
383+
nextStep();
384+
}
385+
}
386+
387+
void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
388+
const auto& msg = *ev->Get();
389+
const auto& result = msg.Result;
390+
391+
LOG_D("HandleTopic TEvExternalStorage::TEvGetObjectResponse"
392+
<< ": self# " << SelfId()
393+
<< ", result# " << result);
394+
395+
if (!CheckResult(result, "GetObject")) {
396+
return;
397+
}
398+
399+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
400+
auto& item = ImportInfo->Items.at(ItemIdx);
401+
402+
LOG_T("Trying to parse topic"
403+
<< ": self# " << SelfId()
404+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
405+
406+
Ydb::Topic::DescribeTopicResult topic;
407+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) {
408+
return Reply(false, "Cannot parse topic");
409+
}
410+
item.Changefeeds[IndexDownloadedChangefeed].Topic = std::move(topic);
411+
412+
auto nextStep = [this]() {
413+
if (++IndexDownloadedChangefeed == ChangefeedsKeys.size()) {
414+
Reply();
415+
} else {
416+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
417+
}
418+
};
419+
420+
if (NeedValidateChecksums) {
421+
StartValidatingChecksum(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
422+
} else {
423+
nextStep();
424+
}
425+
}
426+
277427
template <typename TResult>
278428
bool CheckResult(const TResult& result, const TStringBuf marker) {
279429
if (result.IsSuccess()) {
@@ -312,12 +462,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
312462
TActor::PassAway();
313463
}
314464

315-
void Download(const TString& key) {
465+
void DownloadCommon() {
316466
if (Client) {
317467
Send(Client, new TEvents::TEvPoisonPill());
318468
}
319469
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
470+
}
471+
472+
void DownloadWithoutKey() {
473+
DownloadCommon();
474+
ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix());
475+
}
320476

477+
void Download(const TString& key) {
478+
DownloadCommon();
321479
HeadObject(key);
322480
}
323481

@@ -337,6 +495,10 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
337495
Download(ChecksumKey);
338496
}
339497

498+
void DownloadChangefeeds() {
499+
DownloadWithoutKey();
500+
}
501+
340502
void ResetRetries() {
341503
Attempt = 0;
342504
}
@@ -353,6 +515,12 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
353515
Become(&TThis::StateDownloadPermissions);
354516
}
355517

518+
void StartDownloadingChangefeeds() {
519+
ResetRetries();
520+
DownloadChangefeeds();
521+
Become(&TThis::StateDownloadChangefeeds);
522+
}
523+
356524
void StartValidatingChecksum(const TString& key, const TString& object, std::function<void()> checksumValidatedCallback) {
357525
ChecksumKey = NBackup::ChecksumKey(key);
358526
Checksum = NBackup::ComputeChecksum(object);
@@ -413,6 +581,17 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
413581
}
414582
}
415583

584+
STATEFN(StateDownloadChangefeeds) {
585+
switch (ev->GetTypeRewrite()) {
586+
hFunc(TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds);
587+
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeed);
588+
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeed);
589+
590+
sFunc(TEvents::TEvWakeup, DownloadChangefeeds);
591+
sFunc(TEvents::TEvPoisonPill, PassAway);
592+
}
593+
}
594+
416595
STATEFN(StateDownloadChecksum) {
417596
switch (ev->GetTypeRewrite()) {
418597
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);
@@ -432,6 +611,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
432611
const TString MetadataKey;
433612
TString SchemeKey;
434613
const TString PermissionsKey;
614+
TVector<TString> ChangefeedsKeys;
615+
ui64 IndexDownloadedChangefeed = 0;
435616

436617
const ui32 Retries;
437618
ui32 Attempt = 0;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2837,6 +2837,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28372837
S3 = 0,
28382838
};
28392839

2840+
struct TChangefeedImportDescriptions {
2841+
Ydb::Table::ChangefeedDescription ChangefeedDescription;
2842+
Ydb::Topic::DescribeTopicResult Topic;
2843+
};
2844+
28402845
struct TItem {
28412846
enum class ESubState: ui8 {
28422847
AllocateTxId = 0,
@@ -2851,6 +2856,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28512856
TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery;
28522857
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
28532858
NBackup::TMetadata Metadata;
2859+
TVector<TChangefeedImportDescriptions> Changefeeds;
28542860

28552861
EState State = EState::GetScheme;
28562862
ESubState SubState = ESubState::AllocateTxId;

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,6 +1562,8 @@ struct Schema : NIceDb::Schema {
15621562
struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {};
15631563
struct Permissions : Column<11, NScheme::NTypeIds::String> {};
15641564
struct Metadata : Column<12, NScheme::NTypeIds::String> {};
1565+
struct Changefeeds : Column<13, NScheme::NTypeIds::Json> {};
1566+
struct Topics : Column<14, NScheme::NTypeIds::Json> {};
15651567

15661568
struct State : Column<7, NScheme::NTypeIds::Byte> {};
15671569
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };

0 commit comments

Comments
 (0)