@@ -322,6 +322,7 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
322322 HFuncTraced (TEvConsole::TEvConfigSubscriptionNotification, Handle);
323323 HFuncTraced (TEvents::TEvPoisonPill, Handle);
324324 HFuncTraced (TEvents::TEvUndelivered, Handle);
325+ HFuncTraced (TEvents::TEvWakeup, Handle);
325326 HFuncTraced (TEvInterconnect::TEvNodeDisconnected, Handle);
326327 IgnoreFunc (TEvInterconnect::TEvNodeConnected);
327328
@@ -350,6 +351,13 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
350351 Die (ctx);
351352 }
352353
354+ void Handle (TEvents::TEvWakeup::TPtr &/* ev*/ , const TActorContext &ctx)
355+ {
356+ LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS,
357+ " TSubscriptionClientSender(" << Subscription->Subscriber .ToString () << " ) received wake up" );
358+ Send (OwnerId, new TConfigsProvider::TEvPrivate::TEvWorkerCoolDown (Subscription));
359+ }
360+
353361 void Handle (TEvInterconnect::TEvNodeDisconnected::TPtr &/* ev*/ , const TActorContext &ctx)
354362 {
355363 LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -369,6 +377,8 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
369377 " TSubscriptionClientSender(" << Subscription->Subscriber .ToString () << " ) send TEvConfigSubscriptionNotificationRequest: "
370378 << notification.Get ()->Record .ShortDebugString ());
371379
380+ const float mbytes = notification.Get ()->GetCachedByteSize () / 1'000'000 .f ;
381+ Schedule (TDuration::MilliSeconds (100 ) * mbytes, new TEvents::TEvWakeup ());
372382 Send (Subscription->Subscriber , notification.Release (), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
373383 }
374384
@@ -521,7 +531,33 @@ void TConfigsProvider::CheckSubscriptions(const TInMemorySubscriptionSet &subscr
521531 const TActorContext &ctx)
522532{
523533 for (auto &subscription : subscriptions)
524- CheckSubscription (subscription, ctx);
534+ ScheduledUpdates[subscription->Subscriber ] = EUpdate::All;
535+ ProcessScheduledUpdates (ctx);
536+ }
537+
538+ void TConfigsProvider::ProcessScheduledUpdates (const TActorContext &ctx)
539+ {
540+ while (!ScheduledUpdates.empty () && InflightUpdates.size () < MAX_INFLIGHT_UPDATES) {
541+ auto it = ScheduledUpdates.begin ();
542+ if (auto subscription = InMemoryIndex.GetSubscription (it->first )) {
543+ switch (it->second ) {
544+ case EUpdate::All:
545+ if (CheckSubscription (subscription, ctx)) {
546+ InflightUpdates.insert (subscription->Subscriber );
547+ }
548+ break ;
549+ case EUpdate::Yaml:
550+ if (UpdateConfig (subscription, ctx)) {
551+ InflightUpdates.insert (subscription->Subscriber );
552+ }
553+ break ;
554+ }
555+ }
556+ ScheduledUpdates.erase (it);
557+ }
558+
559+ *Counters.ScheduledConfigUpdates = ScheduledUpdates.size ();
560+ *Counters.InflightConfigUpdates = InflightUpdates.size ();
525561}
526562
527563void TConfigsProvider::CheckSubscription (TSubscription::TPtr subscription,
@@ -582,7 +618,7 @@ void TConfigsProvider::CheckSubscription(TSubscription::TPtr subscription,
582618 subscription->Worker = ctx.RegisterWithSameMailbox (worker);
583619}
584620
585- void TConfigsProvider::CheckSubscription (TInMemorySubscription::TPtr subscription,
621+ bool TConfigsProvider::CheckSubscription (TInMemorySubscription::TPtr subscription,
586622 const TActorContext &ctx)
587623{
588624 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -664,7 +700,7 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
664700 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
665701 " TConfigsProvider: no changes found for subscription"
666702 << " " << subscription->Subscriber .ToString () << " :" << subscription->Generation );
667- return ;
703+ return false ;
668704 }
669705
670706 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -692,13 +728,6 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
692728 for (auto &[id, hash] : VolatileYamlConfigHashes) {
693729 auto *volatileConfig = request->Record .AddVolatileConfigs ();
694730 volatileConfig->SetId (id);
695- auto hashes = subscription->VolatileYamlConfigHashes .size ();
696- Y_UNUSED (hashes);
697- auto itt = subscription->VolatileYamlConfigHashes .find (id);
698- if (itt != subscription->VolatileYamlConfigHashes .end ()) {
699- auto tmp = itt->second ;
700- Y_UNUSED (tmp);
701- }
702731 if (auto it = subscription->VolatileYamlConfigHashes .find (id); it != subscription->VolatileYamlConfigHashes .end () && it->second == hash) {
703732 volatileConfig->SetNotChanged (true );
704733 } else {
@@ -709,8 +738,9 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
709738 subscription->VolatileYamlConfigHashes = VolatileYamlConfigHashes;
710739
711740 ctx.Send (subscription->Worker , request.Release ());
712-
713741 subscription->FirstUpdateSent = true ;
742+
743+ return true ;
714744}
715745
716746void TConfigsProvider::DumpStateHTML (IOutputStream &os) const {
@@ -822,7 +852,8 @@ void TConfigsProvider::Handle(TEvConsole::TEvConfigSubscriptionRequest::TPtr &ev
822852
823853 subscription->Worker = RegisterWithSameMailbox (new TSubscriptionClientSender (subscription, SelfId ()));
824854
825- CheckSubscription (subscription, ctx);
855+ ScheduledUpdates[subscription->Subscriber ] = EUpdate::All;
856+ ProcessScheduledUpdates (ctx);
826857}
827858
828859void TConfigsProvider::Handle (TEvConsole::TEvConfigSubscriptionCanceled::TPtr &ev, const TActorContext &ctx)
@@ -850,11 +881,27 @@ void TConfigsProvider::Handle(TEvPrivate::TEvWorkerDisconnected::TPtr &ev, const
850881 auto existing = InMemoryIndex.GetSubscription (subscription->Subscriber );
851882 if (existing == subscription) {
852883 InMemoryIndex.RemoveSubscription (subscription->Subscriber );
884+ ScheduledUpdates.erase (subscription->Subscriber );
885+ InflightUpdates.erase (subscription->Subscriber );
886+
853887 Send (subscription->Subscriber , new TEvConsole::TEvConfigSubscriptionCanceled (subscription->Generation ));
854888
855889 LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS, " TConfigsProvider removed subscription "
856890 << subscription->Subscriber << " :" << subscription->Generation << " (subscription worker died)" );
857891 }
892+
893+ ProcessScheduledUpdates (ctx);
894+ }
895+
896+ void TConfigsProvider::Handle (TEvPrivate::TEvWorkerCoolDown::TPtr &ev, const TActorContext &ctx)
897+ {
898+ auto subscription = ev->Get ()->Subscription ;
899+ auto existing = InMemoryIndex.GetSubscription (subscription->Subscriber );
900+ if (existing == subscription) {
901+ InflightUpdates.erase (subscription->Subscriber );
902+ }
903+
904+ ProcessScheduledUpdates (ctx);
858905}
859906
860907void TConfigsProvider::Handle (TEvConsole::TEvCheckConfigUpdatesRequest::TPtr &ev, const TActorContext &ctx)
@@ -1237,34 +1284,45 @@ void TConfigsProvider::Handle(TEvPrivate::TEvUpdateYamlConfig::TPtr &ev, const T
12371284 }
12381285
12391286 for (auto &[_, subscription] : InMemoryIndex.GetSubscriptions ()) {
1240- if (subscription->ServeYaml ) {
1241- auto request = MakeHolder<TEvConsole::TEvConfigSubscriptionNotification>(
1242- subscription->Generation ,
1243- NKikimrConfig::TAppConfig{},
1244- THashSet<ui32>{});
1245-
1246- if (subscription->YamlConfigVersion != YamlConfigVersion) {
1247- subscription->YamlConfigVersion = YamlConfigVersion;
1248- request->Record .SetYamlConfig (YamlConfig);
1249- } else {
1250- request->Record .SetYamlConfigNotChanged (true );
1251- }
1287+ ScheduledUpdates.emplace (subscription->Subscriber , EUpdate::Yaml);
1288+ }
12521289
1253- for (auto &[id, hash] : VolatileYamlConfigHashes) {
1254- auto *volatileConfig = request->Record .AddVolatileConfigs ();
1255- volatileConfig->SetId (id);
1256- if (auto it = subscription->VolatileYamlConfigHashes .find (id); it != subscription->VolatileYamlConfigHashes .end () && it->second == hash) {
1257- volatileConfig->SetNotChanged (true );
1258- } else {
1259- volatileConfig->SetConfig (VolatileYamlConfigs[id]);
1260- }
1261- }
1290+ ProcessScheduledUpdates (ctx);
1291+ }
12621292
1263- subscription->VolatileYamlConfigHashes = VolatileYamlConfigHashes;
1293+ bool TConfigsProvider::UpdateConfig (TInMemorySubscription::TPtr subscription,
1294+ const TActorContext &ctx)
1295+ {
1296+ if (subscription->ServeYaml ) {
1297+ auto request = MakeHolder<TEvConsole::TEvConfigSubscriptionNotification>(
1298+ subscription->Generation ,
1299+ NKikimrConfig::TAppConfig{},
1300+ THashSet<ui32>{});
1301+
1302+ if (subscription->YamlConfigVersion != YamlConfigVersion) {
1303+ subscription->YamlConfigVersion = YamlConfigVersion;
1304+ request->Record .SetYamlConfig (YamlConfig);
1305+ } else {
1306+ request->Record .SetYamlConfigNotChanged (true );
1307+ }
12641308
1265- ctx.Send (subscription->Worker , request.Release ());
1309+ for (auto &[id, hash] : VolatileYamlConfigHashes) {
1310+ auto *volatileConfig = request->Record .AddVolatileConfigs ();
1311+ volatileConfig->SetId (id);
1312+ if (auto it = subscription->VolatileYamlConfigHashes .find (id); it != subscription->VolatileYamlConfigHashes .end () && it->second == hash) {
1313+ volatileConfig->SetNotChanged (true );
1314+ } else {
1315+ volatileConfig->SetConfig (VolatileYamlConfigs[id]);
1316+ }
12661317 }
1318+
1319+ subscription->VolatileYamlConfigHashes = VolatileYamlConfigHashes;
1320+
1321+ ctx.Send (subscription->Worker , request.Release ());
1322+ return true ;
12671323 }
1324+
1325+ return false ;
12681326}
12691327
12701328} // namespace NKikimr::NConsole
0 commit comments