Skip to content

Commit 560706e

Browse files
committed
Force oldest locks into shard locks due to range limits
1 parent 849baa4 commit 560706e

File tree

3 files changed

+157
-40
lines changed

3 files changed

+157
-40
lines changed

ydb/core/client/locks_ut.cpp

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,8 +1810,9 @@ static void LocksLimit() {
18101810

18111811
using TLock = TSysTables::TLocksTable::TLock;
18121812

1813-
ui32 limit = NDataShard::TLockLocker::LockLimit();
1814-
const ui32 factor = 100;
1813+
auto restoreLimit = NDataShard::TLockLocker::OverrideLockLimit(20);
1814+
const ui32 limit = NDataShard::TLockLocker::LockLimit();
1815+
const ui32 factor = 5;
18151816

18161817
const char * query = R"((
18171818
(let row0_ '('('key (Uint32 '%u))))
@@ -1916,9 +1917,8 @@ static void ShardLocks() {
19161917
NKikimrMiniKQL::TResult res;
19171918
TClient::TFlatQueryOptions opts;
19181919

1919-
1920-
ui32 limit = NDataShard::TLockLocker::LockLimit();
1921-
//const ui32 factor = 100;
1920+
auto restoreRangeLimit = NDataShard::TLockLocker::OverrideTotalRangesLimit(10);
1921+
const ui32 limit = NDataShard::TLockLocker::TotalRangesLimit();
19221922

19231923
const char * setLock = R"___((
19241924
(let range_ '('IncFrom 'IncTo '('key (Uint32 '%u) (Uint32 '%u))))
@@ -1932,14 +1932,16 @@ static void ShardLocks() {
19321932
// Attach lots of ranges to a single lock.
19331933
TVector<NMiniKQL::IEngineFlat::TTxLock> locks;
19341934
ui64 lockId = 0;
1935-
for (ui32 i = 0; i < limit + 1; ++i) {
1935+
for (ui32 i = 0; i < limit; ++i) {
1936+
Cout << "... reading range " << i << Endl;
19361937
cs.Client.FlatQuery(Sprintf(setLock, i * 10, i * 10 + 5, lockId), res);
19371938
ExtractResultLocks<TLocksVer>(res, locks);
19381939
lockId = locks.back().LockId;
19391940
}
19401941

1941-
// We now have too many rnages attached to locks and new lock
1942-
// will be forced to be shard lock.
1942+
// We now have too many ranges attached to locks and the oldest lock
1943+
// will be forced to be a shard lock.
1944+
Cout << "... reading additional range with a new lock" << Endl;
19431945
cs.Client.FlatQuery(Sprintf(setLock, 0, 5, 0), res);
19441946
ExtractResultLocks<TLocksVer>(res, locks);
19451947

@@ -1953,6 +1955,7 @@ static void ShardLocks() {
19531955
))
19541956
))___";
19551957
{
1958+
Cout << "... checking the last lock (must be set)" << Endl;
19561959
cs.Client.FlatQuery(Sprintf(checkLock,
19571960
TLocksVer::TableName(),
19581961
TLocksVer::Key(locks.back().LockId,
@@ -1969,9 +1972,11 @@ static void ShardLocks() {
19691972
UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
19701973
}
19711974

1972-
// Break locks by single row update.
1975+
// Upsert key 48, which does not conflict with either lock.
1976+
// However since the first lock is forced to be a shard lock it will break.
1977+
Cout << "... upserting key 48 (will break the first lock despite no conflicts)" << Endl;
19731978
const char * lockUpdate = R"___((
1974-
(let row0_ '('('key (Uint32 '42))))
1979+
(let row0_ '('('key (Uint32 '48))))
19751980
(let update_ '('('value (Uint32 '0))))
19761981
(let ret_ (AsList
19771982
(UpdateRow '/dc-1/Dir/A row0_ update_)
@@ -1980,8 +1985,9 @@ static void ShardLocks() {
19801985
))___";
19811986
cs.Client.FlatQuery(lockUpdate, opts, res);
19821987

1983-
// Check locks are broken.
1988+
// Check the last lock is not broken.
19841989
{
1990+
Cout << "... checking the last lock (must not be broken)" << Endl;
19851991
cs.Client.FlatQuery(Sprintf(checkLock,
19861992
TLocksVer::TableName(),
19871993
TLocksVer::Key(locks.back().LockId,
@@ -1991,10 +1997,16 @@ static void ShardLocks() {
19911997
TLocksVer::Columns()), res);
19921998
TValue result = TValue::Create(res.GetValue(), res.GetType());
19931999
TValue xres = result["Result"];
1994-
UNIT_ASSERT(!xres.HaveValue());
2000+
UNIT_ASSERT(xres.HaveValue());
2001+
auto lock = ExtractRowLock<TLocksVer>(xres);
2002+
UNIT_ASSERT_VALUES_EQUAL(lock.LockId, locks.back().LockId);
2003+
UNIT_ASSERT_VALUES_EQUAL(lock.Generation, locks.back().Generation);
2004+
UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
19952005
}
19962006

2007+
// Check the first lock is broken.
19972008
{
2009+
Cout << "... checking the first lock (must be broken)" << Endl;
19982010
cs.Client.FlatQuery(Sprintf(checkLock,
19992011
TLocksVer::TableName(),
20002012
TLocksVer::Key(locks[0].LockId,

ydb/core/tx/locks/locks.cpp

Lines changed: 111 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -402,9 +402,69 @@ void TTableLocks::RemoveWriteLock(TLockInfo* lock) {
402402

403403
// TLockLocker
404404

405+
namespace {
406+
407+
static constexpr ui64 DefaultLockLimit() {
408+
// Valgrind and sanitizers are too slow
409+
// Some tests cannot exhaust default limit in under 5 minutes
410+
return NValgrind::PlainOrUnderValgrind(
411+
NSan::PlainOrUnderSanitizer(
412+
20000,
413+
1000),
414+
1000);
415+
}
416+
417+
static constexpr ui64 DefaultLockRangesLimit() {
418+
return 50000;
419+
}
420+
421+
static constexpr ui64 DefaultTotalRangesLimit() {
422+
return 1000000;
423+
}
424+
425+
static std::atomic<ui64> g_LockLimit{ DefaultLockLimit() };
426+
static std::atomic<ui64> g_LockRangesLimit{ DefaultLockRangesLimit() };
427+
static std::atomic<ui64> g_TotalRangesLimit{ DefaultTotalRangesLimit() };
428+
429+
} // namespace
430+
431+
ui64 TLockLocker::LockLimit() {
432+
return g_LockLimit.load(std::memory_order_relaxed);
433+
}
434+
435+
ui64 TLockLocker::LockRangesLimit() {
436+
return g_LockRangesLimit.load(std::memory_order_relaxed);
437+
}
438+
439+
ui64 TLockLocker::TotalRangesLimit() {
440+
return g_TotalRangesLimit.load(std::memory_order_relaxed);
441+
}
442+
443+
std::shared_ptr<void> TLockLocker::OverrideLockLimit(ui64 newLimit) {
444+
ui64 oldLimit = g_LockLimit.exchange(newLimit, std::memory_order_relaxed);
445+
return std::shared_ptr<void>(nullptr, [oldLimit](void*) {
446+
g_LockLimit.store(oldLimit, std::memory_order_relaxed);
447+
});
448+
}
449+
450+
std::shared_ptr<void> TLockLocker::OverrideLockRangesLimit(ui64 newLimit) {
451+
ui64 oldLimit = g_LockRangesLimit.exchange(newLimit, std::memory_order_relaxed);
452+
return std::shared_ptr<void>(nullptr, [oldLimit](void*) {
453+
g_LockRangesLimit.store(oldLimit, std::memory_order_relaxed);
454+
});
455+
}
456+
457+
std::shared_ptr<void> TLockLocker::OverrideTotalRangesLimit(ui64 newLimit) {
458+
ui64 oldLimit = g_TotalRangesLimit.exchange(newLimit, std::memory_order_relaxed);
459+
return std::shared_ptr<void>(nullptr, [oldLimit](void*) {
460+
g_TotalRangesLimit.store(oldLimit, std::memory_order_relaxed);
461+
});
462+
}
463+
405464
void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) {
406465
if (lock->AddPoint(key)) {
407466
key.Table->AddPointLock(key, lock.Get());
467+
LocksWithRanges.PushBack(lock.Get());
408468
} else {
409469
key.Table->AddShardLock(lock.Get());
410470
}
@@ -413,21 +473,27 @@ void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key
413473
void TLockLocker::AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key) {
414474
if (lock->AddRange(key)) {
415475
key.Table->AddRangeLock(key, lock.Get());
476+
LocksWithRanges.PushBack(lock.Get());
416477
} else {
417478
key.Table->AddShardLock(lock.Get());
418479
}
419480
}
420481

421-
void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
482+
void TLockLocker::MakeShardLock(TLockInfo* lock) {
422483
if (!lock->IsShardLock()) {
423484
for (const TPathId& tableId : lock->GetReadTables()) {
424-
Tables.at(tableId)->RemoveRangeLock(lock.Get());
485+
Tables.at(tableId)->RemoveRangeLock(lock);
425486
}
426487
lock->MakeShardLock();
488+
LocksWithRanges.Remove(lock);
427489
for (const TPathId& tableId : lock->GetReadTables()) {
428-
Tables.at(tableId)->AddShardLock(lock.Get());
490+
Tables.at(tableId)->AddShardLock(lock);
429491
}
430492
}
493+
}
494+
495+
void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
496+
MakeShardLock(lock.Get());
431497
for (auto& table : readTables) {
432498
const TPathId& tableId = table.GetTableId();
433499
Y_ABORT_UNLESS(Tables.at(tableId).Get() == &table);
@@ -519,6 +585,9 @@ void TLockLocker::RemoveBrokenRanges() {
519585
TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
520586
auto it = Locks.find(lockId);
521587
if (it != Locks.end()) {
588+
if (it->second->IsInList<TLockInfoRangesListTag>()) {
589+
LocksWithRanges.PushBack(it->second.Get());
590+
}
522591
if (it->second->IsInList<TLockInfoExpireListTag>()) {
523592
ExpireQueue.PushBack(it->second.Get());
524593
}
@@ -591,6 +660,7 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) {
591660
for (const TPathId& tableId : txLock->GetWriteTables()) {
592661
Tables.at(tableId)->RemoveWriteLock(txLock.Get());
593662
}
663+
LocksWithRanges.Remove(txLock.Get());
594664
txLock->CleanupConflicts();
595665
Locks.erase(it);
596666

@@ -634,6 +704,7 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
634704
Y_ABORT_UNLESS(Tables.empty());
635705
Locks.clear();
636706
ShardLocks.clear();
707+
LocksWithRanges.Clear();
637708
ExpireQueue.Clear();
638709
BrokenLocks.Clear();
639710
BrokenPersistentLocks.Clear();
@@ -643,21 +714,41 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
643714
PendingSubscribeLocks.clear();
644715
}
645716

646-
bool TLockLocker::ForceShardLock(const TPathId& tableId) const {
647-
auto it = Tables.find(tableId);
648-
if (it != Tables.end()) {
649-
if (it->second->RangeCount() > LockLimit()) {
650-
return true;
651-
}
717+
bool TLockLocker::ForceShardLock(
718+
const TLockInfo::TPtr& lock,
719+
const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables,
720+
ui64 newRanges)
721+
{
722+
if (lock->NumPoints() + lock->NumRanges() + newRanges > LockRangesLimit()) {
723+
// Lock has too many ranges, will never fit in
724+
return true;
652725
}
653-
return false;
654-
}
655726

656-
bool TLockLocker::ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const {
657727
for (auto& table : readTables) {
658-
if (table.RangeCount() > LockLimit())
659-
return true;
728+
while (table.RangeCount() + newRanges > TotalRangesLimit()) {
729+
if (LocksWithRanges.Empty()) {
730+
// Too many new ranges (e.g. TotalRangesLimit < LockRangesLimit)
731+
return true;
732+
}
733+
734+
// Try to reduce the number of ranges until new ranges fit in
735+
TLockInfo* next = LocksWithRanges.PopFront();
736+
if (next == lock.Get()) {
737+
bool wasLast = LocksWithRanges.Empty();
738+
LocksWithRanges.PushBack(next);
739+
if (wasLast) {
740+
return true;
741+
}
742+
// We want to handle the newest lock last
743+
continue;
744+
}
745+
746+
// Reduce the number of ranges by making the oldest lock into a shard lock
747+
MakeShardLock(next);
748+
Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
749+
}
660750
}
751+
661752
return false;
662753
}
663754

@@ -771,8 +862,6 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
771862
return TVector<TLock>();
772863
}
773864

774-
bool shardLock = Locker.ForceShardLock(Update->ReadTables);
775-
776865
TLockInfo::TPtr lock;
777866
ui64 counter = TLock::ErrorNotSet;
778867

@@ -791,6 +880,12 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
791880
} else if (lock->IsBroken()) {
792881
counter = TLock::ErrorBroken;
793882
} else {
883+
bool shardLock = (
884+
lock->IsShardLock() ||
885+
Locker.ForceShardLock(
886+
lock,
887+
Update->ReadTables,
888+
Update->PointLocks.size() + Update->RangeLocks.size()));
794889
if (shardLock) {
795890
Locker.AddShardLock(lock, Update->ReadTables);
796891
Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);

ydb/core/tx/locks/locks.h

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ struct TLockInfoWriteConflictListTag {};
252252
struct TLockInfoBrokenListTag {};
253253
struct TLockInfoBrokenPersistentListTag {};
254254
struct TLockInfoExpireListTag {};
255+
struct TLockInfoRangesListTag {};
255256

256257
/// Aggregates shard, point and range locks
257258
class TLockInfo
@@ -263,6 +264,7 @@ class TLockInfo
263264
, public TIntrusiveListItem<TLockInfo, TLockInfoBrokenListTag>
264265
, public TIntrusiveListItem<TLockInfo, TLockInfoBrokenPersistentListTag>
265266
, public TIntrusiveListItem<TLockInfo, TLockInfoExpireListTag>
267+
, public TIntrusiveListItem<TLockInfo, TLockInfoRangesListTag>
266268
{
267269
friend class TTableLocks;
268270
friend class TLockLocker;
@@ -508,16 +510,19 @@ class TLockLocker {
508510
friend class TSysLocks;
509511

510512
public:
511-
/// Prevent unlimited lock's count growth
512-
static constexpr ui64 LockLimit() {
513-
// Valgrind and sanitizers are too slow
514-
// Some tests cannot exhaust default limit in under 5 minutes
515-
return NValgrind::PlainOrUnderValgrind(
516-
NSan::PlainOrUnderSanitizer(
517-
16 * 1024,
518-
1024),
519-
1024);
520-
}
513+
/// Prevent unlimited lock count growth
514+
static ui64 LockLimit();
515+
516+
/// Prevent unlimited range count growth
517+
static ui64 LockRangesLimit();
518+
519+
/// Prevent unlimited number of total ranges
520+
static ui64 TotalRangesLimit();
521+
522+
/// Make it possible for tests to override defaults
523+
static std::shared_ptr<void> OverrideLockLimit(ui64 newLimit);
524+
static std::shared_ptr<void> OverrideLockRangesLimit(ui64 newLimit);
525+
static std::shared_ptr<void> OverrideTotalRangesLimit(ui64 newLimit);
521526

522527
/// We don't expire locks until this time limit after they are created
523528
static constexpr TDuration LockTimeLimit() { return TDuration::Minutes(5); }
@@ -535,6 +540,7 @@ class TLockLocker {
535540

536541
void AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key);
537542
void AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key);
543+
void MakeShardLock(TLockInfo* lock);
538544
void AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables);
539545
void AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables);
540546

@@ -592,8 +598,10 @@ class TLockLocker {
592598

593599
void UpdateSchema(const TPathId& tableId, const TVector<NScheme::TTypeInfo>& keyColumnTypes);
594600
void RemoveSchema(const TPathId& tableId, ILocksDb* db);
595-
bool ForceShardLock(const TPathId& tableId) const;
596-
bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;
601+
bool ForceShardLock(
602+
const TLockInfo::TPtr& lock,
603+
const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables,
604+
ui64 newRanges);
597605

598606
void ScheduleBrokenLock(TLockInfo* lock);
599607
void ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at);
@@ -633,6 +641,8 @@ class TLockLocker {
633641
THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId
634642
THashMap<TPathId, TTableLocks::TPtr> Tables;
635643
THashSet<ui64> ShardLocks;
644+
// A list of locks that have ranges (from oldest to newest)
645+
TIntrusiveList<TLockInfo, TLockInfoRangesListTag> LocksWithRanges;
636646
// A list of locks that may be removed when enough time passes
637647
TIntrusiveList<TLockInfo, TLockInfoExpireListTag> ExpireQueue;
638648
// A list of broken, but not yet removed locks

0 commit comments

Comments
 (0)