Skip to content

Commit 4575239

Browse files
authored
Merge 9baa3bb into 80dbaee
2 parents 80dbaee + 9baa3bb commit 4575239

File tree

2 files changed

+161
-5
lines changed

2 files changed

+161
-5
lines changed

ydb/core/tx/datashard/datashard_ut_volatile.cpp

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3457,6 +3457,161 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
34573457
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
34583458
}
34593459

3460+
Y_UNIT_TEST(GracefulShardRestartNoEarlyReadSetAck) {
3461+
TPortManager pm;
3462+
TServerSettings serverSettings(pm.GetPort(2134));
3463+
serverSettings.SetDomainName("Root")
3464+
.SetUseRealThreads(false)
3465+
.SetEnableDataShardVolatileTransactions(true);
3466+
3467+
Tests::TServer::TPtr server = new TServer(serverSettings);
3468+
auto &runtime = *server->GetRuntime();
3469+
auto sender = runtime.AllocateEdgeActor();
3470+
3471+
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
3472+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3473+
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
3474+
3475+
InitRoot(server, sender);
3476+
3477+
TDisableDataShardLogBatching disableDataShardLogBatching;
3478+
3479+
struct TBootInfo {
3480+
ui32 Generation;
3481+
TActorId Launcher;
3482+
};
3483+
THashMap<ui64, TBootInfo> bootInfo;
3484+
auto observeBootInfo = runtime.AddObserver<TEvTablet::TEvBoot>([&](auto& ev) {
3485+
auto* msg = ev->Get();
3486+
auto& info = bootInfo[msg->TabletID];
3487+
info.Generation = msg->Generation;
3488+
info.Launcher = msg->Launcher;
3489+
});
3490+
3491+
Cerr << "========= Creating table =========" << Endl;
3492+
UNIT_ASSERT_VALUES_EQUAL(
3493+
KqpSchemeExec(runtime, R"(
3494+
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
3495+
WITH (PARTITION_AT_KEYS = (10, 20));
3496+
)"),
3497+
"SUCCESS");
3498+
3499+
const auto shards = GetTableShards(server, sender, "/Root/table");
3500+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 3u);
3501+
// const auto tableId = ResolveTableId(server, sender, "/Root/table");
3502+
3503+
// We need to fill table with some data
3504+
Cerr << "========= Upserting initial values =========" << Endl;
3505+
UNIT_ASSERT_VALUES_EQUAL(
3506+
KqpSimpleExec(runtime, R"(
3507+
UPSERT INTO `/Root/table` (key, value) VALUES (1, 1), (11, 11), (21, 21);
3508+
)"),
3509+
"<empty>");
3510+
3511+
Cerr << "========= Starting a transaction =========" << Endl;
3512+
TString sessionId, txId;
3513+
UNIT_ASSERT_VALUES_EQUAL(
3514+
KqpSimpleBegin(runtime, sessionId, txId, R"(
3515+
SELECT key, value FROM `/Root/table` ORDER BY key;
3516+
)"),
3517+
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
3518+
"{ items { uint32_value: 11 } items { uint32_value: 11 } }, "
3519+
"{ items { uint32_value: 21 } items { uint32_value: 21 } }");
3520+
3521+
Cerr << "========= Upserting a row to shard 2 to break the lock =========" << Endl;
3522+
UNIT_ASSERT_VALUES_EQUAL(
3523+
KqpSimpleExec(runtime, R"(
3524+
UPSERT INTO `/Root/table` (key, value) VALUES (11, 111);
3525+
)"),
3526+
"<empty>");
3527+
3528+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3529+
3530+
// Block channel 0 commits at shard 3
3531+
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
3532+
[shard3 = shards.at(2)](const auto& ev) {
3533+
auto* msg = ev->Get();
3534+
if (msg->Id.TabletID() == shard3 && msg->Id.Channel() == 0) {
3535+
return true;
3536+
}
3537+
return false;
3538+
});
3539+
3540+
// Block readsets at shard 3
3541+
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime,
3542+
[shard3actor = ResolveTablet(runtime, shards.at(2))](const auto& ev) {
3543+
return ev->GetRecipientRewrite() == shard3actor;
3544+
});
3545+
3546+
// Force shard 1 to be the arbiter
3547+
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
3548+
3549+
Cerr << "========= Starting to commit =========" << Endl;
3550+
auto commitFuture = KqpSimpleSendCommit(runtime, sessionId, txId, R"(
3551+
UPSERT INTO `/Root/table` (key, value) VALUES (2, 2), (12, 12), (22, 22);
3552+
)");
3553+
3554+
// After a short while shard 3 must have 2 blocked commits and 2 blocked readsets (expectation + abort):
3555+
// - Sys update on the new PlanStep
3556+
// - Execute and persist volatile tx
3557+
// - Processed abort tx persistence
3558+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3559+
UNIT_ASSERT_VALUES_EQUAL(blockedCommits.size(), 2u);
3560+
UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 2u);
3561+
3562+
// Unblock readsets and wait for one more commit attempt (abort persistence)
3563+
blockedReadSets.Stop().Unblock();
3564+
runtime.WaitFor("1 more commit", [&]{ return blockedCommits.size() >= 3; });
3565+
3566+
// Block channel 0 commit responses at shard 3
3567+
TBlockEvents<TEvBlobStorage::TEvPutResult> blockedCommitResults(runtime,
3568+
[shard3 = shards.at(2)](const auto& ev) {
3569+
auto* msg = ev->Get();
3570+
if (msg->Id.TabletID() == shard3 && msg->Id.Channel() == 0) {
3571+
return true;
3572+
}
3573+
return false;
3574+
});
3575+
3576+
// Unblock the first two commits and wait for their blocked responses
3577+
blockedCommits.Unblock(2);
3578+
runtime.WaitFor("2 commit results", [&]{ return blockedCommitResults.size() >= 2; });
3579+
blockedCommits.Stop();
3580+
blockedCommitResults.Stop();
3581+
3582+
Cerr << "========= Starting new shard3 generation =========" << Endl;
3583+
UNIT_ASSERT(bootInfo.contains(shards.at(2)));
3584+
auto shard3sys = ResolveTablet(runtime, shards.at(2), 0, true);
3585+
auto shard3info = bootInfo.at(shards.at(2));
3586+
runtime.Send(
3587+
new IEventHandle(shard3info.Launcher, shard3sys,
3588+
new TEvTablet::TEvTabletDead(shards.at(2), TEvTablet::TEvTabletDead::ReasonPill, shard3info.Generation)),
3589+
0, true);
3590+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
3591+
InvalidateTabletResolverCache(runtime, shards.at(2), 0);
3592+
3593+
Cerr << "========= Unblocking old shard3 generation =========" << Endl;
3594+
blockedCommitResults.Unblock();
3595+
blockedCommits.Unblock();
3596+
3597+
UNIT_ASSERT_VALUES_EQUAL(
3598+
FormatResult(runtime.WaitFuture(std::move(commitFuture))),
3599+
"ERROR: ABORTED");
3600+
3601+
Cerr << "========= Final read (must not hang) =========" << Endl;
3602+
auto readFuture = KqpSimpleSend(runtime, R"(
3603+
SELECT key, value FROM `/Root/table` ORDER BY key;
3604+
)");
3605+
runtime.SimulateSleep(TDuration::MilliSeconds(100));
3606+
UNIT_ASSERT_C(readFuture.HasValue(), "Read didn't finish in 100ms of simulated time");
3607+
3608+
UNIT_ASSERT_VALUES_EQUAL(
3609+
FormatResult(readFuture.ExtractValueSync()),
3610+
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
3611+
"{ items { uint32_value: 11 } items { uint32_value: 111 } }, "
3612+
"{ items { uint32_value: 21 } items { uint32_value: 21 } }");
3613+
}
3614+
34603615
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
34613616

34623617
} // namespace NKikimr

ydb/core/tx/datashard/volatile_tx.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -803,10 +803,9 @@ namespace NKikimr::NDataShard {
803803
return true;
804804

805805
case EVolatileTxState::Aborting:
806-
// Aborting state will not change as long as we're still leader
807-
return true;
808-
// Ack readset normally as long as we're still a leader
809-
return true;
806+
// We need to wait until volatile tx abort is committed to send rs acks
807+
info->DelayedAcks.push_back(std::move(ack));
808+
return false;
810809
}
811810

812811
ui64 srcTabletId = record.GetTabletSource();
@@ -861,8 +860,10 @@ namespace NKikimr::NDataShard {
861860
}();
862861

863862
if (!committed) {
863+
// We need to wait until volatile tx abort is committed to send rs acks
864+
info->DelayedAcks.push_back(std::move(ack));
864865
AbortWaitingTransaction(info);
865-
return true;
866+
return false;
866867
}
867868

868869
NIceDb::TNiceDb db(txc.DB);

0 commit comments

Comments
 (0)