Skip to content

Commit 9d4ad3c

Browse files
The serialized TEvReadSet takes up a lot of memory (#18289)
2 parents acbb460 + b5924e8 commit 9d4ad3c

File tree

5 files changed

+115
-38
lines changed

5 files changed

+115
-38
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141
static constexpr ui32 MAX_BYTES = 25_MB;
4242
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
4343
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+
static constexpr ui32 MAX_TXS = 1000;
4445

4546
struct TChangeNotification {
4647
TChangeNotification(const TActorId& actor, const ui64 txId)
@@ -666,6 +667,11 @@ void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp,
666667
(resp.ReadRangeResultSize() == 1) &&
667668
(resp.HasSetExecutorFastLogPolicyResult()) &&
668669
(resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
670+
if (!ok) {
671+
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
672+
return;
673+
}
674+
669675
const auto& result = resp.GetReadRangeResult(0);
670676
auto status = result.GetStatus();
671677
if (status != NKikimrProto::OVERRUN &&
@@ -2953,8 +2959,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29532959
continue;
29542960
}
29552961

2956-
for (auto& message : tx->GetBindedMsgs(tabletId)) {
2957-
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
2962+
for (const auto& message : tx->GetBindedMsgs(tabletId)) {
2963+
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
2964+
event->Record = message;
2965+
PipeClientCache->Send(ctx, tabletId, event.release());
29582966
}
29592967
}
29602968
}
@@ -3267,10 +3275,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32673275
return;
32683276
}
32693277

3270-
//
3271-
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3272-
//
3273-
32743278
if (txBody.OperationsSize() <= 0) {
32753279
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
32763280
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3339,7 +3343,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33393343
return;
33403344
}
33413345

3342-
33433346
if (txBody.GetImmediate()) {
33443347
PQ_LOG_D("immediate transaction");
33453348
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
@@ -3353,6 +3356,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33533356

33543357
ctx.Send(partition.Actor, ev.Release());
33553358
} else {
3359+
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
3360+
SendProposeTransactionOverloaded(ActorIdFromProto(event.GetSourceActor()),
3361+
event.GetTxId(),
3362+
NKikimrPQ::TError::ERROR,
3363+
"too many transactions",
3364+
ctx);
3365+
return;
3366+
}
3367+
33563368
PQ_LOG_D("distributed transaction");
33573369
EvProposeTransactionQueue.emplace_back(ev.Release());
33583370

@@ -4128,7 +4140,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
41284140

41294141
void TPersQueue::SendToPipe(ui64 tabletId,
41304142
TDistributedTransaction& tx,
4131-
std::unique_ptr<IEventBase> event,
4143+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
41324144
const TActorContext& ctx)
41334145
{
41344146
Y_ABORT_UNLESS(event);
@@ -4645,16 +4657,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
46454657
return EvProposeTransactionQueue.empty() && Txs.empty();
46464658
}
46474659

4648-
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4649-
ui64 txId,
4650-
NKikimrPQ::TError::EKind kind,
4651-
const TString& reason,
4652-
const TActorContext& ctx)
4660+
void TPersQueue::SendProposeTransactionResult(const TActorId& target,
4661+
ui64 txId,
4662+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
4663+
NKikimrPQ::TError::EKind kind,
4664+
const TString& reason,
4665+
const TActorContext& ctx)
46534666
{
46544667
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
46554668

46564669
event->Record.SetOrigin(TabletID());
4657-
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
4670+
event->Record.SetStatus(status);
46584671
event->Record.SetTxId(txId);
46594672

46604673
if (kind != NKikimrPQ::TError::OK) {
@@ -4670,6 +4683,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
46704683
ctx.Send(target, std::move(event));
46714684
}
46724685

4686+
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4687+
ui64 txId,
4688+
NKikimrPQ::TError::EKind kind,
4689+
const TString& reason,
4690+
const TActorContext& ctx)
4691+
{
4692+
SendProposeTransactionResult(target,
4693+
txId,
4694+
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
4695+
kind,
4696+
reason,
4697+
ctx);
4698+
}
4699+
4700+
void TPersQueue::SendProposeTransactionOverloaded(const TActorId& target,
4701+
ui64 txId,
4702+
NKikimrPQ::TError::EKind kind,
4703+
const TString& reason,
4704+
const TActorContext& ctx)
4705+
{
4706+
SendProposeTransactionResult(target,
4707+
txId,
4708+
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
4709+
kind,
4710+
reason,
4711+
ctx);
4712+
}
4713+
46734714
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
46744715
TDistributedTransaction& tx)
46754716
{

ydb/core/persqueue/pq_impl.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,11 +358,22 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
358358
void EndWriteTabletState(const NKikimrClient::TResponse& resp,
359359
const TActorContext& ctx);
360360

361+
void SendProposeTransactionResult(const TActorId& target,
362+
ui64 txId,
363+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
364+
NKikimrPQ::TError::EKind kind,
365+
const TString& reason,
366+
const TActorContext& ctx);
361367
void SendProposeTransactionAbort(const TActorId& target,
362368
ui64 txId,
363369
NKikimrPQ::TError::EKind kind,
364370
const TString& reason,
365371
const TActorContext& ctx);
372+
void SendProposeTransactionOverloaded(const TActorId& target,
373+
ui64 txId,
374+
NKikimrPQ::TError::EKind kind,
375+
const TString& reason,
376+
const TActorContext& ctx);
366377

367378
void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
368379
void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
@@ -405,7 +416,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
405416

406417
void SendToPipe(ui64 tabletId,
407418
TDistributedTransaction& tx,
408-
std::unique_ptr<IEventBase> event,
419+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
409420
const TActorContext& ctx);
410421

411422
void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,

ydb/core/persqueue/transaction.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -424,28 +424,23 @@ TString TDistributedTransaction::GetKey() const
424424
return GetTxKey(TxId);
425425
}
426426

427-
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
427+
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event)
428428
{
429-
Y_ABORT_UNLESS(event.IsSerializable());
430-
431-
TAllocChunkSerializer serializer;
432-
Y_ABORT_UNLESS(event.SerializeToArcadiaStream(&serializer));
433-
auto data = serializer.Release(event.CreateSerializationInfo());
434-
OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
429+
OutputMsgs[tabletId].push_back(event.Record);
435430
}
436431

437432
void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
438433
{
439434
OutputMsgs.erase(tabletId);
440435
}
441436

442-
auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
437+
const TVector<NKikimrTx::TEvReadSet>& TDistributedTransaction::GetBindedMsgs(ui64 tabletId)
443438
{
444439
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
445440
return p->second;
446441
}
447442

448-
static TVector<TSerializedMessage> empty;
443+
static TVector<NKikimrTx::TEvReadSet> empty;
449444

450445
return empty;
451446
}

ydb/core/persqueue/transaction.h

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,22 +97,11 @@ struct TDistributedTransaction {
9797

9898
TString LogPrefix() const;
9999

100-
struct TSerializedMessage {
101-
ui32 Type;
102-
TIntrusivePtr<TEventSerializedData> Data;
100+
THashMap<ui64, TVector<NKikimrTx::TEvReadSet>> OutputMsgs;
103101

104-
TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
105-
Type(type),
106-
Data(data)
107-
{
108-
}
109-
};
110-
111-
THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;
112-
113-
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
102+
void BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event);
114103
void UnbindMsgsFromPipe(ui64 tabletId);
115-
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
104+
const TVector<NKikimrTx::TEvReadSet>& GetBindedMsgs(ui64 tabletId);
116105

117106
bool HasWriteOperations = false;
118107
size_t PredicateAcksCount = 0;

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
21072107
WaitForTheTransactionToBeDeleted(txId);
21082108
}
21092109

2110+
Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
2111+
{
2112+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2113+
const ui64 txId = 67890;
2114+
2115+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2116+
2117+
for (ui64 i = 0; i < 1002; ++i) {
2118+
SendProposeTransactionRequest({.TxId=txId + i,
2119+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2120+
.TxOps={
2121+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2122+
}});
2123+
}
2124+
2125+
size_t preparedCount = 0;
2126+
size_t overloadedCount = 0;
2127+
2128+
for (ui64 i = 0; i < 1002; ++i) {
2129+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
2130+
UNIT_ASSERT(event != nullptr);
2131+
2132+
UNIT_ASSERT(event->Record.HasStatus());
2133+
2134+
const auto status = event->Record.GetStatus();
2135+
switch (status) {
2136+
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
2137+
++preparedCount;
2138+
break;
2139+
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
2140+
++overloadedCount;
2141+
break;
2142+
default:
2143+
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
2144+
}
2145+
}
2146+
2147+
UNIT_ASSERT_EQUAL(preparedCount, 1000);
2148+
UNIT_ASSERT_EQUAL(overloadedCount, 2);
2149+
}
2150+
21102151
}
21112152

21122153
}

0 commit comments

Comments
 (0)