Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/mind/dynamic_nameserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
ctx.Schedule(config->Epoch.End - ctx.Now(),
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
} else {
// Note: this update may be optimized to only include new nodes
for (auto &node : rec.GetNodes()) {
auto nodeId = node.GetNodeId();
if (!config->DynamicNodes.contains(nodeId))
Expand Down
57 changes: 54 additions & 3 deletions ydb/core/mind/node_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,45 @@ void TNodeBroker::AddDelayedListNodesRequest(ui64 epoch,

void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev)
{
ui64 version = ev->Get()->Record.GetCachedVersion();
auto *msg = ev->Get();

NKikimrNodeBroker::TNodesInfo info;
Epoch.Serialize(*info.MutableEpoch());
info.SetDomain(AppData()->DomainsInfo->GetDomain()->DomainUid);
TAutoPtr<TEvNodeBroker::TEvNodesInfo> resp = new TEvNodeBroker::TEvNodesInfo(info);
if (version != Epoch.Version)

bool optimized = false;

if (msg->Record.HasCachedVersion()) {
if (msg->Record.GetCachedVersion() == Epoch.Version) {
// Client has an up-to-date list already
optimized = true;
} else {
// We may be able to only send added nodes in the same epoch when
// all deltas are cached up to the current epoch inclusive.
ui64 neededFirstVersion = msg->Record.GetCachedVersion() + 1;
if (!EpochDeltasVersions.empty() &&
EpochDeltasVersions.front() <= neededFirstVersion &&
EpochDeltasVersions.back() == Epoch.Version &&
neededFirstVersion <= Epoch.Version)
{
ui64 firstIndex = neededFirstVersion - EpochDeltasVersions.front();
if (firstIndex > 0) {
// Note: usually there is a small number of nodes added
// between subsequent requests, so this substr should be
// very cheap.
resp->PreSerializedData = EpochDeltasCache.substr(EpochDeltasEndOffsets[firstIndex - 1]);
} else {
resp->PreSerializedData = EpochDeltasCache;
}
optimized = true;
}
}
}

if (!optimized) {
resp->PreSerializedData = EpochCache;
}

TabletCounters->Percentile()[COUNTER_LIST_NODES_BYTES].IncrementFor(resp->GetCachedByteSize());
LOG_TRACE_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
Expand All @@ -308,12 +339,16 @@ void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev)

void TNodeBroker::ProcessDelayedListNodesRequests()
{
THashSet<TActorId> processed;
while (!DelayedListNodesRequests.empty()) {
auto it = DelayedListNodesRequests.begin();
if (it->first > Epoch.Id)
break;

ProcessListNodesRequest(it->second);
// Avoid processing more than one request from the same sender
if (processed.insert(it->second->Sender).second) {
ProcessListNodesRequest(it->second);
}
DelayedListNodesRequests.erase(it);
}
}
Expand Down Expand Up @@ -432,6 +467,11 @@ void TNodeBroker::PrepareEpochCache()

Y_PROTOBUF_SUPPRESS_NODISCARD info.SerializeToString(&EpochCache);
TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.Size());

EpochDeltasCache.clear();
EpochDeltasVersions.clear();
EpochDeltasEndOffsets.clear();
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}

void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)
Expand All @@ -447,6 +487,17 @@ void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)

EpochCache += delta;
TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.Size());

if (!EpochDeltasVersions.empty() && EpochDeltasVersions.back() + 1 != Epoch.Version) {
EpochDeltasCache.clear();
EpochDeltasVersions.clear();
EpochDeltasEndOffsets.clear();
}

EpochDeltasCache += delta;
EpochDeltasVersions.push_back(Epoch.Version);
EpochDeltasEndOffsets.push_back(EpochDeltasCache.size());
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
}

void TNodeBroker::SubscribeForConfigUpdates(const TActorContext &ctx)
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/mind/node_broker_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ class TNodeBroker : public TActor<TNodeBroker>
TSchedulerCookieHolder EpochTimerCookieHolder;
TString EpochCache;

TString EpochDeltasCache;
TVector<ui64> EpochDeltasVersions;
TVector<ui64> EpochDeltasEndOffsets;

TTabletCountersBase* TabletCounters;
TAutoPtr<TTabletCountersBase> TabletCountersPtr;

Expand Down
42 changes: 42 additions & 0 deletions ydb/core/mind/node_broker_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,48 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
UNIT_ASSERT_VALUES_EQUAL(epoch1.GetId(), epoch.GetId() + 5);
}

Y_UNIT_TEST(TestListNodesEpochDeltas)
{
TTestBasicRuntime runtime(8, false);
Setup(runtime, 10);
TActorId sender = runtime.AllocateEdgeActor();

WaitForEpochUpdate(runtime, sender);
WaitForEpochUpdate(runtime, sender);

auto epoch0 = GetEpoch(runtime, sender);
CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
1, 2, 3, 4, TStatus::OK, NODE1, epoch0.GetNextEnd());
auto epoch1 = CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch0.GetVersion());
CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.5",
1, 2, 3, 5, TStatus::OK, NODE2, epoch1.GetNextEnd());
auto epoch2 = CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epoch1.GetVersion());
CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.6",
1, 2, 3, 6, TStatus::OK, NODE3, epoch2.GetNextEnd());
auto epoch3 = CheckFilteredNodesList(runtime, sender, {NODE3}, {}, 0, epoch2.GetVersion());

CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch0.GetVersion());
CheckFilteredNodesList(runtime, sender, {NODE2, NODE3}, {}, 0, epoch1.GetVersion());
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch3.GetVersion());

RebootTablet(runtime, MakeNodeBrokerID(), sender);
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch3.GetVersion());

CheckRegistration(runtime, sender, "host4", 1001, "host4.yandex.net", "1.2.3.7",
1, 2, 3, 7, TStatus::OK, NODE4, epoch3.GetNextEnd());
auto epoch4 = CheckFilteredNodesList(runtime, sender, {NODE4}, {}, 0, epoch3.GetVersion());

// NodeBroker doesn't have enough history in memory and replies with the full node list
CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch2.GetVersion());

WaitForEpochUpdate(runtime, sender);
auto epoch5 = GetEpoch(runtime, sender);
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch5.GetVersion());

// New epoch may remove nodes, so deltas are not returned on epoch change
CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch3.GetVersion());
}

Y_UNIT_TEST(TestRandomActions)
{
TTestBasicRuntime runtime(8, false);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_node_broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option (NKikimr.TabletTypeName) = "NodeBroker"; // Used as prefix for all counte

enum ESimpleCounters {
COUNTER_EPOCH_SIZE_BYTES = 0 [(CounterOpts) = {Name: "EpochSizeBytes"}];
COUNTER_EPOCH_DELTAS_SIZE_BYTES = 1 [(CounterOpts) = {Name: "EpochDeltasSizeBytes"}];
}

enum ECumulativeCounters {
Expand Down