Skip to content

Commit 5783247

Browse files
[-] too many transactions
1 parent 2093221 commit 5783247

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141
static constexpr ui32 MAX_BYTES = 25_MB;
4242
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
4343
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+
static constexpr ui32 MAX_TXS = 1000;
4445

4546
struct TChangeNotification {
4647
TChangeNotification(const TActorId& actor, const ui64 txId)
@@ -3274,10 +3275,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32743275
return;
32753276
}
32763277

3277-
//
3278-
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3279-
//
3280-
32813278
if (txBody.OperationsSize() <= 0) {
32823279
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
32833280
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3346,7 +3343,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33463343
return;
33473344
}
33483345

3349-
33503346
if (txBody.GetImmediate()) {
33513347
PQ_LOG_D("immediate transaction");
33523348
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
@@ -3360,6 +3356,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33603356

33613357
ctx.Send(partition.Actor, ev.Release());
33623358
} else {
3359+
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
3360+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3361+
event.GetTxId(),
3362+
NKikimrPQ::TError::ERROR,
3363+
"too many transactions",
3364+
ctx);
3365+
return;
3366+
}
3367+
33633368
PQ_LOG_D("distributed transaction");
33643369
EvProposeTransactionQueue.emplace_back(ev.Release());
33653370

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
21072107
WaitForTheTransactionToBeDeleted(txId);
21082108
}
21092109

2110+
Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
2111+
{
2112+
const ui64 mockTabletId = MakeTabletID(false, 22222);
2113+
const ui64 txId = 67890;
2114+
2115+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
2116+
2117+
for (ui64 i = 0; i < 1002; ++i) {
2118+
SendProposeTransactionRequest({.TxId=txId + i,
2119+
.Senders={mockTabletId}, .Receivers={mockTabletId},
2120+
.TxOps={
2121+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
2122+
}});
2123+
}
2124+
2125+
size_t preparedCount = 0;
2126+
size_t abortedCount = 0;
2127+
2128+
for (ui64 i = 0; i < 1002; ++i) {
2129+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
2130+
UNIT_ASSERT(event != nullptr);
2131+
2132+
UNIT_ASSERT(event->Record.HasStatus());
2133+
2134+
const auto status = event->Record.GetStatus();
2135+
switch (status) {
2136+
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
2137+
++preparedCount;
2138+
break;
2139+
case NKikimrPQ::TEvProposeTransactionResult::ABORTED:
2140+
++abortedCount;
2141+
break;
2142+
default:
2143+
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
2144+
}
2145+
}
2146+
2147+
UNIT_ASSERT_EQUAL(preparedCount, 1000);
2148+
UNIT_ASSERT_EQUAL(abortedCount, 2);
2149+
}
2150+
21102151
}
21112152

21122153
}

0 commit comments

Comments
 (0)