
Commit ec2cd8c

Force oldest locks into shard locks due to range limits

1 parent 849baa4 commit ec2cd8c

3 files changed: +136 -39 lines changed

ydb/core/client/locks_ut.cpp

Lines changed: 23 additions & 11 deletions
@@ -1810,8 +1810,9 @@ static void LocksLimit() {
 
     using TLock = TSysTables::TLocksTable::TLock;
 
-    ui32 limit = NDataShard::TLockLocker::LockLimit();
-    const ui32 factor = 100;
+    auto restoreLimit = NDataShard::TLockLocker::OverrideLockLimit(20);
+    const ui32 limit = NDataShard::TLockLocker::LockLimit();
+    const ui32 factor = 5;
 
     const char * query = R"((
         (let row0_ '('('key (Uint32 '%u))))
@@ -1916,9 +1917,8 @@ static void ShardLocks() {
     NKikimrMiniKQL::TResult res;
     TClient::TFlatQueryOptions opts;
 
-
-    ui32 limit = NDataShard::TLockLocker::LockLimit();
-    //const ui32 factor = 100;
+    auto restoreRangeLimit = NDataShard::TLockLocker::OverrideLockRangesLimit(10);
+    const ui32 limit = NDataShard::TLockLocker::LockRangesLimit();
 
     const char * setLock = R"___((
         (let range_ '('IncFrom 'IncTo '('key (Uint32 '%u) (Uint32 '%u))))
@@ -1932,14 +1932,16 @@ static void ShardLocks() {
     // Attach lots of ranges to a single lock.
     TVector<NMiniKQL::IEngineFlat::TTxLock> locks;
     ui64 lockId = 0;
-    for (ui32 i = 0; i < limit + 1; ++i) {
+    for (ui32 i = 0; i < limit; ++i) {
+        Cout << "... reading range " << i << Endl;
         cs.Client.FlatQuery(Sprintf(setLock, i * 10, i * 10 + 5, lockId), res);
         ExtractResultLocks<TLocksVer>(res, locks);
         lockId = locks.back().LockId;
     }
 
-    // We now have too many rnages attached to locks and new lock
+    // We now have too many ranges attached to locks and the oldest lock
     // will be forced to be shard lock.
+    Cout << "... reading additional range with a new lock" << Endl;
     cs.Client.FlatQuery(Sprintf(setLock, 0, 5, 0), res);
     ExtractResultLocks<TLocksVer>(res, locks);
 
@@ -1953,6 +1955,7 @@ static void ShardLocks() {
         ))
     ))___";
     {
+        Cout << "... checking the last lock (must be set)" << Endl;
         cs.Client.FlatQuery(Sprintf(checkLock,
             TLocksVer::TableName(),
             TLocksVer::Key(locks.back().LockId,
@@ -1969,9 +1972,11 @@ static void ShardLocks() {
         UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
     }
 
-    // Break locks by single row update.
+    // Upsert key 48, which does not conflict with either lock.
+    // However since the first lock is forced to be a shard lock it will break.
+    Cout << "... upserting key 48 (will break the first lock despite no conflicts)" << Endl;
     const char * lockUpdate = R"___((
-        (let row0_ '('('key (Uint32 '42))))
+        (let row0_ '('('key (Uint32 '48))))
         (let update_ '('('value (Uint32 '0))))
         (let ret_ (AsList
             (UpdateRow '/dc-1/Dir/A row0_ update_)
@@ -1980,8 +1985,9 @@ static void ShardLocks() {
     ))___";
     cs.Client.FlatQuery(lockUpdate, opts, res);
 
-    // Check locks are broken.
+    // Check the last lock is not broken.
     {
+        Cout << "... checking the last lock (must not be broken)" << Endl;
         cs.Client.FlatQuery(Sprintf(checkLock,
             TLocksVer::TableName(),
             TLocksVer::Key(locks.back().LockId,
@@ -1991,10 +1997,16 @@ static void ShardLocks() {
             TLocksVer::Columns()), res);
         TValue result = TValue::Create(res.GetValue(), res.GetType());
         TValue xres = result["Result"];
-        UNIT_ASSERT(!xres.HaveValue());
+        UNIT_ASSERT(xres.HaveValue());
+        auto lock = ExtractRowLock<TLocksVer>(xres);
+        UNIT_ASSERT_VALUES_EQUAL(lock.LockId, locks.back().LockId);
+        UNIT_ASSERT_VALUES_EQUAL(lock.Generation, locks.back().Generation);
+        UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
     }
 
+    // Check the first lock is broken.
     {
+        Cout << "... checking the first lock (must be broken)" << Endl;
         cs.Client.FlatQuery(Sprintf(checkLock,
             TLocksVer::TableName(),
             TLocksVer::Key(locks[0].LockId,
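
Note on the override guards used in the test above: OverrideLockLimit() and OverrideLockRangesLimit() return a std::shared_ptr<void> whose deleter restores the previous limit, so the lowered limits stay in effect only while the test keeps the returned handle (restoreLimit / restoreRangeLimit) alive. Below is a minimal self-contained sketch of the same idiom outside of YDB; g_Limit, LockLimit and OverrideLimit are illustrative stand-ins for the TLockLocker members, not code from this commit.

    // Simplified model of the override/restore guard pattern (not YDB code).
    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <memory>

    static std::atomic<uint64_t> g_Limit{20000};  // stands in for g_LockLimit

    uint64_t LockLimit() {
        return g_Limit.load(std::memory_order_relaxed);
    }

    std::shared_ptr<void> OverrideLimit(uint64_t newLimit) {
        // Swap in the new limit and capture the old one in the deleter,
        // so destroying the returned handle restores the previous value.
        uint64_t old = g_Limit.exchange(newLimit, std::memory_order_relaxed);
        return std::shared_ptr<void>(nullptr, [old](void*) {
            g_Limit.store(old, std::memory_order_relaxed);
        });
    }

    int main() {
        {
            auto restore = OverrideLimit(20);  // like OverrideLockLimit(20) in the test
            assert(LockLimit() == 20);
        }                                      // guard destroyed: default restored
        assert(LockLimit() == 20000);
    }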

ydb/core/tx/locks/locks.cpp

Lines changed: 95 additions & 16 deletions
@@ -402,9 +402,53 @@ void TTableLocks::RemoveWriteLock(TLockInfo* lock) {
 
 // TLockLocker
 
+namespace {
+
+static constexpr ui64 DefaultLockLimit() {
+    // Valgrind and sanitizers are too slow
+    // Some tests cannot exhaust default limit in under 5 minutes
+    return NValgrind::PlainOrUnderValgrind(
+        NSan::PlainOrUnderSanitizer(
+            20000,
+            1000),
+        1000);
+}
+
+static constexpr ui64 DefaultLockRangesLimit() {
+    return DefaultLockLimit() * 50;
+}
+
+static std::atomic<ui64> g_LockLimit{ DefaultLockLimit() };
+static std::atomic<ui64> g_LockRangesLimit{ DefaultLockRangesLimit() };
+
+} // namespace
+
+ui64 TLockLocker::LockLimit() {
+    return g_LockLimit.load(std::memory_order_relaxed);
+}
+
+ui64 TLockLocker::LockRangesLimit() {
+    return g_LockRangesLimit.load(std::memory_order_relaxed);
+}
+
+std::shared_ptr<void> TLockLocker::OverrideLockLimit(ui64 newLimit) {
+    ui64 oldLimit = g_LockLimit.exchange(newLimit, std::memory_order_relaxed);
+    return std::shared_ptr<void>(nullptr, [oldLimit](void*) {
+        g_LockLimit.store(oldLimit, std::memory_order_relaxed);
+    });
+}
+
+std::shared_ptr<void> TLockLocker::OverrideLockRangesLimit(ui64 newLimit) {
+    ui64 oldLimit = g_LockRangesLimit.exchange(newLimit, std::memory_order_relaxed);
+    return std::shared_ptr<void>(nullptr, [oldLimit](void*) {
+        g_LockRangesLimit.store(oldLimit, std::memory_order_relaxed);
+    });
+}
+
 void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) {
     if (lock->AddPoint(key)) {
         key.Table->AddPointLock(key, lock.Get());
+        LocksWithRanges.PushBack(lock.Get());
     } else {
         key.Table->AddShardLock(lock.Get());
     }
@@ -413,21 +457,27 @@ void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key
 void TLockLocker::AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key) {
     if (lock->AddRange(key)) {
         key.Table->AddRangeLock(key, lock.Get());
+        LocksWithRanges.PushBack(lock.Get());
     } else {
         key.Table->AddShardLock(lock.Get());
     }
 }
 
-void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
+void TLockLocker::MakeShardLock(TLockInfo* lock) {
     if (!lock->IsShardLock()) {
         for (const TPathId& tableId : lock->GetReadTables()) {
-            Tables.at(tableId)->RemoveRangeLock(lock.Get());
+            Tables.at(tableId)->RemoveRangeLock(lock);
         }
         lock->MakeShardLock();
+        LocksWithRanges.Remove(lock);
         for (const TPathId& tableId : lock->GetReadTables()) {
-            Tables.at(tableId)->AddShardLock(lock.Get());
+            Tables.at(tableId)->AddShardLock(lock);
         }
     }
+}
+
+void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
+    MakeShardLock(lock.Get());
     for (auto& table : readTables) {
         const TPathId& tableId = table.GetTableId();
         Y_ABORT_UNLESS(Tables.at(tableId).Get() == &table);
@@ -519,6 +569,9 @@ void TLockLocker::RemoveBrokenRanges() {
 TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
     auto it = Locks.find(lockId);
     if (it != Locks.end()) {
+        if (it->second->IsInList<TLockInfoRangesListTag>()) {
+            LocksWithRanges.PushBack(it->second.Get());
+        }
         if (it->second->IsInList<TLockInfoExpireListTag>()) {
             ExpireQueue.PushBack(it->second.Get());
         }
@@ -591,6 +644,7 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) {
         for (const TPathId& tableId : txLock->GetWriteTables()) {
             Tables.at(tableId)->RemoveWriteLock(txLock.Get());
         }
+        LocksWithRanges.Remove(txLock.Get());
        txLock->CleanupConflicts();
         Locks.erase(it);
 
@@ -634,6 +688,7 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
     Y_ABORT_UNLESS(Tables.empty());
     Locks.clear();
     ShardLocks.clear();
+    LocksWithRanges.Clear();
     ExpireQueue.Clear();
     BrokenLocks.Clear();
     BrokenPersistentLocks.Clear();
@@ -643,21 +698,41 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
     PendingSubscribeLocks.clear();
 }
 
-bool TLockLocker::ForceShardLock(const TPathId& tableId) const {
-    auto it = Tables.find(tableId);
-    if (it != Tables.end()) {
-        if (it->second->RangeCount() > LockLimit()) {
-            return true;
-        }
+bool TLockLocker::ForceShardLock(
+    const TLockInfo::TPtr& lock,
+    const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables,
+    ui64 newRanges)
+{
+    if (lock->NumPoints() + lock->NumRanges() + newRanges > LockRangesLimit()) {
+        // Lock has too many ranges, will never fit in
+        return true;
     }
-    return false;
-}
 
-bool TLockLocker::ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const {
     for (auto& table : readTables) {
-        if (table.RangeCount() > LockLimit())
-            return true;
+        while (table.RangeCount() + newRanges > LockRangesLimit()) {
+            if (LocksWithRanges.Empty()) {
+                // Too many new ranges (should never happen)
+                return true;
+            }
+
+            // Try to reduce the number of ranges until new ranges fit in
+            TLockInfo* next = LocksWithRanges.PopFront();
+            if (next == lock.Get()) {
+                bool wasLast = LocksWithRanges.Empty();
+                LocksWithRanges.PushBack(next);
+                if (wasLast) {
+                    return true;
+                }
+                // We want to handle the newest lock last
+                continue;
+            }
+
+            // Reduce the number of ranges by making the oldest lock into a shard lock
+            MakeShardLock(next);
+            Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
+        }
     }
+
     return false;
 }
 
@@ -771,8 +846,6 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
         return TVector<TLock>();
     }
 
-    bool shardLock = Locker.ForceShardLock(Update->ReadTables);
-
     TLockInfo::TPtr lock;
     ui64 counter = TLock::ErrorNotSet;
 
@@ -791,6 +864,12 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
     } else if (lock->IsBroken()) {
         counter = TLock::ErrorBroken;
     } else {
+        bool shardLock = (
+            lock->IsShardLock() ||
+            Locker.ForceShardLock(
+                lock,
+                Update->ReadTables,
+                Update->PointLocks.size() + Update->RangeLocks.size()));
         if (shardLock) {
             Locker.AddShardLock(lock, Update->ReadTables);
             Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
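
For readers of the new ForceShardLock() above: a lock that by itself would exceed LockRangesLimit() becomes a whole-shard lock immediately; otherwise, while a table's range count plus the incoming ranges still exceeds the limit, the oldest entry of LocksWithRanges is converted into a shard lock (freeing its ranges), and the current lock is forced into a shard lock only when it is the sole remaining candidate. The sketch below is a simplified, self-contained model of that eviction policy; Lock, Table and the std::deque container are stand-ins for TLockInfo, TTableLocks and the intrusive LocksWithRanges list, not code from this commit.

    // Simplified model of the oldest-first eviction policy (not YDB code).
    #include <cstdint>
    #include <deque>

    struct Lock {
        uint64_t Ranges = 0;
        bool ShardLock = false;
    };

    struct Table {
        uint64_t RangeCount = 0;            // total ranges attached to the table
        std::deque<Lock*> LocksWithRanges;  // oldest lock first

        // Turn a range lock into a whole-shard lock and release its ranges.
        void MakeShardLock(Lock* lock) {
            RangeCount -= lock->Ranges;
            lock->Ranges = 0;
            lock->ShardLock = true;
        }
    };

    // Returns true when `current` itself has to become a shard lock.
    bool ForceShardLock(Table& table, Lock* current, uint64_t newRanges, uint64_t rangesLimit) {
        if (current->Ranges + newRanges > rangesLimit) {
            return true;                    // this lock alone would never fit
        }
        while (table.RangeCount + newRanges > rangesLimit) {
            if (table.LocksWithRanges.empty()) {
                return true;                // nothing left to evict
            }
            Lock* oldest = table.LocksWithRanges.front();
            table.LocksWithRanges.pop_front();
            if (oldest == current) {
                // Re-queue the current (newest) lock; evict it only as a last resort.
                bool wasLast = table.LocksWithRanges.empty();
                table.LocksWithRanges.push_back(oldest);
                if (wasLast) {
                    return true;
                }
                continue;
            }
            table.MakeShardLock(oldest);    // evict the oldest lock's ranges
        }
        return false;
    }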

ydb/core/tx/locks/locks.h

Lines changed: 18 additions & 12 deletions
@@ -252,6 +252,7 @@ struct TLockInfoWriteConflictListTag {};
 struct TLockInfoBrokenListTag {};
 struct TLockInfoBrokenPersistentListTag {};
 struct TLockInfoExpireListTag {};
+struct TLockInfoRangesListTag {};
 
 /// Aggregates shard, point and range locks
 class TLockInfo
@@ -263,6 +264,7 @@ class TLockInfo
     , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenListTag>
     , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenPersistentListTag>
     , public TIntrusiveListItem<TLockInfo, TLockInfoExpireListTag>
+    , public TIntrusiveListItem<TLockInfo, TLockInfoRangesListTag>
 {
     friend class TTableLocks;
     friend class TLockLocker;
@@ -508,16 +510,15 @@ class TLockLocker {
     friend class TSysLocks;
 
 public:
-    /// Prevent unlimited lock's count growth
-    static constexpr ui64 LockLimit() {
-        // Valgrind and sanitizers are too slow
-        // Some tests cannot exhaust default limit in under 5 minutes
-        return NValgrind::PlainOrUnderValgrind(
-            NSan::PlainOrUnderSanitizer(
-                16 * 1024,
-                1024),
-            1024);
-    }
+    /// Prevent unlimited locks count growth
+    static ui64 LockLimit();
+
+    /// Prevent unlimited range count growth
+    static ui64 LockRangesLimit();
+
+    /// Make it possible for tests to override defaults
+    static std::shared_ptr<void> OverrideLockLimit(ui64 newLimit);
+    static std::shared_ptr<void> OverrideLockRangesLimit(ui64 newLimit);
 
     /// We don't expire locks until this time limit after they are created
     static constexpr TDuration LockTimeLimit() { return TDuration::Minutes(5); }
@@ -535,6 +536,7 @@ class TLockLocker {
 
     void AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key);
     void AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key);
+    void MakeShardLock(TLockInfo* lock);
     void AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables);
     void AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables);
 
@@ -592,8 +594,10 @@ class TLockLocker {
 
     void UpdateSchema(const TPathId& tableId, const TVector<NScheme::TTypeInfo>& keyColumnTypes);
     void RemoveSchema(const TPathId& tableId, ILocksDb* db);
-    bool ForceShardLock(const TPathId& tableId) const;
-    bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;
+    bool ForceShardLock(
+        const TLockInfo::TPtr& lock,
+        const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables,
+        ui64 newRanges);
 
     void ScheduleBrokenLock(TLockInfo* lock);
     void ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at);
@@ -633,6 +637,8 @@ class TLockLocker {
     THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId
     THashMap<TPathId, TTableLocks::TPtr> Tables;
     THashSet<ui64> ShardLocks;
+    // A list of locks that have ranges (from oldest to newest)
+    TIntrusiveList<TLockInfo, TLockInfoRangesListTag> LocksWithRanges;
     // A list of locks that may be removed when enough time passes
     TIntrusiveList<TLockInfo, TLockInfoExpireListTag> ExpireQueue;
     // A list of broken, but not yet removed locks
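
The new TLockInfoRangesListTag and the extra TIntrusiveListItem base above are what let a single TLockInfo sit in LocksWithRanges and in the existing expire/broken lists at the same time without any allocation: each tag contributes its own pair of links. The toy sketch below illustrates that tag-based membership pattern; it is a hand-rolled stand-in, not the TIntrusiveList/TIntrusiveListItem API that YDB actually uses.

    // Toy illustration of tag-based intrusive list membership (not the real API).
    #include <cassert>

    template <typename T, typename Tag>
    struct TListItem {
        T* Prev = nullptr;
        T* Next = nullptr;
    };

    template <typename T, typename Tag>
    struct TList {
        T* Head = nullptr;
        T* Tail = nullptr;

        // Pick the links contributed by this list's tag.
        static TListItem<T, Tag>* Links(T* p) { return p; }

        void PushBack(T* item) {
            Links(item)->Prev = Tail;
            Links(item)->Next = nullptr;
            if (Tail) {
                Links(Tail)->Next = item;
            } else {
                Head = item;
            }
            Tail = item;
        }

        T* PopFront() {
            T* item = Head;
            if (!item) {
                return nullptr;
            }
            Head = Links(item)->Next;
            if (Head) {
                Links(Head)->Prev = nullptr;
            } else {
                Tail = nullptr;
            }
            Links(item)->Prev = nullptr;
            Links(item)->Next = nullptr;
            return item;
        }
    };

    // Distinct tags give the same object independent links in several lists.
    struct TRangesTag {};
    struct TExpireTag {};

    struct TLock
        : TListItem<TLock, TRangesTag>
        , TListItem<TLock, TExpireTag>
    {
        int Id;
        explicit TLock(int id) : Id(id) {}
    };

    int main() {
        TLock a(1), b(2);
        TList<TLock, TRangesTag> locksWithRanges;
        TList<TLock, TExpireTag> expireQueue;
        locksWithRanges.PushBack(&a);
        locksWithRanges.PushBack(&b);
        expireQueue.PushBack(&b);   // same object, second list, no allocation
        assert(locksWithRanges.PopFront() == &a);
        assert(expireQueue.PopFront() == &b);
        return 0;
    }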
