温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spring Cloud Eureka的服务与列表获取的方法是什么

发布时间:2021-11-16 16:38:55 来源:亿速云 阅读:213 作者:iii 栏目:大数据

本篇内容主要讲解“Spring Cloud Eureka的服务与列表获取的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spring Cloud Eureka的服务与列表获取的方法是什么”吧!

关于服务与实例列表获取

EurekaClient端

我们从Ribbon说起:EurekaClient也存在缓存,应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的,Ribbon的LoadBalancer会读取这个缓存,来知道当前有哪些实例可以调用,从而进行负载均衡。这个loadbalancer同样也有缓存。

首先看这个LoadBalancer的缓存更新机制,相关类是PollingServerListUpdater:

final Runnable wrapperRunnable = new Runnable() {     @Override     public void run() {         if (!isActive.get()) {             if (scheduledFuture != null) {                 scheduledFuture.cancel(true);             }             return;         }         try {             //从EurekaClient缓存中获取服务实例列表,保存在本地缓存             updateAction.doUpdate();             lastUpdated = System.currentTimeMillis();         } catch (Exception e) {             logger.warn("Failed one update cycle", e);         }     } }; //定时调度 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(         wrapperRunnable,         initialDelayMs,         refreshIntervalMs,         TimeUnit.MILLISECONDS );

这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表,保存在BaseLoadBalancer的本地缓存:

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) {     //写入allServerList的代码,这里略 } @Override public List<Server> getAllServers() {     return Collections.unmodifiableList(allServerList); }

这里的getAllServers会在每个负载均衡规则中被调用,例如RoundRobinRule:

public Server choose(ILoadBalancer lb, Object key) {     if (lb == null) {         log.warn("no load balancer");         return null;     }     Server server = null;     int count = 0;     while (server == null && count++ < 10) {         List<Server> reachableServers = lb.getReachableServers();         //获取服务实例列表,调用的就是刚刚提到的getAllServers         List<Server> allServers = lb.getAllServers();         int upCount = reachableServers.size();         int serverCount = allServers.size();         if ((upCount == 0) || (serverCount == 0)) {             log.warn("No up servers available from load balancer: " + lb);             return null;         }         int nextServerIndex = incrementAndGetModulo(serverCount);         server = allServers.get(nextServerIndex);         if (server == null) {             /* Transient. */             Thread.yield();             continue;         }         if (server.isAlive() && (server.isReadyToServe())) {             return (server);         }         // Next.         server = null;     }     if (count >= 10) {         log.warn("No available alive servers after 10 tries from load balancer: "                 + lb);     }     return server; }

这个缓存需要注意下,有时候我们只修改了EurekaClient缓存的更新时间,但是没有修改这个LoadBalancer的刷新本地缓存时间,就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小,因为没有从网络读取,就是从一个本地缓存刷到另一个本地缓存(如何配置缓存配置来实现服务实例快速下线快速感知快速刷新,可以参考我的另一篇文章)。

然后我们来看一下EurekaClient本身的缓存,直接看关键类DiscoveryClient的相关源码,我们这里只关心本地Region的,多Region配置我们先忽略:

//本地缓存,可以理解为是一个软链接 private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>(); private void initScheduledTasks() {     //如果配置为需要拉取服务列表,则设置定时拉取任务,这个配置默认是需要拉取服务列表     if (clientConfig.shouldFetchRegistry()) {         // registry cache refresh timer         int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();         int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();         scheduler.schedule(                 new TimedSupervisorTask(                         "cacheRefresh",                         scheduler,                         cacheRefreshExecutor,                         registryFetchIntervalSeconds,                         TimeUnit.SECONDS,                         expBackOffBound,                         new CacheRefreshThread()                 ),                 registryFetchIntervalSeconds, TimeUnit.SECONDS);     }     //其他定时任务初始化的代码,忽略 } //定时从EurekaServer拉取服务列表的任务 class CacheRefreshThread implements Runnable {         public void run() {             refreshRegistry();         } } void refreshRegistry() {     try {         //多Region配置处理代码,忽略         boolean success = fetchRegistry(remoteRegionsModified);         if (success) {             registrySize = localRegionApps.get().size();             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();         }         //日志代码,忽略     } catch (Throwable e) {         logger.error("Cannot fetch registry from server", e);     }         } //定时从EurekaServer拉取服务列表的核心方法 private boolean fetchRegistry(boolean forceFullRegistryFetch) {     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();     try {         Applications applications = getApplications();         //判断,如果是第一次拉取,或者app列表为空,就进行全量拉取,否则就会进行增量拉取         if (clientConfig.shouldDisableDelta()                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))                 || forceFullRegistryFetch                 || (applications == null)                 || (applications.getRegisteredApplications().size() == 0)                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta         {             getAndStoreFullRegistry();         } else {             getAndUpdateDelta(applications);         }         applications.setAppsHashCode(applications.getReconcileHashCode());         logTotalInstances();     } catch (Throwable e) {         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);         return false;     } finally {         if (tracer != null) {             tracer.stop();         }     }     //缓存更新完成,发送个event给观察者,目前没啥用      onCacheRefreshed();     // 检查下远端的服务实例列表里面包括自己,并且状态是否对,这里我们不关心     updateInstanceRemoteStatus();     // registry was fetched successfully, so return true     return true; } //全量拉取代码 private void getAndStoreFullRegistry() throws Throwable {     long currentUpdateGeneration = fetchRegistryGeneration.get();     Applications apps = null;     //访问/eureka/apps接口,拉取所有服务实例信息     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {         apps = httpResponse.getEntity();     }     logger.info("The response status is {}", httpResponse.getStatusCode());     if (apps == null) {         logger.error("The application is null for some reason. Not storing this information");     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {         localRegionApps.set(this.filterAndShuffle(apps));         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());     } else {         logger.warn("Not updating applications as another thread is updating it already");     } } //增量拉取代码 private void getAndUpdateDelta(Applications applications) throws Throwable {     long currentUpdateGeneration = fetchRegistryGeneration.get();     Applications delta = null;     //访问/eureka/delta接口,拉取所有服务实例增量信息     EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {         delta = httpResponse.getEntity();     }     if (delta == null) {         //如果delta为空,拉取增量失败,就全量拉取         logger.warn("The server does not allow the delta revision to be applied because it is not safe. "                 + "Hence got the full registry.");         getAndStoreFullRegistry();     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {         //这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改         //拉取增量成功,检查hashcode是否一样,不一样的话也会全量拉取         logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());         String reconcileHashCode = "";         if (fetchRegistryUpdateLock.tryLock()) {             try {                 updateDelta(delta);                 reconcileHashCode = getReconcileHashCode(applications);             } finally {                 fetchRegistryUpdateLock.unlock();             }         } else {             logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");         }         // There is a diff in number of instances for some reason         if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {             reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall         }     } else {         logger.warn("Not updating application delta as another thread is updating it already");         logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());     } }

以上就是对于EurekaClient拉取服务实例信息的源代码分析,总结EurekaClient 重要缓存如下:

  1. EurekaClient第一次全量拉取,定时增量拉取应用服务实例信息,保存在缓存中。

  2. EurekaClient增量拉取失败,或者增量拉取之后对比hashcode发现不一致,就会执行全量拉取,这样避免了网络某时段分片带来的问题。

  3. 同时对于服务调用,如果涉及到ribbon负载均衡,那么ribbon对于这个实例列表也有自己的缓存,这个缓存定时从EurekaClient的缓存更新

EurekaServer端

在EurekaServer端,所有的读取请求都是读的ReadOnlyMap(这个可以配置) 有定时任务会定时从ReadWriteMap同步到ReadOnlyMap这个时间配置是:

#eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上 #默认30s eureka.server.responseCacheUpdateInvervalMs=3000

相关代码:

if (shouldUseReadOnlyResponseCache) {             timer.schedule(getCacheUpdateTask(),                     new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)                             + responseCacheUpdateIntervalMs),                     responseCacheUpdateIntervalMs);         } private TimerTask getCacheUpdateTask() {     return new TimerTask() {         @Override         public void run() {             logger.debug("Updating the client cache from response cache");             for (Key key : readOnlyCacheMap.keySet()) {                 if (logger.isDebugEnabled()) {                     Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};                     logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);                 }                 try {                     CurrentRequestVersion.set(key.getVersion());                     Value cacheValue = readWriteCacheMap.get(key);                     Value currentCacheValue = readOnlyCacheMap.get(key);                     if (cacheValue != currentCacheValue) {                         readOnlyCacheMap.put(key, cacheValue);                     }                 } catch (Throwable th) {                     logger.error("Error while updating the client cache from response cache", th);                 }             }         }     }; }

ReadWriteMap是一个LoadingCache,将Registry中的服务实例信息封装成要返回的http响应(分别是经过gzip压缩和非压缩的),同时还有两个特殊key,ALL_APPS和ALL_APPS_DELTA ALL_APPS就是所有服务实例信息 ALL_APPS_DELTA就是之前讲注册说的RecentlyChangedQueue里面的实例列表封装的http响应信息

到此,相信大家对“Spring Cloud Eureka的服务与列表获取的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI