Skip to content
69 changes: 55 additions & 14 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
static constexpr ui32 MAX_BYTES = 25_MB;
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048;
static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
static constexpr ui32 MAX_TXS = 1000;

struct TChangeNotification {
TChangeNotification(const TActorId& actor, const ui64 txId)
Expand Down Expand Up @@ -666,6 +667,11 @@ void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp,
(resp.ReadRangeResultSize() == 1) &&
(resp.HasSetExecutorFastLogPolicyResult()) &&
(resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
if (!ok) {
PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
return;
}

const auto& result = resp.GetReadRangeResult(0);
auto status = result.GetStatus();
if (status != NKikimrProto::OVERRUN &&
Expand Down Expand Up @@ -2953,8 +2959,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
continue;
}

for (auto& message : tx->GetBindedMsgs(tabletId)) {
PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
for (const auto& message : tx->GetBindedMsgs(tabletId)) {
auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
event->Record = message;
PipeClientCache->Send(ctx, tabletId, event.release());
}
}
}
Expand Down Expand Up @@ -3267,10 +3275,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}

//
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
//

if (txBody.OperationsSize() <= 0) {
PQ_LOG_D("TxId " << event.GetTxId() << " empty list of operations");
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
Expand Down Expand Up @@ -3339,7 +3343,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}


if (txBody.GetImmediate()) {
PQ_LOG_D("immediate transaction");
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
Expand All @@ -3353,6 +3356,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact

ctx.Send(partition.Actor, ev.Release());
} else {
if ((EvProposeTransactionQueue.size() + Txs.size()) >= MAX_TXS) {
SendProposeTransactionOverloaded(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
NKikimrPQ::TError::ERROR,
"too many transactions",
ctx);
return;
}

PQ_LOG_D("distributed transaction");
EvProposeTransactionQueue.emplace_back(ev.Release());

Expand Down Expand Up @@ -4128,7 +4140,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,

void TPersQueue::SendToPipe(ui64 tabletId,
TDistributedTransaction& tx,
std::unique_ptr<IEventBase> event,
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(event);
Expand Down Expand Up @@ -4645,16 +4657,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
return EvProposeTransactionQueue.empty() && Txs.empty();
}

void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
ui64 txId,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx)
void TPersQueue::SendProposeTransactionResult(const TActorId& target,
ui64 txId,
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx)
{
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();

event->Record.SetOrigin(TabletID());
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
event->Record.SetStatus(status);
event->Record.SetTxId(txId);

if (kind != NKikimrPQ::TError::OK) {
Expand All @@ -4670,6 +4683,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
ctx.Send(target, std::move(event));
}

void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
ui64 txId,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx)
{
SendProposeTransactionResult(target,
txId,
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
kind,
reason,
ctx);
}

void TPersQueue::SendProposeTransactionOverloaded(const TActorId& target,
ui64 txId,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx)
{
SendProposeTransactionResult(target,
txId,
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
kind,
reason,
ctx);
}

void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
TDistributedTransaction& tx)
{
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,22 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void EndWriteTabletState(const NKikimrClient::TResponse& resp,
const TActorContext& ctx);

void SendProposeTransactionResult(const TActorId& target,
ui64 txId,
NKikimrPQ::TEvProposeTransactionResult::EStatus status,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx);
void SendProposeTransactionAbort(const TActorId& target,
ui64 txId,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx);
void SendProposeTransactionOverloaded(const TActorId& target,
ui64 txId,
NKikimrPQ::TError::EKind kind,
const TString& reason,
const TActorContext& ctx);

void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
Expand Down Expand Up @@ -405,7 +416,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

void SendToPipe(ui64 tabletId,
TDistributedTransaction& tx,
std::unique_ptr<IEventBase> event,
std::unique_ptr<TEvTxProcessing::TEvReadSet> event,
const TActorContext& ctx);

void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
Expand Down
13 changes: 4 additions & 9 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,28 +424,23 @@ TString TDistributedTransaction::GetKey() const
return GetTxKey(TxId);
}

void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event)
{
Y_ABORT_UNLESS(event.IsSerializable());

TAllocChunkSerializer serializer;
Y_ABORT_UNLESS(event.SerializeToArcadiaStream(&serializer));
auto data = serializer.Release(event.CreateSerializationInfo());
OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
OutputMsgs[tabletId].push_back(event.Record);
}

void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
{
OutputMsgs.erase(tabletId);
}

auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
const TVector<NKikimrTx::TEvReadSet>& TDistributedTransaction::GetBindedMsgs(ui64 tabletId)
{
if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
return p->second;
}

static TVector<TSerializedMessage> empty;
static TVector<NKikimrTx::TEvReadSet> empty;

return empty;
}
Expand Down
17 changes: 3 additions & 14 deletions ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,11 @@ struct TDistributedTransaction {

TString LogPrefix() const;

struct TSerializedMessage {
ui32 Type;
TIntrusivePtr<TEventSerializedData> Data;
THashMap<ui64, TVector<NKikimrTx::TEvReadSet>> OutputMsgs;

TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
Type(type),
Data(data)
{
}
};

THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;

void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
void BindMsgToPipe(ui64 tabletId, const TEvTxProcessing::TEvReadSet& event);
void UnbindMsgsFromPipe(ui64 tabletId);
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
const TVector<NKikimrTx::TEvReadSet>& GetBindedMsgs(ui64 tabletId);

bool HasWriteOperations = false;
size_t PredicateAcksCount = 0;
Expand Down
41 changes: 41 additions & 0 deletions ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,47 @@ Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
WaitForTheTransactionToBeDeleted(txId);
}

Y_UNIT_TEST_F(Limit_On_The_Number_Of_Transactons, TPQTabletFixture)
{
const ui64 mockTabletId = MakeTabletID(false, 22222);
const ui64 txId = 67890;

PQTabletPrepare({.partitions=1}, {}, *Ctx);

for (ui64 i = 0; i < 1002; ++i) {
SendProposeTransactionRequest({.TxId=txId + i,
.Senders={mockTabletId}, .Receivers={mockTabletId},
.TxOps={
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
}});
}

size_t preparedCount = 0;
size_t overloadedCount = 0;

for (ui64 i = 0; i < 1002; ++i) {
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvProposeTransactionResult>();
UNIT_ASSERT(event != nullptr);

UNIT_ASSERT(event->Record.HasStatus());

const auto status = event->Record.GetStatus();
switch (status) {
case NKikimrPQ::TEvProposeTransactionResult::PREPARED:
++preparedCount;
break;
case NKikimrPQ::TEvProposeTransactionResult::OVERLOADED:
++overloadedCount;
break;
default:
UNIT_FAIL("unexpected transaction status " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(status));
}
}

UNIT_ASSERT_EQUAL(preparedCount, 1000);
UNIT_ASSERT_EQUAL(overloadedCount, 2);
}

}

}
Loading