Skip to content

Commit e5dd5c1

Browse files
authored
Fixed processing kafka batch v0 with many messages (#24272)
1 parent ad8a5ad commit e5dd5c1

File tree

8 files changed

+129
-36
lines changed

8 files changed

+129
-36
lines changed

ydb/core/kafka_proxy/kafka.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,10 @@ class TKafkaReadable {
412412

413413
void skip(size_t length);
414414

415+
size_t left() const;
416+
417+
size_t position() const;
418+
415419
private:
416420
void checkEof(size_t length);
417421

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,12 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
804804

805805
case MESSAGE_PROCESS:
806806
Request->StartTime = TInstant::Now();
807-
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
807+
if constexpr (DEBUG_ENABLED) {
808+
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId
809+
<< ", Data=" << Hex(Request->Buffer->Begin(), Request->Buffer->End()));
810+
} else {
811+
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
812+
}
808813

809814
TKafkaReadable readable(*Request->Buffer);
810815

ydb/core/kafka_proxy/kafka_log.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
namespace NKafka {
77

8+
static constexpr bool DEBUG_ENABLED = false;
9+
10+
TString Hex(const char* begin, const char* end);
11+
812
inline TString LogPrefix() { return {}; }
913

1014
}

ydb/core/kafka_proxy/kafka_messages_int.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,34 @@ char TKafkaReadable::take(size_t shift) {
6363
return *(Is.Data() + Position + shift);
6464
}
6565

66+
size_t TKafkaReadable::left() const {
67+
return Is.Size() - Position;
68+
}
69+
70+
size_t TKafkaReadable::position() const {
71+
return Position;
72+
}
73+
6674
void TKafkaReadable::checkEof(size_t length) {
6775
if (Position + length > Is.Size()) {
6876
ythrow yexception() << "unexpected end of stream";
6977
}
7078
}
7179

80+
// Converts the low four bits of `c` into an upper-case hexadecimal digit.
// Bits above the low nibble are ignored, so the result is always one of
// '0'-'9' or 'A'-'F', even if the caller passes a value greater than 15.
char Hex(const unsigned char c) {
    static constexpr char DIGITS[] = "0123456789ABCDEF";
    return DIGITS[c & 0x0F];
}
83+
84+
// Renders the bytes in [begin, end) as a comma-separated list of upper-case
// hex literals, e.g. "0x00, 0x7D, 0xFF". An empty range yields an empty string.
TString Hex(const char* begin, const char *end) {
    TStringBuilder result;
    for (const char* cur = begin; cur < end; ++cur) {
        // A separator goes before every byte except the first one.
        if (cur != begin) {
            result << ", ";
        }
        // Go through unsigned char so bytes >= 0x80 are not sign-extended.
        const unsigned char byte = *cur;
        const char high = Hex(byte >> 4);
        const char low = Hex(byte & 0x0F);
        result << "0x" << high << low;
    }
    return result;
}
95+
7296
} // namespace NKafka

ydb/core/kafka_proxy/kafka_messages_int.h

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
namespace NKafka {
1818
namespace NPrivate {
1919

20-
static constexpr bool DEBUG_ENABLED = false;
21-
2220
struct TWriteCollector {
2321
ui32 NumTaggedFields = 0;
2422
};
@@ -191,7 +189,11 @@ class TypeStrategy {
191189

192190
inline static void DoLog(const TValueType& value) {
193191
if constexpr (DEBUG_ENABLED) {
194-
Cerr << "Was read field '" << Meta::Name << "' value " << value << Endl;
192+
if constexpr (sizeof(TValueType) == 1) {
193+
Cerr << "Was read field '" << Meta::Name << "' value " << (size_t)value << Endl;
194+
} else {
195+
Cerr << "Was read field '" << Meta::Name << "' value " << value << Endl;
196+
}
195197
}
196198
}
197199
};
@@ -483,11 +485,12 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
483485
value.emplace();
484486

485487
if (magic < CURRENT_RECORD_VERSION) {
488+
size_t end = readable.position() + length;
489+
486490
TKafkaRecordBatchV0 v0;
487491
v0.Read(readable, magic);
488492

489-
value->Magic = v0.Record.Magic;
490-
value->Crc = v0.Record.Crc;
493+
value->Magic = 2;
491494
value->Attributes = v0.Record.Attributes & 0x07;
492495

493496
value->Records.resize(1);
@@ -497,6 +500,21 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
497500
record.TimestampDelta = v0.Record.Timestamp;
498501
record.Key = v0.Record.Key;
499502
record.Value = v0.Record.Value;
503+
504+
while(readable.position() < end) {
505+
magic = readable.take(16);
506+
507+
v0 = {};
508+
v0.Read(readable, magic);
509+
510+
value->Records.resize(value->Records.size() + 1);
511+
auto& record = value->Records.back();
512+
record.Length = v0.Record.MessageSize;
513+
record.OffsetDelta = v0.Offset;
514+
record.TimestampDelta = v0.Record.Timestamp;
515+
record.Key = v0.Record.Key;
516+
record.Value = v0.Record.Value;
517+
}
500518
} else {
501519
(*value).Read(readable, magic);
502520
}
@@ -633,16 +651,16 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename
633651
++collector.NumTaggedFields;
634652

635653
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
636-
collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size);
654+
collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size);
637655
if constexpr (DEBUG_ENABLED) {
638-
Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl;
656+
Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl;
639657
}
640658
}
641659
} else {
642660
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
643661
collector.Size += size;
644662
if constexpr (DEBUG_ENABLED) {
645-
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
663+
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
646664
}
647665
}
648666
}
@@ -651,7 +669,7 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename
651669
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
652670
collector.Size += size;
653671
if constexpr (DEBUG_ENABLED) {
654-
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
672+
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
655673
}
656674
}
657675
}

ydb/core/kafka_proxy/ut/kafka_test_client.cpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -808,19 +808,7 @@ TMessagePtr<T> TKafkaTestClient::Read(TSocketInput& si, TRequestHeaderData* requ
808808
}
809809

810810
void TKafkaTestClient::Print(const TBuffer& buffer) {
811-
TStringBuilder sb;
812-
for (size_t i = 0; i < buffer.Size(); ++i) {
813-
char c = buffer.Data()[i];
814-
if (i > 0) {
815-
sb << ", ";
816-
}
817-
sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F);
818-
}
819-
Cerr << ">>>>> Packet sent: " << sb << Endl;
820-
}
821-
822-
char TKafkaTestClient::Hex0(const unsigned char c) {
823-
return c < 10 ? '0' + c : 'A' + c - 10;
811+
Cerr << ">>>>> Packet sent: " << Hex(buffer.Begin(), buffer.End()) << Endl;
824812
}
825813

826814
void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {

ydb/core/kafka_proxy/ut/kafka_test_client.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ class TKafkaTestClient {
150150
template <std::derived_from<TApiMessage> T>
151151
TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader);
152152
void Print(const TBuffer& buffer);
153-
char Hex0(const unsigned char c);
154153
void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics);
155154

156155
private:

ydb/core/kafka_proxy/ut/ut_serialization.cpp

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -776,8 +776,8 @@ Y_UNIT_TEST(ProduceRequestData_Record_v0) {
776776
UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
777777
UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
778778
UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, -1);
779-
UNIT_ASSERT_EQUAL(r0.Magic, 0);
780-
UNIT_ASSERT_EQUAL(r0.Crc, 544167206);
779+
UNIT_ASSERT_EQUAL(r0.Magic, 2);
780+
UNIT_ASSERT_EQUAL(r0.Crc, 0);
781781
UNIT_ASSERT_EQUAL(r0.Attributes, 0);
782782
UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
783783
UNIT_ASSERT_EQUAL(r0.BaseTimestamp, -1);
@@ -793,19 +793,70 @@ Y_UNIT_TEST(ProduceRequestData_Record_v0) {
793793
UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);
794794
}
795795

796-
char Hex(const unsigned char c) {
797-
return c < 10 ? '0' + c : 'A' + c - 10;
796+
//
797+
798+
Y_UNIT_TEST(ProduceRequestData_Record_v0_manyMessages) {
799+
ui8 reference[] = { 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x05, 0x00, 0x07, 0x72, 0x64,
800+
0x6B, 0x61, 0x66, 0x6B, 0x61, 0x00, 0x00, 0xFF, 0xFF, 0x00, 0x00, 0x75, 0x30, 0x02,
801+
0x05, 0x61, 0x61, 0x61, 0x61, 0x02, 0x00, 0x00, 0x00, 0x00, 0x7D, 0x00, 0x00, 0x00, 0x00, 0x00,
802+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x5F, 0x1B, 0x4F, 0x8D, 0x00, 0x00, 0xFF, 0xFF, 0xFF,
803+
0xFF, 0x00, 0x00, 0x00, 0x05, 0x61, 0x61, 0x61, 0x61, 0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
804+
0x00, 0x01, 0x00, 0x00, 0x00, 0x13, 0xBA, 0x6C, 0x26, 0x93, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF,
805+
0x00, 0x00, 0x00, 0x05, 0x62, 0x62, 0x62, 0x62, 0x62, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
806+
0x02, 0x00, 0x00, 0x00, 0x13, 0x50, 0x6E, 0x03, 0xA6, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00,
807+
0x00, 0x00, 0x05, 0x63, 0x63, 0x63, 0x63, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03,
808+
0x00, 0x00, 0x00, 0x13, 0xAB, 0xF3, 0xF2, 0xEE, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00,
809+
0x00, 0x05, 0x64, 0x64, 0x64, 0x64, 0x64, 0x00, 0x00, 0x00};
810+
811+
TBuffer buffer((char*)reference, sizeof(reference));
812+
TKafkaReadable readable(buffer);
813+
814+
Cerr << ">>>>> Buffer size: " << buffer.Size() << Endl;
815+
816+
TRequestHeaderData header;
817+
header.Read(readable, 2);
818+
819+
TProduceRequestData result;
820+
result.Read(readable, header.RequestApiVersion);
821+
822+
UNIT_ASSERT_EQUAL(result.Acks, -1);
823+
UNIT_ASSERT_EQUAL(result.TimeoutMs, 30000);
824+
825+
auto& r0 = *result.TopicData[0].PartitionData[0].Records;
826+
UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
827+
UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
828+
UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, -1);
829+
UNIT_ASSERT_EQUAL(r0.Magic, 2);
830+
UNIT_ASSERT_EQUAL(r0.Crc, 0);
831+
UNIT_ASSERT_EQUAL(r0.Attributes, 0);
832+
UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
833+
UNIT_ASSERT_EQUAL(r0.BaseTimestamp, -1);
834+
UNIT_ASSERT_EQUAL(r0.MaxTimestamp, -1);
835+
UNIT_ASSERT_EQUAL(r0.ProducerId, -1);
836+
UNIT_ASSERT_EQUAL(r0.ProducerEpoch, -1);
837+
UNIT_ASSERT_EQUAL(r0.BaseSequence, -1);
838+
839+
UNIT_ASSERT_VALUES_EQUAL(r0.Records.size(), (size_t)4);
840+
841+
//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
842+
UNIT_ASSERT_EQUAL(r0.Records[0].Value, TKafkaRawBytes("aaaaa", 5));
843+
UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);
844+
845+
//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
846+
UNIT_ASSERT_EQUAL(r0.Records[1].Value, TKafkaRawBytes("bbbbb", 5));
847+
UNIT_ASSERT_EQUAL(r0.Records[1].Headers.size(), (size_t)0);
848+
849+
//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
850+
UNIT_ASSERT_EQUAL(r0.Records[2].Value, TKafkaRawBytes("ccccc", 5));
851+
UNIT_ASSERT_EQUAL(r0.Records[2].Headers.size(), (size_t)0);
852+
853+
//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
854+
UNIT_ASSERT_EQUAL(r0.Records[3].Value, TKafkaRawBytes("ddddd", 5));
855+
UNIT_ASSERT_EQUAL(r0.Records[3].Headers.size(), (size_t)0);
798856
}
799857

800858
void Print(std::string& sb) {
801-
for(size_t i = 0; i < sb.length(); ++i) {
802-
char c = sb.at(i);
803-
if (i > 0) {
804-
Cerr << ", ";
805-
}
806-
Cerr << "0x" << Hex(c >> 4) << Hex(c & 0x0F);
807-
}
808-
Cerr << Endl;
859+
Cerr << Hex(sb.begin(), sb.end()) << Endl;
809860
}
810861

811862
}

0 commit comments

Comments
 (0)