@@ -41,6 +41,7 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
4141static constexpr ui32 MAX_BYTES = 25_MB;
4242static constexpr ui32 MAX_SOURCE_ID_LENGTH = 2048 ;
4343static constexpr ui32 MAX_HEARTBEAT_SIZE = 2_KB;
44+ static constexpr ui32 MAX_TXS = 1000 ;
4445
4546struct TChangeNotification {
4647 TChangeNotification (const TActorId& actor, const ui64 txId)
@@ -666,6 +667,11 @@ void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp,
666667 (resp.ReadRangeResultSize () == 1 ) &&
667668 (resp.HasSetExecutorFastLogPolicyResult ()) &&
668669 (resp.GetSetExecutorFastLogPolicyResult ().GetStatus () == NKikimrProto::OK);
670+ if (!ok) {
671+ PQ_LOG_ERROR_AND_DIE (" Transactions read error: " << resp.ShortDebugString ());
672+ return ;
673+ }
674+
669675 const auto & result = resp.GetReadRangeResult (0 );
670676 auto status = result.GetStatus ();
671677 if (status != NKikimrProto::OVERRUN &&
@@ -2953,8 +2959,10 @@ void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
29532959 continue ;
29542960 }
29552961
2956- for (auto & message : tx->GetBindedMsgs (tabletId)) {
2957- PipeClientCache->Send (ctx, tabletId, message.Type , message.Data );
2962+ for (const auto & message : tx->GetBindedMsgs (tabletId)) {
2963+ auto event = std::make_unique<TEvTxProcessing::TEvReadSet>();
2964+ event->Record = message;
2965+ PipeClientCache->Send (ctx, tabletId, event.release ());
29582966 }
29592967 }
29602968}
@@ -3267,10 +3275,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32673275 return ;
32683276 }
32693277
3270- //
3271- // TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
3272- //
3273-
32743278 if (txBody.OperationsSize () <= 0 ) {
32753279 PQ_LOG_D (" TxId " << event.GetTxId () << " empty list of operations" );
32763280 SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
@@ -3339,7 +3343,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33393343 return ;
33403344 }
33413345
3342-
33433346 if (txBody.GetImmediate ()) {
33443347 PQ_LOG_D (" immediate transaction" );
33453348 TPartitionId originalPartitionId (txBody.GetOperations (0 ).GetPartitionId ());
@@ -3353,6 +3356,15 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
33533356
33543357 ctx.Send (partition.Actor , ev.Release ());
33553358 } else {
3359+ if ((EvProposeTransactionQueue.size () + Txs.size ()) >= MAX_TXS) {
3360+ SendProposeTransactionOverloaded (ActorIdFromProto (event.GetSourceActor ()),
3361+ event.GetTxId (),
3362+ NKikimrPQ::TError::ERROR,
3363+ " too many transactions" ,
3364+ ctx);
3365+ return ;
3366+ }
3367+
33563368 PQ_LOG_D (" distributed transaction" );
33573369 EvProposeTransactionQueue.emplace_back (ev.Release ());
33583370
@@ -4128,7 +4140,7 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
41284140
41294141void TPersQueue::SendToPipe (ui64 tabletId,
41304142 TDistributedTransaction& tx,
4131- std::unique_ptr<IEventBase > event,
4143+ std::unique_ptr<TEvTxProcessing::TEvReadSet > event,
41324144 const TActorContext& ctx)
41334145{
41344146 Y_ABORT_UNLESS (event);
@@ -4645,16 +4657,17 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
46454657 return EvProposeTransactionQueue.empty () && Txs.empty ();
46464658}
46474659
4648- void TPersQueue::SendProposeTransactionAbort (const TActorId& target,
4649- ui64 txId,
4650- NKikimrPQ::TError::EKind kind,
4651- const TString& reason,
4652- const TActorContext& ctx)
4660+ void TPersQueue::SendProposeTransactionResult (const TActorId& target,
4661+ ui64 txId,
4662+ NKikimrPQ::TEvProposeTransactionResult::EStatus status,
4663+ NKikimrPQ::TError::EKind kind,
4664+ const TString& reason,
4665+ const TActorContext& ctx)
46534666{
46544667 auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
46554668
46564669 event->Record .SetOrigin (TabletID ());
4657- event->Record .SetStatus (NKikimrPQ::TEvProposeTransactionResult::ABORTED );
4670+ event->Record .SetStatus (status );
46584671 event->Record .SetTxId (txId);
46594672
46604673 if (kind != NKikimrPQ::TError::OK) {
@@ -4670,6 +4683,34 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
46704683 ctx.Send (target, std::move (event));
46714684}
46724685
4686+ void TPersQueue::SendProposeTransactionAbort (const TActorId& target,
4687+ ui64 txId,
4688+ NKikimrPQ::TError::EKind kind,
4689+ const TString& reason,
4690+ const TActorContext& ctx)
4691+ {
4692+ SendProposeTransactionResult (target,
4693+ txId,
4694+ NKikimrPQ::TEvProposeTransactionResult::ABORTED,
4695+ kind,
4696+ reason,
4697+ ctx);
4698+ }
4699+
4700+ void TPersQueue::SendProposeTransactionOverloaded (const TActorId& target,
4701+ ui64 txId,
4702+ NKikimrPQ::TError::EKind kind,
4703+ const TString& reason,
4704+ const TActorContext& ctx)
4705+ {
4706+ SendProposeTransactionResult (target,
4707+ txId,
4708+ NKikimrPQ::TEvProposeTransactionResult::OVERLOADED,
4709+ kind,
4710+ reason,
4711+ ctx);
4712+ }
4713+
46734714void TPersQueue::SendEvProposePartitionConfig (const TActorContext& ctx,
46744715 TDistributedTransaction& tx)
46754716{
0 commit comments