Skip to content

Commit 4a7c27a

Browse files
Merge 07653aa into acbb460
2 parents acbb460 + 07653aa commit 4a7c27a

File tree

5 files changed

+110
-38
lines changed

5 files changed

+110
-38
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141
static constexpr ui32 MAX_BYTES = 25_MB;
4242
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
4343
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+
static constexpr ui32 MAX_TXS = 1000;
4445

4546
struct TChangeNotification {
4647
TChangeNotification(const TActorId& actor, const ui64 txId)
@@ -2953,8 +2954,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29532954
continue;
29542955
}
29552956

2956-
for (auto& message : tx->GetBindedMsgs(tabletId)) {
2957-
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
2957+
for (const auto& message : tx->GetBindedMsgs(tabletId)) {
2958+
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
2959+
event->Record = message;
2960+
PipeClientCache->Send(ctx, tabletId, event.release());
29582961
}
29592962
}
29602963
}
@@ -3267,10 +3270,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32673270
return;
32683271
}
32693272

3270-
//
3271-
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3272-
//
3273-
32743273
if (txBody.OperationsSize() <= 0) {
32753274
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
32763275
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3339,7 +3338,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33393338
return;
33403339
}
33413340

3342-
33433341
if (txBody.GetImmediate()) {
33443342
PQ_LOG_D("immediate transaction");
33453343
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
@@ -3353,6 +3351,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33533351

33543352
ctx.Send(partition.Actor, ev.Release());
33553353
} else {
3354+
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
3355+
SendProposeTransactionOverloaded(ActorIdFromProto(event.GetSourceActor()),
3356+
event.GetTxId(),
3357+
NKikimrPQ::TError::ERROR,
3358+
"too many transactions",
3359+
ctx);
3360+
return;
3361+
}
3362+
33563363
PQ_LOG_D("distributed transaction");
33573364
EvProposeTransactionQueue.emplace_back(ev.Release());
33583365

@@ -4128,7 +4135,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
41284135

41294136
void TPersQueue::SendToPipe(ui64 tabletId,
41304137
TDistributedTransaction& tx,
4131-
std::unique_ptr<IEventBase> event,
4138+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
41324139
const TActorContext& ctx)
41334140
{
41344141
Y_ABORT_UNLESS(event);
@@ -4645,16 +4652,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
46454652
return EvProposeTransactionQueue.empty() && Txs.empty();
46464653
}
46474654

4648-
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4649-
ui64 txId,
4650-
NKikimrPQ::TError::EKind kind,
4651-
const TString& reason,
4652-
const TActorContext& ctx)
4655+
void TPersQueue::SendProposeTransactionResult(const TActorId& target,
4656+
ui64 txId,
4657+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
4658+
NKikimrPQ::TError::EKind kind,
4659+
const TString& reason,
4660+
const TActorContext& ctx)
46534661
{
46544662
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
46554663

46564664
event->Record.SetOrigin(TabletID());
4657-
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
4665+
event->Record.SetStatus(status);
46584666
event->Record.SetTxId(txId);
46594667

46604668
if (kind != NKikimrPQ::TError::OK) {
@@ -4670,6 +4678,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
46704678
ctx.Send(target, std::move(event));
46714679
}
46724680

4681+
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
4682+
ui64 txId,
4683+
NKikimrPQ::TError::EKind kind,
4684+
const TString& reason,
4685+
const TActorContext& ctx)
4686+
{
4687+
SendProposeTransactionResult(target,
4688+
txId,
4689+
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
4690+
kind,
4691+
reason,
4692+
ctx);
4693+
}
4694+
4695+
void TPersQueue::SendProposeTransactionOverloaded(const TActorId& target,
4696+
ui64 txId,
4697+
NKikimrPQ::TError::EKind kind,
4698+
const TString& reason,
4699+
const TActorContext& ctx)
4700+
{
4701+
SendProposeTransactionResult(target,
4702+
txId,
4703+
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
4704+
kind,
4705+
reason,
4706+
ctx);
4707+
}
4708+
46734709
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
46744710
TDistributedTransaction& tx)
46754711
{

ydb/core/persqueue/pq_impl.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,11 +358,22 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
358358
void EndWriteTabletState(const NKikimrClient::TResponse& resp,
359359
const TActorContext& ctx);
360360

361+
void SendProposeTransactionResult(const TActorId& target,
362+
ui64 txId,
363+
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
364+
NKikimrPQ::TError::EKind kind,
365+
const TString& reason,
366+
const TActorContext& ctx);
361367
void SendProposeTransactionAbort(const TActorId& target,
362368
ui64 txId,
363369
NKikimrPQ::TError::EKind kind,
364370
const TString& reason,
365371
const TActorContext& ctx);
372+
void SendProposeTransactionOverloaded(const TActorId& target,
373+
ui64 txId,
374+
NKikimrPQ::TError::EKind kind,
375+
const TString& reason,
376+
const TActorContext& ctx);
366377

367378
void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
368379
void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
@@ -405,7 +416,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
405416

406417
void SendToPipe(ui64 tabletId,
407418
TDistributedTransaction& tx,
408-
std::unique_ptr<IEventBase> event,
419+
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
409420
const TActorContext& ctx);
410421

411422
void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,

ydb/core/persqueue/transaction.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -424,28 +424,23 @@ TString TDistributedTransaction::GetKey() const
424424
return GetTxKey(TxId);
425425
}
426426

427-
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
427+
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event)
428428
{
429-
Y_ABORT_UNLESS(event.IsSerializable());
430-
431-
TAllocChunkSerializer serializer;
432-
Y_ABORT_UNLESS(event.SerializeToArcadiaStream(&serializer));
433-
auto data = serializer.Release(event.CreateSerializationInfo());
434-
OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
429+
OutputMsgs[tabletId].push_back(event.Record);
435430
}
436431

437432
void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
438433
{
439434
OutputMsgs.erase(tabletId);
440435
}
441436

442-
auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
437+
const TVector<NKikimrTx::TEvReadSet>& TDistributedTransaction::GetBindedMsgs(ui64 tabletId)
443438
{
444439
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
445440
return p->second;
446441
}
447442

448-
static TVector<TSerializedMessage> empty;
443+
static TVector<NKikimrTx::TEvReadSet> empty;
449444

450445
return empty;
451446
}

ydb/core/persqueue/transaction.h

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,22 +97,11 @@ struct TDistributedTransaction {
9797

9898
TString LogPrefix() const;
9999

100-
struct TSerializedMessage {
101-
ui32 Type;
102-
TIntrusivePtr<TEventSerializedData> Data;
100+
THashMap<ui64, TVector<NKikimrTx::TEvReadSet>> OutputMsgs;
103101

104-
TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
105-
Type(type),
106-
Data(data)
107-
{
108-
}
109-
};
110-
111-
THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;
112-
113-
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
102+
void BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event);
114103
void UnbindMsgsFromPipe(ui64 tabletId);
115-
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
104+
const TVector<NKikimrTx::TEvReadSet>& GetBindedMsgs(ui64 tabletId);
116105

117106
bool HasWriteOperations = false;
118107
size_t PredicateAcksCount = 0;

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
21072107
WaitForTheTransactionToBeDeleted(txId);
21082108
}
21092109

2110+
Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
2111+
{
2112+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2113+
const ui64 txId = 67890;
2114+
2115+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2116+
2117+
for (ui64 i = 0; i < 1002; ++i) {
2118+
SendProposeTransactionRequest({.TxId=txId + i,
2119+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2120+
.TxOps={
2121+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2122+
}});
2123+
}
2124+
2125+
size_t preparedCount = 0;
2126+
size_t overloadedCount = 0;
2127+
2128+
for (ui64 i = 0; i < 1002; ++i) {
2129+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
2130+
UNIT_ASSERT(event != nullptr);
2131+
2132+
UNIT_ASSERT(event->Record.HasStatus());
2133+
2134+
const auto status = event->Record.GetStatus();
2135+
switch (status) {
2136+
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
2137+
++preparedCount;
2138+
break;
2139+
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
2140+
++overloadedCount;
2141+
break;
2142+
default:
2143+
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
2144+
}
2145+
}
2146+
2147+
UNIT_ASSERT_EQUAL(preparedCount, 1000);
2148+
UNIT_ASSERT_EQUAL(overloadedCount, 2);
2149+
}
2150+
21102151
}
21112152

21122153
}

0 commit comments

Comments
 (0)