Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/kafka_proxy/kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ class TKafkaReadable {

void skip(size_t length);

size_t left() const;

size_t position() const;

private:
void checkEof(size_t length);

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,12 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet

case MESSAGE_PROCESS:
Request->StartTime = TInstant::Now();
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
if constexpr (DEBUG_ENABLED) {
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId
<< ", Data=" << Hex(Request->Buffer->Begin(), Request->Buffer->End()));
} else {
KAFKA_LOG_D("received message. ApiKey=" << Request->ApiKey << ", Version=" << Request->ApiVersion << ", CorrelationId=" << Request->CorrelationId);
}

TKafkaReadable readable(*Request->Buffer);

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kafka_proxy/kafka_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

namespace NKafka {

static constexpr bool DEBUG_ENABLED = false;

TString Hex(const char* begin, const char* end);

inline TString LogPrefix() { return {}; }

}
24 changes: 24 additions & 0 deletions ydb/core/kafka_proxy/kafka_messages_int.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,34 @@ char TKafkaReadable::take(size_t shift) {
return *(Is.Data() + Position + shift);
}

size_t TKafkaReadable::left() const {
    // Number of bytes that have not yet been consumed from the stream.
    const size_t total = Is.Size();
    return total - Position;
}

// Current read offset, in bytes, from the start of the underlying buffer.
size_t TKafkaReadable::position() const {
    return Position;
}

// Verify that `length` more bytes can be read without running past the end
// of the buffer; throws otherwise.
// NOTE(review): the comparison is phrased as `length > remaining` rather than
// `Position + length > Size` so that a huge `length` (e.g. a corrupt length
// field from the wire) cannot wrap `Position + length` around and slip past
// the check. Assumes the invariant Position <= Is.Size(), which holds as long
// as Position only advances after a successful checkEof — confirm.
void TKafkaReadable::checkEof(size_t length) {
    if (length > Is.Size() - Position) {
        ythrow yexception() << "unexpected end of stream";
    }
}

// Convert a nibble value to its character representation: 0-9 -> '0'-'9',
// 10 and above -> 'A', 'B', ... (callers pass only 0..15).
char Hex(const unsigned char c) {
    if (c < 10) {
        return static_cast<char>('0' + c);
    }
    return static_cast<char>('A' + (c - 10));
}

// Render the byte range [begin, end) as a comma-separated list of
// "0xHH" tokens, e.g. "0x00, 0x1A, 0xFF". Useful for logging raw packets.
TString Hex(const char* begin, const char *end) {
    TStringBuilder out;
    bool first = true;
    for (const char* p = begin; p < end; ++p) {
        if (!first) {
            out << ", ";
        }
        first = false;
        const unsigned char byte = static_cast<unsigned char>(*p);
        out << "0x" << Hex(static_cast<unsigned char>(byte >> 4)) << Hex(static_cast<unsigned char>(byte & 0x0F));
    }
    return out;
}

} // namespace NKafka
36 changes: 27 additions & 9 deletions ydb/core/kafka_proxy/kafka_messages_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
namespace NKafka {
namespace NPrivate {

static constexpr bool DEBUG_ENABLED = false;

struct TWriteCollector {
ui32 NumTaggedFields = 0;
};
Expand Down Expand Up @@ -191,7 +189,11 @@ class TypeStrategy {

inline static void DoLog(const TValueType& value) {
if constexpr (DEBUG_ENABLED) {
Cerr << "Was read field '" << Meta::Name << "' value " << value << Endl;
if constexpr (sizeof(TValueType) == 1) {
Cerr << "Was read field '" << Meta::Name << "' value " << (size_t)value << Endl;
} else {
Cerr << "Was read field '" << Meta::Name << "' value " << value << Endl;
}
}
}
};
Expand Down Expand Up @@ -483,11 +485,12 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
value.emplace();

if (magic < CURRENT_RECORD_VERSION) {
size_t end = readable.position() + length;

TKafkaRecordBatchV0 v0;
v0.Read(readable, magic);

value->Magic = v0.Record.Magic;
value->Crc = v0.Record.Crc;
value->Magic = 2;
value->Attributes = v0.Record.Attributes & 0x07;

value->Records.resize(1);
Expand All @@ -497,6 +500,21 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
record.TimestampDelta = v0.Record.Timestamp;
record.Key = v0.Record.Key;
record.Value = v0.Record.Value;

while(readable.position() < end) {
magic = readable.take(16);

v0 = {};
v0.Read(readable, magic);

value->Records.resize(value->Records.size() + 1);
auto& record = value->Records.back();
record.Length = v0.Record.MessageSize;
record.OffsetDelta = v0.Offset;
record.TimestampDelta = v0.Record.Timestamp;
record.Key = v0.Record.Key;
record.Value = v0.Record.Value;
}
} else {
(*value).Read(readable, magic);
}
Expand Down Expand Up @@ -633,16 +651,16 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename
++collector.NumTaggedFields;

i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size);
collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size);
if constexpr (DEBUG_ENABLED) {
Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl;
Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl;
}
}
} else {
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
collector.Size += size;
if constexpr (DEBUG_ENABLED) {
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
}
}
}
Expand All @@ -651,7 +669,7 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
collector.Size += size;
if constexpr (DEBUG_ENABLED) {
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
}
}
}
Expand Down
14 changes: 1 addition & 13 deletions ydb/core/kafka_proxy/ut/kafka_test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,19 +808,7 @@ TMessagePtr<T> TKafkaTestClient::Read(TSocketInput& si, TRequestHeaderData* requ
}

void TKafkaTestClient::Print(const TBuffer& buffer) {
TStringBuilder sb;
for (size_t i = 0; i < buffer.Size(); ++i) {
char c = buffer.Data()[i];
if (i > 0) {
sb << ", ";
}
sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F);
}
Cerr << ">>>>> Packet sent: " << sb << Endl;
}

char TKafkaTestClient::Hex0(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
Cerr << ">>>>> Packet sent: " << Hex(buffer.Begin(), buffer.End()) << Endl;
}

void TKafkaTestClient::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kafka_proxy/ut/kafka_test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ class TKafkaTestClient {
template <std::derived_from<TApiMessage> T>
TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader);
void Print(const TBuffer& buffer);
char Hex0(const unsigned char c);
void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics);

private:
Expand Down
75 changes: 63 additions & 12 deletions ydb/core/kafka_proxy/ut/ut_serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,8 @@ Y_UNIT_TEST(ProduceRequestData_Record_v0) {
UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, -1);
UNIT_ASSERT_EQUAL(r0.Magic, 0);
UNIT_ASSERT_EQUAL(r0.Crc, 544167206);
UNIT_ASSERT_EQUAL(r0.Magic, 2);
UNIT_ASSERT_EQUAL(r0.Crc, 0);
UNIT_ASSERT_EQUAL(r0.Attributes, 0);
UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
UNIT_ASSERT_EQUAL(r0.BaseTimestamp, -1);
Expand All @@ -793,19 +793,70 @@ Y_UNIT_TEST(ProduceRequestData_Record_v0) {
UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);
}

char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
//

Y_UNIT_TEST(ProduceRequestData_Record_v0_manyMessages) {
ui8 reference[] = { 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x05, 0x00, 0x07, 0x72, 0x64,
0x6B, 0x61, 0x66, 0x6B, 0x61, 0x00, 0x00, 0xFF, 0xFF, 0x00, 0x00, 0x75, 0x30, 0x02,
0x05, 0x61, 0x61, 0x61, 0x61, 0x02, 0x00, 0x00, 0x00, 0x00, 0x7D, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x5F, 0x1B, 0x4F, 0x8D, 0x00, 0x00, 0xFF, 0xFF, 0xFF,
0xFF, 0x00, 0x00, 0x00, 0x05, 0x61, 0x61, 0x61, 0x61, 0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01, 0x00, 0x00, 0x00, 0x13, 0xBA, 0x6C, 0x26, 0x93, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x05, 0x62, 0x62, 0x62, 0x62, 0x62, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x02, 0x00, 0x00, 0x00, 0x13, 0x50, 0x6E, 0x03, 0xA6, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00,
0x00, 0x00, 0x05, 0x63, 0x63, 0x63, 0x63, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x13, 0xAB, 0xF3, 0xF2, 0xEE, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00,
0x00, 0x05, 0x64, 0x64, 0x64, 0x64, 0x64, 0x00, 0x00, 0x00};

TBuffer buffer((char*)reference, sizeof(reference));
TKafkaReadable readable(buffer);

Cerr << ">>>>> Buffer size: " << buffer.Size() << Endl;

TRequestHeaderData header;
header.Read(readable, 2);

TProduceRequestData result;
result.Read(readable, header.RequestApiVersion);

UNIT_ASSERT_EQUAL(result.Acks, -1);
UNIT_ASSERT_EQUAL(result.TimeoutMs, 30000);

auto& r0 = *result.TopicData[0].PartitionData[0].Records;
UNIT_ASSERT_EQUAL(r0.BaseOffset, 0);
UNIT_ASSERT_EQUAL(r0.BatchLength, 0);
UNIT_ASSERT_EQUAL(r0.PartitionLeaderEpoch, -1);
UNIT_ASSERT_EQUAL(r0.Magic, 2);
UNIT_ASSERT_EQUAL(r0.Crc, 0);
UNIT_ASSERT_EQUAL(r0.Attributes, 0);
UNIT_ASSERT_EQUAL(r0.LastOffsetDelta, 0);
UNIT_ASSERT_EQUAL(r0.BaseTimestamp, -1);
UNIT_ASSERT_EQUAL(r0.MaxTimestamp, -1);
UNIT_ASSERT_EQUAL(r0.ProducerId, -1);
UNIT_ASSERT_EQUAL(r0.ProducerEpoch, -1);
UNIT_ASSERT_EQUAL(r0.BaseSequence, -1);

UNIT_ASSERT_VALUES_EQUAL(r0.Records.size(), (size_t)4);

//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
UNIT_ASSERT_EQUAL(r0.Records[0].Value, TKafkaRawBytes("aaaaa", 5));
UNIT_ASSERT_EQUAL(r0.Records[0].Headers.size(), (size_t)0);

//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
UNIT_ASSERT_EQUAL(r0.Records[1].Value, TKafkaRawBytes("bbbbb", 5));
UNIT_ASSERT_EQUAL(r0.Records[1].Headers.size(), (size_t)0);

//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
UNIT_ASSERT_EQUAL(r0.Records[2].Value, TKafkaRawBytes("ccccc", 5));
UNIT_ASSERT_EQUAL(r0.Records[2].Headers.size(), (size_t)0);

//UNIT_ASSERT_EQUAL(r0.Records[0].Key, TKafkaRawBytes("", 0));
UNIT_ASSERT_EQUAL(r0.Records[3].Value, TKafkaRawBytes("ddddd", 5));
UNIT_ASSERT_EQUAL(r0.Records[3].Headers.size(), (size_t)0);
}

void Print(std::string& sb) {
for(size_t i = 0; i < sb.length(); ++i) {
char c = sb.at(i);
if (i > 0) {
Cerr << ", ";
}
Cerr << "0x" << Hex(c >> 4) << Hex(c & 0x0F);
}
Cerr << Endl;
Cerr << Hex(sb.begin(), sb.end()) << Endl;
}

}
Loading