Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions ydb/core/client/locks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1810,8 +1810,14 @@ static void LocksLimit() {

using TLock = TSysTables::TLocksTable::TLock;

ui32 limit = NDataShard::TLockLocker::LockLimit();
const ui32 factor = 100;
auto prevLimit = NDataShard::TLockLocker::LockLimit();
NDataShard::TLockLocker::SetLockLimit(20);
Y_DEFER {
NDataShard::TLockLocker::SetLockLimit(prevLimit);
};

const ui32 limit = NDataShard::TLockLocker::LockLimit();
const ui32 factor = 5;

const char * query = R"((
(let row0_ '('('key (Uint32 '%u))))
Expand Down Expand Up @@ -1916,9 +1922,13 @@ static void ShardLocks() {
NKikimrMiniKQL::TResult res;
TClient::TFlatQueryOptions opts;

auto prevLimit = NDataShard::TLockLocker::TotalRangesLimit();
NDataShard::TLockLocker::SetTotalRangesLimit(10);
Y_DEFER {
NDataShard::TLockLocker::SetTotalRangesLimit(prevLimit);
};

ui32 limit = NDataShard::TLockLocker::LockLimit();
//const ui32 factor = 100;
const ui32 limit = NDataShard::TLockLocker::TotalRangesLimit();

const char * setLock = R"___((
(let range_ '('IncFrom 'IncTo '('key (Uint32 '%u) (Uint32 '%u))))
Expand All @@ -1932,14 +1942,16 @@ static void ShardLocks() {
// Attach lots of ranges to a single lock.
TVector<NMiniKQL::IEngineFlat::TTxLock> locks;
ui64 lockId = 0;
for (ui32 i = 0; i < limit + 1; ++i) {
for (ui32 i = 0; i < limit; ++i) {
Cout << "... reading range " << i << Endl;
cs.Client.FlatQuery(Sprintf(setLock, i * 10, i * 10 + 5, lockId), res);
ExtractResultLocks<TLocksVer>(res, locks);
lockId = locks.back().LockId;
}

// We now have too many rnages attached to locks and new lock
// will be forced to be shard lock.
// We now have too many ranges attached to locks and the oldest lock
// will be forced to be a shard lock.
Cout << "... reading additional range with a new lock" << Endl;
cs.Client.FlatQuery(Sprintf(setLock, 0, 5, 0), res);
ExtractResultLocks<TLocksVer>(res, locks);

Expand All @@ -1953,6 +1965,7 @@ static void ShardLocks() {
))
))___";
{
Cout << "... checking the last lock (must be set)" << Endl;
cs.Client.FlatQuery(Sprintf(checkLock,
TLocksVer::TableName(),
TLocksVer::Key(locks.back().LockId,
Expand All @@ -1969,9 +1982,11 @@ static void ShardLocks() {
UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
}

// Break locks by single row update.
// Upsert key 48, which does not conflict with either lock.
// However since the first lock is forced to be a shard lock it will break.
Cout << "... upserting key 48 (will break the first lock despite no conflicts)" << Endl;
const char * lockUpdate = R"___((
(let row0_ '('('key (Uint32 '42))))
(let row0_ '('('key (Uint32 '48))))
(let update_ '('('value (Uint32 '0))))
(let ret_ (AsList
(UpdateRow '/dc-1/Dir/A row0_ update_)
Expand All @@ -1980,8 +1995,9 @@ static void ShardLocks() {
))___";
cs.Client.FlatQuery(lockUpdate, opts, res);

// Check locks are broken.
// Check the last lock is not broken.
{
Cout << "... checking the last lock (must not be broken)" << Endl;
cs.Client.FlatQuery(Sprintf(checkLock,
TLocksVer::TableName(),
TLocksVer::Key(locks.back().LockId,
Expand All @@ -1991,10 +2007,16 @@ static void ShardLocks() {
TLocksVer::Columns()), res);
TValue result = TValue::Create(res.GetValue(), res.GetType());
TValue xres = result["Result"];
UNIT_ASSERT(!xres.HaveValue());
UNIT_ASSERT(xres.HaveValue());
auto lock = ExtractRowLock<TLocksVer>(xres);
UNIT_ASSERT_VALUES_EQUAL(lock.LockId, locks.back().LockId);
UNIT_ASSERT_VALUES_EQUAL(lock.Generation, locks.back().Generation);
UNIT_ASSERT_VALUES_EQUAL(lock.Counter, locks.back().Counter);
}

// Check the first lock is broken.
{
Cout << "... checking the first lock (must be broken)" << Endl;
cs.Client.FlatQuery(Sprintf(checkLock,
TLocksVer::TableName(),
TLocksVer::Key(locks[0].LockId,
Expand Down
118 changes: 102 additions & 16 deletions ydb/core/tx/locks/locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,60 @@ void TTableLocks::RemoveWriteLock(TLockInfo* lock) {

// TLockLocker

namespace {

static constexpr ui64 DefaultLockLimit() {
// Valgrind and sanitizers are too slow
// Some tests cannot exhaust default limit in under 5 minutes
return NValgrind::PlainOrUnderValgrind(
NSan::PlainOrUnderSanitizer(
20000,
1000),
1000);
}

static constexpr ui64 DefaultLockRangesLimit() {
return 50000;
}

static constexpr ui64 DefaultTotalRangesLimit() {
return 1000000;
}

static std::atomic<ui64> g_LockLimit{ DefaultLockLimit() };
static std::atomic<ui64> g_LockRangesLimit{ DefaultLockRangesLimit() };
static std::atomic<ui64> g_TotalRangesLimit{ DefaultTotalRangesLimit() };

} // namespace

ui64 TLockLocker::LockLimit() {
return g_LockLimit.load(std::memory_order_relaxed);
}

ui64 TLockLocker::LockRangesLimit() {
return g_LockRangesLimit.load(std::memory_order_relaxed);
}

ui64 TLockLocker::TotalRangesLimit() {
return g_TotalRangesLimit.load(std::memory_order_relaxed);
}

void TLockLocker::SetLockLimit(ui64 newLimit) {
g_LockLimit.store(newLimit, std::memory_order_relaxed);
}

void TLockLocker::SetLockRangesLimit(ui64 newLimit) {
g_LockRangesLimit.store(newLimit, std::memory_order_relaxed);
}

void TLockLocker::SetTotalRangesLimit(ui64 newLimit) {
g_TotalRangesLimit.store(newLimit, std::memory_order_relaxed);
}

void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) {
if (lock->AddPoint(key)) {
key.Table->AddPointLock(key, lock.Get());
LocksWithRanges.PushBack(lock.Get());
} else {
key.Table->AddShardLock(lock.Get());
}
Expand All @@ -413,21 +464,27 @@ void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key
void TLockLocker::AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key) {
if (lock->AddRange(key)) {
key.Table->AddRangeLock(key, lock.Get());
LocksWithRanges.PushBack(lock.Get());
} else {
key.Table->AddShardLock(lock.Get());
}
}

void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
void TLockLocker::MakeShardLock(TLockInfo* lock) {
if (!lock->IsShardLock()) {
for (const TPathId& tableId : lock->GetReadTables()) {
Tables.at(tableId)->RemoveRangeLock(lock.Get());
Tables.at(tableId)->RemoveRangeLock(lock);
}
lock->MakeShardLock();
LocksWithRanges.Remove(lock);
for (const TPathId& tableId : lock->GetReadTables()) {
Tables.at(tableId)->AddShardLock(lock.Get());
Tables.at(tableId)->AddShardLock(lock);
}
}
}

void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
MakeShardLock(lock.Get());
for (auto& table : readTables) {
const TPathId& tableId = table.GetTableId();
Y_ABORT_UNLESS(Tables.at(tableId).Get() == &table);
Expand Down Expand Up @@ -519,6 +576,9 @@ void TLockLocker::RemoveBrokenRanges() {
TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
auto it = Locks.find(lockId);
if (it != Locks.end()) {
if (it->second->IsInList<TLockInfoRangesListTag>()) {
LocksWithRanges.PushBack(it->second.Get());
}
if (it->second->IsInList<TLockInfoExpireListTag>()) {
ExpireQueue.PushBack(it->second.Get());
}
Expand Down Expand Up @@ -591,6 +651,7 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) {
for (const TPathId& tableId : txLock->GetWriteTables()) {
Tables.at(tableId)->RemoveWriteLock(txLock.Get());
}
LocksWithRanges.Remove(txLock.Get());
txLock->CleanupConflicts();
Locks.erase(it);

Expand Down Expand Up @@ -634,6 +695,7 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
Y_ABORT_UNLESS(Tables.empty());
Locks.clear();
ShardLocks.clear();
LocksWithRanges.Clear();
ExpireQueue.Clear();
BrokenLocks.Clear();
BrokenPersistentLocks.Clear();
Expand All @@ -643,21 +705,41 @@ void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
PendingSubscribeLocks.clear();
}

bool TLockLocker::ForceShardLock(const TPathId& tableId) const {
auto it = Tables.find(tableId);
if (it != Tables.end()) {
if (it->second->RangeCount() > LockLimit()) {
return true;
}
bool TLockLocker::ForceShardLock(
const TLockInfo::TPtr& lock,
const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables,
ui64 newRanges)
{
if (lock->NumPoints() + lock->NumRanges() + newRanges > LockRangesLimit()) {
// Lock has too many ranges, will never fit in
return true;
}
return false;
}

bool TLockLocker::ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const {
for (auto& table : readTables) {
if (table.RangeCount() > LockLimit())
return true;
while (table.RangeCount() + newRanges > TotalRangesLimit()) {
if (LocksWithRanges.Empty()) {
// Too many new ranges (e.g. TotalRangesLimit < LockRangesLimit)
return true;
}

// Try to reduce the number of ranges until new ranges fit in
TLockInfo* next = LocksWithRanges.PopFront();
if (next == lock.Get()) {
bool wasLast = LocksWithRanges.Empty();
LocksWithRanges.PushBack(next);
if (wasLast) {
return true;
}
// We want to handle the newest lock last
continue;
}

// Reduce the number of ranges by making the oldest lock into a shard lock
MakeShardLock(next);
Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
}
}

return false;
}

Expand Down Expand Up @@ -771,8 +853,6 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
return TVector<TLock>();
}

bool shardLock = Locker.ForceShardLock(Update->ReadTables);

TLockInfo::TPtr lock;
ui64 counter = TLock::ErrorNotSet;

Expand All @@ -791,6 +871,12 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
} else if (lock->IsBroken()) {
counter = TLock::ErrorBroken;
} else {
bool shardLock = (
lock->IsShardLock() ||
Locker.ForceShardLock(
lock,
Update->ReadTables,
Update->PointLocks.size() + Update->RangeLocks.size()));
if (shardLock) {
Locker.AddShardLock(lock, Update->ReadTables);
Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
Expand Down
34 changes: 22 additions & 12 deletions ydb/core/tx/locks/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ struct TLockInfoWriteConflictListTag {};
struct TLockInfoBrokenListTag {};
struct TLockInfoBrokenPersistentListTag {};
struct TLockInfoExpireListTag {};
struct TLockInfoRangesListTag {};

/// Aggregates shard, point and range locks
class TLockInfo
Expand All @@ -263,6 +264,7 @@ class TLockInfo
, public TIntrusiveListItem<TLockInfo, TLockInfoBrokenListTag>
, public TIntrusiveListItem<TLockInfo, TLockInfoBrokenPersistentListTag>
, public TIntrusiveListItem<TLockInfo, TLockInfoExpireListTag>
, public TIntrusiveListItem<TLockInfo, TLockInfoRangesListTag>
{
friend class TTableLocks;
friend class TLockLocker;
Expand Down Expand Up @@ -508,16 +510,19 @@ class TLockLocker {
friend class TSysLocks;

public:
/// Prevent unlimited lock's count growth
static constexpr ui64 LockLimit() {
// Valgrind and sanitizers are too slow
// Some tests cannot exhaust default limit in under 5 minutes
return NValgrind::PlainOrUnderValgrind(
NSan::PlainOrUnderSanitizer(
16 * 1024,
1024),
1024);
}
/// Prevent unlimited locks count growth
static ui64 LockLimit();

/// Prevent unlimited range count growth
static ui64 LockRangesLimit();

/// Prevent unlimited number of total ranges
static ui64 TotalRangesLimit();

/// Make it possible to override defaults (e.g. for tests)
static void SetLockLimit(ui64 newLimit);
static void SetLockRangesLimit(ui64 newLimit);
static void SetTotalRangesLimit(ui64 newLimit);

/// We don't expire locks until this time limit after they are created
static constexpr TDuration LockTimeLimit() { return TDuration::Minutes(5); }
Expand All @@ -535,6 +540,7 @@ class TLockLocker {

void AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key);
void AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key);
void MakeShardLock(TLockInfo* lock);
void AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables);
void AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables);

Expand Down Expand Up @@ -592,8 +598,10 @@ class TLockLocker {

void UpdateSchema(const TPathId& tableId, const TVector<NScheme::TTypeInfo>& keyColumnTypes);
void RemoveSchema(const TPathId& tableId, ILocksDb* db);
bool ForceShardLock(const TPathId& tableId) const;
bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;
bool ForceShardLock(
const TLockInfo::TPtr& lock,
const TIntrusiveList<TTableLocks,
TTableLocksReadListTag>& readTables, ui64 newRanges);

void ScheduleBrokenLock(TLockInfo* lock);
void ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at);
Expand Down Expand Up @@ -633,6 +641,8 @@ class TLockLocker {
THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId
THashMap<TPathId, TTableLocks::TPtr> Tables;
THashSet<ui64> ShardLocks;
// A list of locks that have ranges (from oldest to newest)
TIntrusiveList<TLockInfo, TLockInfoRangesListTag> LocksWithRanges;
// A list of locks that may be removed when enough time passes
TIntrusiveList<TLockInfo, TLockInfoExpireListTag> ExpireQueue;
// A list of broken, but not yet removed locks
Expand Down
Loading