@@ -108,14 +108,12 @@ namespace NKikimr {
108108 };
109109
110110
111- // //////////////////////////////////////////////////////////////////////////
112- // TCompactRecordMergerBase
113111 // //////////////////////////////////////////////////////////////////////////
114112 // Valid call sequence:
115113 // Clear(); Add(); ... Add(); Finish()
116114 // GetMemRec(); GetData();
117115 template <class TKey , class TMemRec >
118- class TCompactRecordMergerBase : public TRecordMergerBase <TKey, TMemRec> {
116+ class TCompactRecordMerger : public TRecordMergerBase <TKey, TMemRec> {
119117 protected:
120118 using TBase = TRecordMergerBase<TKey, TMemRec>;
121119 using TBase::MemRec;
@@ -132,18 +130,16 @@ namespace NKikimr {
132130 };
133131
134132 public:
135- TCompactRecordMergerBase (const TBlobStorageGroupType >ype, bool addHeader)
133+ TCompactRecordMerger (const TBlobStorageGroupType >ype, bool addHeader)
136134 : TBase(gtype, true )
137- , MemRecs()
138- , ProducingSmallBlob(false )
139- , NeedToLoadData(ELoadData::NotSet)
140135 , AddHeader(addHeader)
141136 {}
142137
143138 void Clear () {
144139 TBase::Clear ();
145140 MemRecs.clear ();
146141 ProducingSmallBlob = false ;
142+ ProducingHugeBlob = false ;
147143 NeedToLoadData = ELoadData::NotSet;
148144 DataMerger.Clear ();
149145 }
@@ -156,48 +152,42 @@ namespace NKikimr {
156152 }
157153
158154 void AddFromSegment (const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
159- Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
160- AddBasic (memRec, key);
161- switch (memRec.GetType ()) {
162- case TBlobType::DiskBlob: {
163- if (memRec.HasData () && NeedToLoadData == ELoadData::LoadData) {
164- MemRecs.push_back (memRec);
165- ProducingSmallBlob = true ;
166- }
167- break ;
168- }
169- case TBlobType::HugeBlob:
170- case TBlobType::ManyHugeBlobs: {
171- TDiskDataExtractor extr;
172- memRec.GetDiskData (&extr, outbound);
173- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
174- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, circaLsn);
175- break ;
176- }
177- default :
178- Y_ABORT (" Impossible case" );
179- }
180- VerifyConsistency (memRec, outbound);
155+ Add (memRec, nullptr , outbound, key, circaLsn);
181156 }
182157
183158 void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
159+ Add (memRec, data, nullptr , key, lsn);
160+ }
161+
162+ void Add (const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
184163 Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
185164 AddBasic (memRec, key);
186- if (memRec.HasData ()) {
187- if (data) {
188- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob || memRec.GetType () == TBlobType::DiskBlob);
189- if (NeedToLoadData == ELoadData::LoadData) {
190- DataMerger.AddBlob (TDiskBlob (data, memRec.GetLocalParts (GType), GType, key.LogoBlobID ()));
191- ProducingSmallBlob = true ;
192- } else {
193- // intentionally do nothing: don't add any data to DataMerger, because we don't need it
194- }
195- } else {
196- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob);
197- TDiskDataExtractor extr;
198- memRec.GetDiskData (&extr, nullptr );
199- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
200- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, lsn);
165+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
166+ TDiskDataExtractor extr;
167+ switch (memRec.GetType ()) {
168+ case TBlobType::MemBlob:
169+ case TBlobType::DiskBlob:
170+ if (NeedToLoadData == ELoadData::LoadData) {
171+ if (data) {
172+ DataMerger.AddBlob (TDiskBlob (data, local, GType, key.LogoBlobID ()));
173+ }
174+ if (memRec.HasData () && memRec.GetType () == TBlobType::DiskBlob) {
175+ MemRecs.push_back (memRec);
176+ }
177+ Y_DEBUG_ABORT_UNLESS (!ProducingHugeBlob);
178+ ProducingSmallBlob = true ;
179+ }
180+ break ;
181+
182+ case TBlobType::ManyHugeBlobs:
183+ Y_ABORT_UNLESS (outbound);
184+ [[fallthrough]];
185+ case TBlobType::HugeBlob:
186+ memRec.GetDiskData (&extr, outbound);
187+ DataMerger.AddHugeBlob (extr.Begin , extr.End , local, lsn);
188+ Y_DEBUG_ABORT_UNLESS (!ProducingSmallBlob);
189+ ProducingHugeBlob = true ;
190+ break ;
201191 }
202192 }
203193 VerifyConsistency (memRec, nullptr );
@@ -239,6 +229,13 @@ namespace NKikimr {
239229 }
240230
241231 void Finish () {
232+ if (NeedToLoadData == ELoadData::DontLoadData) {
233+ Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
234+ // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
235+ // TBlobType::ManyHugeBlobs a few lines below
236+ MemRec.SetNoBlob ();
237+ }
238+
242239 Y_DEBUG_ABORT_UNLESS (!Empty ());
243240 VerifyConsistency ();
244241
@@ -263,118 +260,22 @@ namespace NKikimr {
263260 return &DataMerger;
264261 }
265262
266- protected:
267- TStackVec<TMemRec, 16 > MemRecs;
268- bool ProducingSmallBlob;
269- ELoadData NeedToLoadData;
270- TDataMerger DataMerger;
271- const bool AddHeader;
272- };
273-
274- // //////////////////////////////////////////////////////////////////////////
275- // TCompactRecordMergerIndexPass
276- // //////////////////////////////////////////////////////////////////////////
277- template <typename TKey, typename TMemRec>
278- class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase <TKey, TMemRec> {
279- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
280-
281- using ELoadData = typename TBase::ELoadData;
282-
283- using TBase::MemRecs;
284- using TBase::ProducingSmallBlob;
285- using TBase::NeedToLoadData;
286- using TBase::DataMerger;
287- using TBase::MemRec;
288-
289- public:
290- TCompactRecordMergerIndexPass (const TBlobStorageGroupType >ype, bool addHeader)
291- : TBase(gtype, addHeader)
292- {}
293-
294- void Finish () {
295- if (NeedToLoadData == ELoadData::DontLoadData) {
296- Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
297- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
298- // TBlobType::ManyHugeBlobs a few lines below
299- MemRec.SetNoBlob ();
300- }
301-
302- TBase::Finish ();
303- }
304-
305263 template <typename TCallback>
306264 void ForEachSmallDiskBlob (TCallback&& callback) {
307265 for (const auto & memRec : MemRecs) {
308266 callback (memRec);
309267 }
310268 }
311- };
312-
313- // //////////////////////////////////////////////////////////////////////////
314- // TCompactRecordMergerDataPass
315- // //////////////////////////////////////////////////////////////////////////
316- template <typename TKey, typename TMemRec>
317- class TCompactRecordMergerDataPass : public TCompactRecordMergerBase <TKey, TMemRec> {
318- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
319-
320- using TBase::ProducingSmallBlob;
321- using TBase::MemRecs;
322- using TBase::MemRec;
323- using TBase::DataMerger;
324- using TBase::GType;
325- using TBase::SetLoadDataMode;
326-
327- public:
328- TCompactRecordMergerDataPass (const TBlobStorageGroupType >ype, bool addHeader)
329- : TBase(gtype, addHeader)
330- {
331- SetLoadDataMode (true );
332- }
333-
334- void Clear () {
335- TBase::Clear ();
336- ReadSmallBlobs.clear ();
337- SetLoadDataMode (true );
338- }
339-
340- // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
341- void AddReadSmallBlob (TString data) {
342- Y_ABORT_UNLESS (ProducingSmallBlob);
343- ReadSmallBlobs.push_back (std::move (data));
344- }
345-
346- void Finish () {
347- // ensure we are producing small blobs; otherwise this merger should never be created
348- Y_ABORT_UNLESS (ProducingSmallBlob);
349-
350- // add all read small blobs into blob merger
351- const size_t count = ReadSmallBlobs.size ();
352- Y_ABORT_UNLESS (count == +MemRecs, " count# %zu +MemRecs# %zu" , count, +MemRecs);
353- for (size_t i = 0 ; i < count; ++i) {
354- const TMemRec& memRec = MemRecs[i]->GetMemRec ();
355- const TString& buffer = ReadSmallBlobs[i];
356- Y_ABORT_UNLESS (buffer.size () == memRec.DataSize ());
357- DataMerger.AddBlob (TDiskBlob (buffer.data (), buffer.size (), memRec.GetLocalParts (GType)));
358- }
359-
360-
361- // ensure that data merger has small blob
362- Y_ABORT_UNLESS (DataMerger.HasSmallBlobs ());
363-
364- // finalize base class logic; it also generates blob record
365- TBase::Finish ();
366-
367- // ensure that we have generated correct DiskBlob with full set of declared parts
368- const TDiskBlob& blob = DataMerger.GetDiskBlobMerger ().GetDiskBlob ();
369- Y_ABORT_UNLESS (blob.GetParts () == MemRec.GetLocalParts (GType));
370- Y_ABORT_UNLESS (MemRec.GetType () == TBlobType::DiskBlob);
371- }
372269
373- private:
374- TVector<TString> ReadSmallBlobs;
270+ protected:
271+ TStackVec<TMemRec, 16 > MemRecs;
272+ bool ProducingSmallBlob = false ;
273+ bool ProducingHugeBlob = false ;
274+ ELoadData NeedToLoadData = ELoadData::NotSet;
275+ TDataMerger DataMerger;
276+ const bool AddHeader;
375277 };
376278
377-
378279 // //////////////////////////////////////////////////////////////////////////
379280 // TRecordMergerCallback
380281 // //////////////////////////////////////////////////////////////////////////
@@ -446,20 +347,30 @@ namespace NKikimr {
446347
447348 void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
448349 AddBasic (memRec, key);
449- if (memRec.HasData ()) {
450- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
451- if (data) {
452- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob);
453- // we have in-memory data in a rope, it always wins among other data,
454- // so we call Callback immediately and remove any data for this local part
455- // from LastWriteWinsMerger
456- (*Callback)(TDiskBlob (data, v, GType, key.LogoBlobID ()));
457- } else {
458- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob && v.CountBits () == 1u );
459- TDiskDataExtractor extr;
460- memRec.GetDiskData (&extr, nullptr );
461- // deduplicate huge blob
462- LastWriteWinsMerger.Add (extr.SwearOne (), v, lsn);
350+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
351+ TDiskDataExtractor extr;
352+ static TRope rope;
353+ switch (memRec.GetType ()) {
354+ case TBlobType::MemBlob:
355+ // we have in-memory data in a rope, it always wins among other data,
356+ // so we call Callback immediately and remove any data for this local part
357+ // from LastWriteWinsMerger
358+ Y_ABORT_UNLESS (data);
359+ (*Callback)(TDiskBlob (data, local, GType, key.LogoBlobID ()));
360+ break ;
361+
362+ case TBlobType::DiskBlob:
363+ (*Callback)(TDiskBlob (&rope, local, GType, key.LogoBlobID ())); // pure metadata parts only
364+ break ;
365+
366+ case TBlobType::HugeBlob:
367+ Y_ABORT_UNLESS (local.CountBits () == 1 );
368+ memRec.GetDiskData (&extr, nullptr );
369+ LastWriteWinsMerger.Add (extr.SwearOne (), local, lsn);
370+ break ;
371+
372+ case TBlobType::ManyHugeBlobs:
373+ Y_ABORT (" unexpected case" );
463374 }
464375 }
465376 }
0 commit comments