11#ifndef KIKIMR_DISABLE_S3_OPS
22
3- #include " export_s3_buffer_raw .h"
3+ #include " export_s3_buffer .h"
44#include " type_serialization.h"
55
6+ #include < ydb/core/backup/common/checksum.h>
67#include < ydb/core/tablet_flat/flat_row_state.h>
78#include < yql/essentials/types/binary_json/read.h>
89#include < ydb/public/lib/scheme_types/scheme_type_id.h>
910
1011#include < library/cpp/string_utils/quote/quote.h>
1112
1213#include < util/datetime/base.h>
14+ #include < util/generic/buffer.h>
1315#include < util/stream/buffer.h>
1416
15- namespace NKikimr {
16- namespace NDataShard {
17+ #include < contrib/libs/zstd/include/zstd.h>
1718
18- TS3BufferRaw::TS3BufferRaw (const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
19- : Columns(columns)
20- , RowsLimit(rowsLimit)
21- , BytesLimit(bytesLimit)
22- , Rows(0 )
23- , BytesRead(0 )
24- , Checksum(enableChecksums ? NBackup::CreateChecksum() : nullptr )
19+
20+ namespace NKikimr ::NDataShard {
21+
22+ namespace {
23+
24+ struct DestroyZCtx {
25+ static void Destroy (::ZSTD_CCtx* p) noexcept {
26+ ZSTD_freeCCtx (p);
27+ }
28+ };
29+
30+ class TZStdCompressionProcessor {
31+ public:
32+ using TPtr = THolder<TZStdCompressionProcessor>;
33+
34+ explicit TZStdCompressionProcessor (const TS3ExportBufferSettings::TCompressionSettings& settings);
35+
36+ TString GetError () const {
37+ return ZSTD_getErrorName (ErrorCode);
38+ }
39+
40+ bool AddData (TStringBuf data);
41+
42+ TMaybe<TBuffer> Flush (bool prepare);
43+
44+ private:
45+ enum ECompressionResult {
46+ CONTINUE,
47+ DONE,
48+ ERROR,
49+ };
50+
51+ ECompressionResult Compress (ZSTD_inBuffer* input, ZSTD_EndDirective endOp);
52+ void Reset ();
53+
54+ private:
55+ const int CompressionLevel;
56+ THolder<::ZSTD_CCtx, DestroyZCtx> Context;
57+ size_t ErrorCode = 0 ;
58+ TBuffer Buffer;
59+ ui64 BytesAdded = 0 ;
60+ };
61+
62+ class TS3Buffer : public NExportScan ::IBuffer {
63+ using TTagToColumn = IExport::TTableColumns;
64+ using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow
65+
66+ public:
67+ explicit TS3Buffer (TS3ExportBufferSettings&& settings);
68+
69+ void ColumnsOrder (const TVector<ui32>& tags) override ;
70+ bool Collect (const NTable::IScan::TRow& row) override ;
71+ IEventBase* PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) override ;
72+ void Clear () override ;
73+ bool IsFilled () const override ;
74+ TString GetError () const override ;
75+
76+ private:
77+ inline ui64 GetRowsLimit () const { return RowsLimit; }
78+ inline ui64 GetBytesLimit () const { return MaxBytes; }
79+
80+ bool Collect (const NTable::IScan::TRow& row, IOutputStream& out);
81+ virtual TMaybe<TBuffer> Flush (bool prepare);
82+
83+ static NBackup::IChecksum* CreateChecksum (const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings);
84+ static TZStdCompressionProcessor* CreateCompression (const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings);
85+
86+ private:
87+ const TTagToColumn Columns;
88+ const ui64 RowsLimit;
89+ const ui64 MinBytes;
90+ const ui64 MaxBytes;
91+
92+ TTagToIndex Indices;
93+
94+ protected:
95+ ui64 Rows = 0 ;
96+ ui64 BytesRead = 0 ;
97+ TBuffer Buffer;
98+
99+ NBackup::IChecksum::TPtr Checksum;
100+ TZStdCompressionProcessor::TPtr Compression;
101+
102+ TString ErrorString;
103+ }; // TS3Buffer
104+
105+ TS3Buffer::TS3Buffer (TS3ExportBufferSettings&& settings)
106+ : Columns(std::move(settings.Columns))
107+ , RowsLimit(settings.MaxRows)
108+ , MinBytes(settings.MinBytes)
109+ , MaxBytes(settings.MaxBytes)
110+ , Checksum(CreateChecksum(settings.ChecksumSettings))
111+ , Compression(CreateCompression(settings.CompressionSettings))
25112{
26113}
27114
28- void TS3BufferRaw::ColumnsOrder (const TVector<ui32>& tags) {
115+ NBackup::IChecksum* TS3Buffer::CreateChecksum (const TMaybe<TS3ExportBufferSettings::TChecksumSettings>& settings) {
116+ if (settings) {
117+ switch (settings->ChecksumType ) {
118+ case TS3ExportBufferSettings::TChecksumSettings::EChecksumType::Sha256:
119+ return NBackup::CreateChecksum ();
120+ }
121+ }
122+ return nullptr ;
123+ }
124+
125+ TZStdCompressionProcessor* TS3Buffer::CreateCompression (const TMaybe<TS3ExportBufferSettings::TCompressionSettings>& settings) {
126+ if (settings) {
127+ switch (settings->Algorithm ) {
128+ case TS3ExportBufferSettings::TCompressionSettings::EAlgorithm::Zstd:
129+ return new TZStdCompressionProcessor (*settings);
130+ }
131+ }
132+ return nullptr ;
133+ }
134+
135+ void TS3Buffer::ColumnsOrder (const TVector<ui32>& tags) {
29136 Y_ABORT_UNLESS (tags.size () == Columns.size ());
30137
31138 Indices.clear ();
@@ -37,7 +144,7 @@ void TS3BufferRaw::ColumnsOrder(const TVector<ui32>& tags) {
37144 }
38145}
39146
40- bool TS3BufferRaw ::Collect (const NTable::IScan::TRow& row, IOutputStream& out) {
147+ bool TS3Buffer ::Collect (const NTable::IScan::TRow& row, IOutputStream& out) {
41148 bool needsComma = false ;
42149 for (const auto & [tag, column] : Columns) {
43150 auto it = Indices.find (tag);
@@ -152,7 +259,7 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
152259 return true ;
153260}
154261
155- bool TS3BufferRaw ::Collect (const NTable::IScan::TRow& row) {
262+ bool TS3Buffer ::Collect (const NTable::IScan::TRow& row) {
156263 TBufferOutput out (Buffer);
157264 ErrorString.clear ();
158265
@@ -161,14 +268,24 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
161268 return false ;
162269 }
163270
271+ TStringBuf data (Buffer.Data (), Buffer.Size ());
272+ data = data.Tail (beforeSize);
273+
274+ // Apply checksum
164275 if (Checksum) {
165- TStringBuf data (Buffer.Data (), Buffer.Size ());
166- Checksum->AddData (data.Tail (beforeSize));
276+ Checksum->AddData (data);
277+ }
278+
279+ // Compress
280+ if (Compression && !Compression->AddData (data)) {
281+ ErrorString = Compression->GetError ();
282+ return false ;
167283 }
284+
168285 return true ;
169286}
170287
171- IEventBase* TS3BufferRaw ::PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) {
288+ IEventBase* TS3Buffer ::PrepareEvent (bool last, NExportScan::IBuffer::TStats& stats) {
172289 stats.Rows = Rows;
173290 stats.BytesRead = BytesRead;
174291
@@ -186,31 +303,108 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats&
186303 }
187304}
188305
189- void TS3BufferRaw ::Clear () {
306+ void TS3Buffer ::Clear () {
190307 Y_ABORT_UNLESS (Flush (false ));
191308}
192309
193- bool TS3BufferRaw::IsFilled () const {
310+ bool TS3Buffer::IsFilled () const {
311+ if (Buffer.Size () < MinBytes) {
312+ return false ;
313+ }
314+
194315 return Rows >= GetRowsLimit () || Buffer.Size () >= GetBytesLimit ();
195316}
196317
197- TString TS3BufferRaw ::GetError () const {
318+ TString TS3Buffer ::GetError () const {
198319 return ErrorString;
199320}
200321
201- TMaybe<TBuffer> TS3BufferRaw ::Flush (bool ) {
322+ TMaybe<TBuffer> TS3Buffer ::Flush (bool prepare ) {
202323 Rows = 0 ;
203324 BytesRead = 0 ;
325+
326+ // Compression finishes compression frame during Flush
327+ // so that last table row borders equal to compression frame borders.
328+ // This full finished block must then be encrypted so that encryption frame
329+ // has the same borders.
330+ // It allows to import data in batches and save its state during import.
331+
332+ if (Compression) {
333+ TMaybe<TBuffer> compressedBuffer = Compression->Flush (prepare);
334+ if (!compressedBuffer) {
335+ return Nothing ();
336+ }
337+
338+ Buffer = std::move (*compressedBuffer);
339+ }
340+
204341 return std::exchange (Buffer, TBuffer ());
205342}
206343
207- NExportScan::IBuffer* CreateS3ExportBufferRaw (
208- const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
344+ TZStdCompressionProcessor::TZStdCompressionProcessor (const TS3ExportBufferSettings::TCompressionSettings& settings)
345+ : CompressionLevel(settings.CompressionLevel)
346+ , Context(ZSTD_createCCtx())
209347{
210- return new TS3BufferRaw (columns, rowsLimit, bytesLimit, enableChecksums);
211348}
212349
213- } // NDataShard
214- } // NKikimr
350+ bool TZStdCompressionProcessor::AddData (TStringBuf data) {
351+ BytesAdded += data.size ();
352+ auto input = ZSTD_inBuffer{data.data (), data.size (), 0 };
353+ while (input.pos < input.size ) {
354+ if (ERROR == Compress (&input, ZSTD_e_continue)) {
355+ return false ;
356+ }
357+ }
358+
359+ return true ;
360+ }
361+
362+ TMaybe<TBuffer> TZStdCompressionProcessor::Flush (bool prepare) {
363+ if (prepare && BytesAdded) {
364+ ECompressionResult res;
365+ auto input = ZSTD_inBuffer{NULL , 0 , 0 };
366+
367+ do {
368+ if (res = Compress (&input, ZSTD_e_end); res == ERROR) {
369+ return Nothing ();
370+ }
371+ } while (res != DONE);
372+ }
373+
374+ Reset ();
375+ return std::exchange (Buffer, TBuffer ());
376+ }
377+
378+ TZStdCompressionProcessor::ECompressionResult TZStdCompressionProcessor::Compress (ZSTD_inBuffer* input, ZSTD_EndDirective endOp) {
379+ auto output = ZSTD_outBuffer{Buffer.Data (), Buffer.Capacity (), Buffer.Size ()};
380+ auto res = ZSTD_compressStream2 (Context.Get (), &output, input, endOp);
381+
382+ if (ZSTD_isError (res)) {
383+ ErrorCode = res;
384+ return ERROR;
385+ }
386+
387+ if (res > 0 ) {
388+ Buffer.Reserve (output.pos + res);
389+ }
390+
391+ Buffer.Proceed (output.pos );
392+ return res ? CONTINUE : DONE;
393+ }
394+
395+ void TZStdCompressionProcessor::Reset () {
396+ BytesAdded = 0 ;
397+ ZSTD_CCtx_reset (Context.Get (), ZSTD_reset_session_only);
398+ ZSTD_CCtx_refCDict (Context.Get (), NULL );
399+ ZSTD_CCtx_setParameter (Context.Get (), ZSTD_c_compressionLevel, CompressionLevel);
400+ }
401+
402+ } // anonymous
403+
404+ NExportScan::IBuffer* CreateS3ExportBuffer (TS3ExportBufferSettings&& settings) {
405+ return new TS3Buffer (std::move (settings));
406+ }
407+
408+ } // namespace NKikimr::NDataShard
215409
216410#endif // KIKIMR_DISABLE_S3_OPS
0 commit comments