@@ -108,14 +108,12 @@ namespace NKikimr {
108108 };
109109
110110
111- // //////////////////////////////////////////////////////////////////////////
112- // TCompactRecordMergerBase
113111 // //////////////////////////////////////////////////////////////////////////
114112 // Valid call sequence:
115113 // Clear(); Add(); ... Add(); Finish()
116114 // GetMemRec(); GetData();
117115 template <class TKey , class TMemRec >
118- class TCompactRecordMergerBase : public TRecordMergerBase <TKey, TMemRec> {
116+ class TCompactRecordMerger : public TRecordMergerBase <TKey, TMemRec> {
119117 protected:
120118 using TBase = TRecordMergerBase<TKey, TMemRec>;
121119 using TBase::MemRec;
@@ -132,18 +130,16 @@ namespace NKikimr {
132130 };
133131
134132 public:
135- TCompactRecordMergerBase (const TBlobStorageGroupType >ype, bool addHeader)
133+ TCompactRecordMerger (const TBlobStorageGroupType >ype, bool addHeader)
136134 : TBase(gtype, true )
137- , MemRecs()
138- , ProducingSmallBlob(false )
139- , NeedToLoadData(ELoadData::NotSet)
140135 , AddHeader(addHeader)
141136 {}
142137
143138 void Clear () {
144139 TBase::Clear ();
145140 MemRecs.clear ();
146141 ProducingSmallBlob = false ;
142+ ProducingHugeBlob = false ;
147143 NeedToLoadData = ELoadData::NotSet;
148144 DataMerger.Clear ();
149145 }
@@ -156,51 +152,47 @@ namespace NKikimr {
156152 }
157153
158154 void AddFromSegment (const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
159- Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
160- AddBasic (memRec, key);
161- switch (memRec.GetType ()) {
162- case TBlobType::DiskBlob: {
163- if (memRec.HasData () && NeedToLoadData == ELoadData::LoadData) {
164- MemRecs.push_back (memRec);
165- ProducingSmallBlob = true ;
166- }
167- break ;
168- }
169- case TBlobType::HugeBlob:
170- case TBlobType::ManyHugeBlobs: {
171- TDiskDataExtractor extr;
172- memRec.GetDiskData (&extr, outbound);
173- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
174- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, circaLsn);
175- break ;
176- }
177- default :
178- Y_ABORT (" Impossible case" );
179- }
180- VerifyConsistency (memRec, outbound);
155+ Add (memRec, nullptr , outbound, key, circaLsn);
181156 }
182157
183158 void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
159+ Add (memRec, data, nullptr , key, lsn);
160+ }
161+
162+ void Add (const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
184163 Y_DEBUG_ABORT_UNLESS (NeedToLoadData != ELoadData::NotSet);
185164 AddBasic (memRec, key);
186- if (memRec.HasData ()) {
187- if (data) {
188- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob || memRec.GetType () == TBlobType::DiskBlob);
189- if (NeedToLoadData == ELoadData::LoadData) {
190- DataMerger.AddBlob (TDiskBlob (data, memRec.GetLocalParts (GType), GType, key.LogoBlobID ()));
191- ProducingSmallBlob = true ;
192- } else {
193- // intentionally do nothing: don't add any data to DataMerger, because we don't need it
194- }
195- } else {
196- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob);
197- TDiskDataExtractor extr;
198- memRec.GetDiskData (&extr, nullptr );
199- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
200- DataMerger.AddHugeBlob (extr.Begin , extr.End , v, lsn);
165+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
166+ TDiskDataExtractor extr;
167+ switch (memRec.GetType ()) {
168+ case TBlobType::MemBlob:
169+ case TBlobType::DiskBlob:
170+ if (NeedToLoadData == ELoadData::LoadData) {
171+ if (data) {
172+ // we have some data in-memory
173+ DataMerger.AddBlob (TDiskBlob (data, local, GType, key.LogoBlobID ()));
174+ }
175+ if (memRec.HasData () && memRec.GetType () == TBlobType::DiskBlob) {
176+ // there is something to read from the disk
177+ MemRecs.push_back (memRec);
178+ }
179+ Y_DEBUG_ABORT_UNLESS (!ProducingHugeBlob);
180+ ProducingSmallBlob = true ;
181+ }
182+ break ;
183+
184+ case TBlobType::ManyHugeBlobs:
185+ Y_ABORT_UNLESS (outbound);
186+ [[fallthrough]];
187+ case TBlobType::HugeBlob:
188+ memRec.GetDiskData (&extr, outbound);
189+ DataMerger.AddHugeBlob (extr.Begin , extr.End , local, lsn);
190+ Y_DEBUG_ABORT_UNLESS (!ProducingSmallBlob);
191+ ProducingHugeBlob = true ;
192+ break ;
201193 }
202194 }
203- VerifyConsistency (memRec, nullptr );
195+ VerifyConsistency (memRec, outbound );
204196 }
205197
206198 void VerifyConsistency (const TMemRec& memRec, const TDiskPart *outbound) {
@@ -239,6 +231,13 @@ namespace NKikimr {
239231 }
240232
241233 void Finish () {
234+ if (NeedToLoadData == ELoadData::DontLoadData) {
235+ Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
236+ // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
237+ // TBlobType::ManyHugeBlobs a few lines below
238+ MemRec.SetNoBlob ();
239+ }
240+
242241 Y_DEBUG_ABORT_UNLESS (!Empty ());
243242 VerifyConsistency ();
244243
@@ -263,118 +262,22 @@ namespace NKikimr {
263262 return &DataMerger;
264263 }
265264
266- protected:
267- TStackVec<TMemRec, 16 > MemRecs;
268- bool ProducingSmallBlob;
269- ELoadData NeedToLoadData;
270- TDataMerger DataMerger;
271- const bool AddHeader;
272- };
273-
274- // //////////////////////////////////////////////////////////////////////////
275- // TCompactRecordMergerIndexPass
276- // //////////////////////////////////////////////////////////////////////////
277- template <typename TKey, typename TMemRec>
278- class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase <TKey, TMemRec> {
279- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
280-
281- using ELoadData = typename TBase::ELoadData;
282-
283- using TBase::MemRecs;
284- using TBase::ProducingSmallBlob;
285- using TBase::NeedToLoadData;
286- using TBase::DataMerger;
287- using TBase::MemRec;
288-
289- public:
290- TCompactRecordMergerIndexPass (const TBlobStorageGroupType >ype, bool addHeader)
291- : TBase(gtype, addHeader)
292- {}
293-
294- void Finish () {
295- if (NeedToLoadData == ELoadData::DontLoadData) {
296- Y_ABORT_UNLESS (!DataMerger.HasSmallBlobs ()); // we didn't put any small blob to the data merger
297- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
298- // TBlobType::ManyHugeBlobs a few lines below
299- MemRec.SetNoBlob ();
300- }
301-
302- TBase::Finish ();
303- }
304-
305265 template <typename TCallback>
306266 void ForEachSmallDiskBlob (TCallback&& callback) {
307267 for (const auto & memRec : MemRecs) {
308268 callback (memRec);
309269 }
310270 }
311- };
312-
313- // //////////////////////////////////////////////////////////////////////////
314- // TCompactRecordMergerDataPass
315- // //////////////////////////////////////////////////////////////////////////
316- template <typename TKey, typename TMemRec>
317- class TCompactRecordMergerDataPass : public TCompactRecordMergerBase <TKey, TMemRec> {
318- using TBase = TCompactRecordMergerBase<TKey, TMemRec>;
319-
320- using TBase::ProducingSmallBlob;
321- using TBase::MemRecs;
322- using TBase::MemRec;
323- using TBase::DataMerger;
324- using TBase::GType;
325- using TBase::SetLoadDataMode;
326-
327- public:
328- TCompactRecordMergerDataPass (const TBlobStorageGroupType >ype, bool addHeader)
329- : TBase(gtype, addHeader)
330- {
331- SetLoadDataMode (true );
332- }
333271
334- void Clear () {
335- TBase::Clear ();
336- ReadSmallBlobs.clear ();
337- SetLoadDataMode (true );
338- }
339-
340- // add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
341- void AddReadSmallBlob (TString data) {
342- Y_ABORT_UNLESS (ProducingSmallBlob);
343- ReadSmallBlobs.push_back (std::move (data));
344- }
345-
346- void Finish () {
347- // ensure we are producing small blobs; otherwise this merger should never be created
348- Y_ABORT_UNLESS (ProducingSmallBlob);
349-
350- // add all read small blobs into blob merger
351- const size_t count = ReadSmallBlobs.size ();
352- Y_ABORT_UNLESS (count == +MemRecs, " count# %zu +MemRecs# %zu" , count, +MemRecs);
353- for (size_t i = 0 ; i < count; ++i) {
354- const TMemRec& memRec = MemRecs[i]->GetMemRec ();
355- const TString& buffer = ReadSmallBlobs[i];
356- Y_ABORT_UNLESS (buffer.size () == memRec.DataSize ());
357- DataMerger.AddBlob (TDiskBlob (buffer.data (), buffer.size (), memRec.GetLocalParts (GType)));
358- }
359-
360-
361- // ensure that data merger has small blob
362- Y_ABORT_UNLESS (DataMerger.HasSmallBlobs ());
363-
364- // finalize base class logic; it also generates blob record
365- TBase::Finish ();
366-
367- // ensure that we have generated correct DiskBlob with full set of declared parts
368- const TDiskBlob& blob = DataMerger.GetDiskBlobMerger ().GetDiskBlob ();
369- Y_ABORT_UNLESS (blob.GetParts () == MemRec.GetLocalParts (GType));
370- Y_ABORT_UNLESS (MemRec.GetType () == TBlobType::DiskBlob);
371- }
372-
373- private:
374- TVector<TString> ReadSmallBlobs;
272+ protected:
273+ TStackVec<TMemRec, 16 > MemRecs;
274+ bool ProducingSmallBlob = false ;
275+ bool ProducingHugeBlob = false ;
276+ ELoadData NeedToLoadData = ELoadData::NotSet;
277+ TDataMerger DataMerger;
278+ const bool AddHeader;
375279 };
376280
377-
378281 // //////////////////////////////////////////////////////////////////////////
379282 // TRecordMergerCallback
380283 // //////////////////////////////////////////////////////////////////////////
@@ -412,9 +315,8 @@ namespace NKikimr {
412315 case 1 : {
413316 if (memRec.GetType () == TBlobType::DiskBlob) {
414317 // don't deduplicate inplaced data
415- const TDiskPart &data = extr.SwearOne ();
416- if (data.ChunkIdx && data.Size ) {
417- (*Callback)(data, v);
318+ if (!v.Empty ()) {
319+ (*Callback)(extr.SwearOne (), v);
418320 }
419321 } else if (memRec.GetType () == TBlobType::HugeBlob) {
420322 Y_ABORT_UNLESS (v.CountBits () == 1u );
@@ -446,20 +348,31 @@ namespace NKikimr {
446348
447349 void AddFromFresh (const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
448350 AddBasic (memRec, key);
449- if (memRec.HasData ()) {
450- const NMatrix::TVectorType v = memRec.GetLocalParts (GType);
451- if (data) {
452- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::MemBlob);
453- // we have in-memory data in a rope, it always wins among other data,
454- // so we call Callback immediately and remove any data for this local part
455- // from LastWriteWinsMerger
456- (*Callback)(TDiskBlob (data, v, GType, key.LogoBlobID ()));
457- } else {
458- Y_ABORT_UNLESS (memRec.GetType () == TBlobType::HugeBlob && v.CountBits () == 1u );
459- TDiskDataExtractor extr;
460- memRec.GetDiskData (&extr, nullptr );
461- // deduplicate huge blob
462- LastWriteWinsMerger.Add (extr.SwearOne (), v, lsn);
351+ if (const NMatrix::TVectorType local = memRec.GetLocalParts (GType); !local.Empty ()) {
352+ TDiskDataExtractor extr;
353+ static TRope rope;
354+ switch (memRec.GetType ()) {
355+ case TBlobType::MemBlob:
356+ // we have in-memory data in a rope, it always wins among other data,
357+ // so we call Callback immediately and remove any data for this local part
358+ // from LastWriteWinsMerger
359+ Y_ABORT_UNLESS (data); // HaveToMergeData is true, so data must be present
360+ (*Callback)(TDiskBlob (data, local, GType, key.LogoBlobID ()));
361+ break ;
362+
363+ case TBlobType::DiskBlob:
364+ Y_ABORT_UNLESS (!memRec.HasData ());
365+ (*Callback)(TDiskBlob (&rope, local, GType, key.LogoBlobID ())); // pure metadata parts only
366+ break ;
367+
368+ case TBlobType::HugeBlob:
369+ Y_ABORT_UNLESS (local.CountBits () == 1 );
370+ memRec.GetDiskData (&extr, nullptr );
371+ LastWriteWinsMerger.Add (extr.SwearOne (), local, lsn);
372+ break ;
373+
374+ case TBlobType::ManyHugeBlobs:
375+ Y_ABORT (" unexpected case" );
463376 }
464377 }
465378 }
0 commit comments