源码解析:无损运维
SOFARegistry 是一个基于内存存储的分布式注册中心,数据是分散存储在各个节点,为了做到注册中心自身运维期间依然能够对外正常提供服务,需要进行节点下线快速感知和数据迁移。
session 存储 client 发送的发布和订阅数据,由于 client 断连重放的特性,session 单机下线后 client 会重放数据到其他节点。
data 会接收 session 发送的发布数据,data 单机下线后,meta 会通过 SlotTable 的变更把对应 Slot 的所有权移交其他 data,data 启动后会进行数据同步到达数据完整可对外提供服务的状态。
session、data下线的过程,如何避免下线期间数据丢失和抖动
SessionServer 和 DataServer 下线的相关代码都位于各自 bootstrap 类的 dostop 方法中。
首先讨论 SessionServer 的下线过程:
private void doStop() { try { LOGGER.info("{} Shutting down Session Server..", new Date().toString()); stopHttpServer(); clientNodeConnectionHandler.stop(); // stop process disconnect event stopServer(); // stop http server and client bolt server before add blacklist // make sure client reconnect to other sessions and data gracefulShutdown(); stopDataSyncServer(); stopConsoleServer(); executorManager.stopScheduler(); } catch (Throwable e) { LOGGER.error("Shutting down Session Server error!", e); } LOGGER.info("{} Session server is now shutdown...", new Date().toString()); } 从代码中可以看出,SessionServer 服务在下线时,首先会停掉 HttpServer,HttpServer 提供一系列 REST 接口,用于 dashboard 管理、数据查询等;然后停掉 clientNodeConnectionHandler,clientNodeConnectionHandler 是 Exchange 设置的一系列 ChannelHandler 之一,Exchange 作为 Client / Server 连接的抽象,负责节点之间的连接。gracefulShutdown 方法则是通知 Meta(元数据服务器)将本节点加入 Meta 的黑名单中。HttpServer 和客户端的 bolt Server 是在本节点加入黑名单前关停,以保证 client 已经重连到了其他 session 节点上。这样就保证了运维的 session 节点下线期间,client 的数据不会因为 session 节点的不可用,导致数据丢失和抖动。接下来又依次关停了 DataSyncServer,ConsoleServer。
DataServer 下线过程的代码如下:
private void doStop() { try { LOGGER.info("{} Shutting down Data Server..", new Date().toString()); gracefulShutdown(); stopHttpServer(); stopServer(); stopDataSyncServer(); stopNotifyServer(); } catch (Throwable e) { LOGGER.error("Shutting down Data Server error!", e); } LOGGER.info("{} Data server is now shutdown...", new Date().toString()); } DataServer 的下线则和 Session 的下线有些区别,由于 DataServer 数据服务器,负责存储具体的服务数据,而且 Slot 均匀地分配给每个节点上,所以下线前需要检测 DataServer 上的插槽状态,所以 doStop 的方法中,首先调用了如下的优雅下线的方法,其中代码中的主要内容如下:
addBlacklistRetryer.call( () -> { LOGGER.info("[GracefulShutdown] add self to blacklist"); metaServerService.addSelfToMetaBlacklist(); return true; }); addBlacklistRetryer.call( () -> { if (fetchStopPushService.isStopPushSwitch()) { return true; } SlotTableStatusResponse statusResponse = metaServerService.getSlotTableStatus(); if (statusResponse.isProtectionMode()) { return true; } LOGGER.info("[GracefulShutdown] wait no slot"); if (slotManager.hasSlot()) { throw new RuntimeException("current data server still own slot, waiting..."); } return true; }); LOGGER.info("add data self to blacklist successfully"); 首先节点通知 Meta 加入黑名单,stopmetaServerService.getSlotTableStatus() 获取节点上 SlotTable 的状态,当 Slot 重新分配给其他节点后,该 Data 节点才会成功加入黑名单,并进行接下来 HttpServer、DataSyncServer、NotifyServer 的下线动作。
整体流程是 Data 上的 Slot 逐步迁移,期间仍然对外提供服务。
迁移完成后主动从列表中剔除并设置节点黑名单并下线。
以下图为例,假设要下线的节点是 DataServer-1:将 Data-server 添加到注册中心迁移名单中;再将 Slot-2 对应的 follower(DataServer-2)提升为 Leader;等待 Data 节点达到稳定;继续将 Slot-3 对应的 follower(DataServer-3) 提升为 Leader;按照以上步骤分配掉剩余的 follower Slots;Slot 迁移完毕,Data-server 从 Data 列表中剔除,同时添加为黑名单;从迁移名单中设置为 succeed。
Session 和 Data下线中的优雅关闭和数据迁移保证了下线期间数据丢失和抖动,期间仍然对外提供服务。
Meta 如何处理 Session、Data 主动下线的请求通知
Meta 集群服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。RegistryForbiddenServerHandler 注册在sessionServerHandlers、dataServerHandlers :
@Bean(name = "sessionServerHandlers") public Collection<AbstractServerHandler> sessionServerHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(heartbeatRequestHandler()); list.add(fetchProvideDataRequestHandler()); list.add(fetchSystemPropertyRequestHandler()); list.add(registryForbiddenServerHandler()); list.add(getSlotTableStatusRequestHandler()); return list; } @Bean(name = "dataServerHandlers") public Collection<AbstractServerHandler> dataServerHandlers() { Collection<AbstractServerHandler> list = new ArrayList<>(); list.add(heartbeatRequestHandler()); list.add(fetchProvideDataRequestHandler()); list.add(registryForbiddenServerHandler()); list.add(fetchSystemPropertyRequestHandler()); list.add(getSlotTableStatusRequestHandler()); return list; } 当有 Data 和 Session 上下线时,会触发 registryForbiddenServerHandler,具体的 doHandle 方法将下线的节点添加到黑名单中:
boolean success = false; switch (operation) { case ADD: success = registryForbiddenServerManager.addToBlacklist(ip); break; case REMOVE: success = registryForbiddenServerManager.removeFromBlacklist(ip); break; default: break; } Meta 的心跳检测 HeartbeatRequestHandler 会在心跳检测时,获取 Session 的状态,
@Override public VersionedList<SessionNode> getSessionServerMetaInfo() { VersionedList<Lease<SessionNode>> leaseMetaInfo = getLeaseMeta(); List<SessionNode> sessionNodes = Lists.newArrayList(); leaseMetaInfo .getClusterMembers() .forEach( lease -> { sessionNodes.add(lease.getRenewal()); }); return new VersionedList<>(leaseMetaInfo.getEpoch(), sessionNodes); } 获取 Meta 租约中最新的 Session 节点的信息,更新 Session 节点的信息。
Data 的节点信息也会通过类似的过程更新到 Meta 中,MetaServerManager 会根据节点类型,同步节点的在线信息:
@Override public VersionedList<DataNode> getDataServerMetaInfo() { VersionedList<Lease<DataNode>> leaseMetaInfo = getLeaseMeta(); List<DataNode> dataNodes = Lists.newArrayList(); leaseMetaInfo .getClusterMembers() .forEach( lease -> { dataNodes.add(lease.getRenewal()); }); return new VersionedList<>(leaseMetaInfo.getEpoch(), dataNodes); } client 重连回放机制
public void ensureConnected() throws InterruptedException { if (isConnected()) { return; } while (!connect()) { Thread.sleep(ClientConnection.RECONNECTING_DELAY); } } 在 client 的工作线程中,会通过上述代码以 5s 为间隔时间,反复检查链接状态,client 的检查函数位于工作线程的一个无限循环的检查语句中:
/** Handle. */ @Override public void handle() { //noinspection InfiniteLoopStatement while (true) { try { // check connection status, try to reconnect to the server when connection lose client.ensureConnected(); if (requestQueue.isEmpty()) { await(config.getRecheckInterval()); continue; } Iterator<TaskEvent> lt = requestQueue.iterator(); while (lt.hasNext()) { client.ensureConnected(); TaskEvent ev = lt.next(); lt.remove(); /**.........**/ Data 启动期以及 Data 的启动过程的状态转化,如何确保新分配的 Slot 的数据完整性
启动入口
Data 启动的过程代码位于com.alipay.sofa.registry.server.data.bootstrap.DataServerBootstrap#start
/** start dataserver */ public void start() { try { LOGGER.info("begin start server"); LOGGER.info("release properties: {}", ServerEnv.getReleaseProps()); LOGGER.info("the configuration items are as follows: " + dataServerConfig.toString()); ReporterUtils.enablePrometheusDefaultExports(); openDataServer(); openDataSyncServer(); openHttpServer(); renewNode(); fetchProviderData(); systemPropertyProcessorManager.startFetchMetaSystemProperty(); startScheduler(); TaskMetrics.getInstance().registerBolt(); postStart(); Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); LOGGER.info("start server success"); } catch (Throwable e) { throw new RuntimeException("start server error", e); } } DataServer 模块的各个 bean 在 JavaConfig 中统一配置,JavaConfig 类为 DataServerBeanConfiguration, 启动入口类为 DataServerInitializer,该类不由 JavaConfig 管理配置,而是继承了 SmartLifecycle 接口,在启动时由 Spring 框架调用其 start 方法。
该方法中调用了 DataServerBootstrap#start 方法(如上述代码所示),用于启动一系列的初始化服务。
sofaRegistry V6 版本引入了“路由表”(SlotTabel)的概念,“路由表”负责存放每个节点和 N 个 Slot 的映射关系,并保证尽量把所有 Slot 均匀地分配给每个节点。这样,当节点上下线时,只需要修改路由表内容即可。
从代码中可以看出,DataServer 服务在启动时,会启动 DataServer、DataSyncServer、HttpServer 三个 bolt 服务。在启动这些 Server 之时,DataServer 注册了一系列 Handler 来处理各类消息。启动这些 Server 后,调用了renewNode()方法,renewNode 调用 metaServerService的 renewNode,通过获取在心跳请求中将返回 SlotTable 路由表信息、Data 节点将路由表 SlotTable 保存在本地中,具体代码可以参考com.alipay.sofa.registry.server.meta.remoting.meta.MetaServerRenewService#renewNode,下一步又刷新了 Session 的信息,并创建了一个 renew meta 的守护进程,用来持续刷新节点上存储的集群信息:
private void renewNode() { metaServerService.renewNode(); // init session lease with first renew for (ProcessId processId : metaServerService.getSessionProcessIds()) { sessionLeaseManager.renewSession(processId); } metaServerService.startRenewer(); } slotManagerImpl 是 DataServer 的一个单例 ConditionalOnMissingBean 对象。当 Data 启动时,SlotManage 负责 Slot 数据的更新和迁移(com.alipay.sofa.registry.server.data.slot.SlotManagerImpl#syncMigrating):
private void syncMigrating( SlotState slotState, Collection<String> sessions, int syncSessionIntervalMs, long slotTableEpoch) { final Slot slot = slotState.slot; if (slotState.migratingStartTime == 0) { slotState.migratingStartTime = System.currentTimeMillis(); slotState.migratingTasks.clear(); observeLeaderMigratingStart(slot.getId()); LOGGER.info( "start migrating, slotId={}, sessionSize={}, sessions={}", slotState.slotId, sessions.size(), sessions); } final int notSyncedCount = sessions.size() - slotState.countSyncSuccess(sessions); for (String sessionIp : sessions) { MigratingTask mtask = slotState.migratingTasks.get(sessionIp); if (mtask == null) { KeyedTask<SyncSessionTask> ktask = commitSyncSessionTask(slot, slotTableEpoch, sessionIp, null, true); mtask = new MigratingTask(sessionIp, ktask); slotState.migratingTasks.put(sessionIp, mtask); LOGGER.info("migrating start,slotId={},session={}", slot.getId(), sessionIp); continue; } if (mtask.task.isFailed() && !mtask.forceSuccess) { // failed and not force Success, try to trigger emergency if (triggerEmergencyMigrating(slotState, sessions, mtask, notSyncedCount)) { LOGGER.info("[emergency]{},session={}", slotState.slotId, mtask.sessionIp); } else { KeyedTask<SyncSessionTask> ktask = commitSyncSessionTask(slot, slotTableEpoch, sessionIp, null, true); mtask.task = ktask; mtask.tryCount++; LOGGER.error( "migrating retry,slotId={},try={},session={},create={}/{}", slot.getId(), mtask.tryCount, sessionIp, mtask.createTimestamp, System.currentTimeMillis() - mtask.createTimestamp); continue; } } // force success or migrating finish. try to sync session // avoid the time of migrating is too long and block the syncing of session if (mtask.task.isOverAfter(syncSessionIntervalMs)) { if (syncSession(slotState, sessionIp, null, syncSessionIntervalMs, slotTableEpoch)) { LOGGER.info("slotId={}, sync session in migrating, session={}", slot.getId(), sessionIp); } } } } Slot 的数据的同步是一个 watchDog 的模式,获取 slotTabel 的变化,会自动同步节点上的 Slot 数据:
void syncWatch() { final int syncSessionIntervalMs = dataServerConfig.getSlotLeaderSyncSessionIntervalSecs() * 1000; final int syncLeaderIntervalMs = dataServerConfig.getSlotFollowerSyncLeaderIntervalSecs() * 1000; final long slotTableEpoch = slotTableStates.table.getEpoch(); for (SlotState slotState : slotTableStates.slotStates.values()) { try { sync(slotState, syncSessionIntervalMs, syncLeaderIntervalMs, slotTableEpoch); } catch (Throwable e) { SYNC_ERROR_LOGGER.error( "[syncCommit]failed to do sync slot {}, migrated={}", slotState.slot, slotState.migrated, e); } } } 由于节点上线后,Meta 会感知并获取最新的 DataServer 节点,leader Meta 对路由表进行了刷新,因为重新分配 SlotTable 本质上是 对 DataServer 节点和 Slot 槽位之间的映射关系进行重新分配。获取到当前存活的 DataServer 节点信息,从而方便的对之进行 Slot 分配。接下来对 Server 节点的 Slot 数据进行了同步。
上面的两部分代码总体可以概括为以下两个层次:
Data 和 Session 通过心跳的机制在 Meta 上进行续约,当 Data 发生节点变更的时候,Meta 此时会重新进行分配,生成新的 SlotTable,并通过广播和心跳的方式返回所有的节点,Session 就会用这份 SlotTable 寻址新的 Data 节点。存在一个时刻,集群中存在两份 SlotTable,分裂时间最大(1s)。
Session 上会缓存 Client 的 Pub 和 Sub 数据作为基准数据,在增量发送给 Data 的同时,Slot 的 leader 节点会定时和 Session 进行数据比对,Slot 的 follower 和 leader 也会定时进行数据对比,这样做到整个集群数据能快速达到最终一致,减少异常场景的不可服务时间。
Slot 数据完整度校验的作用在于当一个 Slot 数据所有副本所在节点全部宕机,数据发生丢失时,只能通过 Session 上的缓存数据进行数据回放。但在完全回放完成时,Slot 是不可对外提供读取服务的,以此避免推空或者推错,从而保证了数据的完整性。
流程如下图:
server 启动的最后一步,是通知 Meta 将上线的 Data 节点从 blacklist 移除:
private void postStart() throws Throwable { startupRetryer.call( () -> { LOGGER.info("successful start data server, remove self from blacklist"); metaServerService.removeSelfFromMetaBlacklist(); return true; }); } 至此,Server 的启动完成,Data 节点的 Slot 数据也根据变更后的 slotTable 同步完成,Data 节点开始在集群内工作。