背景
SOFARegistry 分为 Session、Data 和 Meta 三个模块。Session 模块用于 client 交互,可以横向扩容,可以承受大量 client 连接请求;Data 是数据存储模块,利用 slot 分片机制来均衡负载,使用备份来解决高可用问题;Meta 是 Session、Data 的注册中心,采用分布式锁来选举 leader,本文详细阐述 Meta 如何选主。
基于 MySQL 的分布式锁
MySQL 表
-- Lock table for the MySQL-based distributed lock: one row per (data_center, lock_name).
-- NOTE(review): with a multi-byte charset the (data_center, lock_name) unique key may
-- exceed the engine's index key length limit; confirm against the target charset/engine.
DROP TABLE IF EXISTS distribute_lock;

CREATE TABLE distribute_lock (
    id            BIGINT(20)          NOT NULL AUTO_INCREMENT PRIMARY KEY,
    data_center   VARCHAR(128)        NOT NULL,
    lock_name     VARCHAR(1024)       NOT NULL,
    -- current holder of the lock
    owner         VARCHAR(512)        NOT NULL,
    -- lease length; presumably milliseconds (the statements divide it by 1000
    -- before comparing against seconds), TODO confirm
    duration      BIGINT(20)          NOT NULL,
    -- incremented each time a follower wins the lock
    term          BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT '任期',
    -- incremented on every owner heartbeat, reset to 0 on takeover
    term_duration BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT '租期',
    gmt_create    TIMESTAMP(3)        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    gmt_modified  TIMESTAMP(3)        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY `uk_data_center_lock` (`data_center`, `lock_name`),
    KEY `idx_lock_owner` (`owner`)
);
表的增改查等操作
/** Mapper exposing the query and write operations on distributed lock records. */
public interface DistributeLockMapper {

  /**
   * Looks up a lock record by data center and lock name.
   *
   * @param dataCenter data center the lock belongs to
   * @param lockName name of the lock
   * @return the matching lock record
   */
  DistributeLockDomain queryDistLock(
      @Param("dataCenter") String dataCenter, @Param("lockName") String lockName);

  /**
   * Competes for the lock by inserting a new lock record.
   *
   * <p>NOTE(review): the original comment states this throws when the lock name already
   * exists; confirm against the duplicate-key handling of the mapped statement.
   *
   * @param lock the lock record to insert
   * @throws Exception declared by the original signature
   */
  void competeLockOnInsert(DistributeLockDomain lock) throws Exception;

  /**
   * Competes for the lock with a compare-and-set style update.
   *
   * @param competeLock the expected current lock state plus the new owner
   */
  void competeLockOnUpdate(FollowCompeteLockDomain competeLock);

  /**
   * Renews the lock's last update time on behalf of the owner.
   *
   * @param lock the lock record currently held
   */
  void ownerHeartbeat(DistributeLockDomain lock);

  /**
   * Forcibly resets the owner and duration of the lock.
   *
   * @param lock the lock record carrying the new owner and duration
   */
  void forceRefresh(DistributeLockDomain lock);
}
整体流程
step1:启动时创建锁记录
<!-- Inserts the lock row with term 1 and term_duration 0; when the unique key already
     exists, the ON DUPLICATE KEY clause only re-assigns lock_name to the supplied value. -->
<insert id="competeLockOnInsert" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
    <![CDATA[
    INSERT /*+ QUERY_TIMEOUT(2000000) */ INTO distribute_lock (
        data_center,
        lock_name,
        owner,
        duration,
        gmt_create,
        gmt_modified,
        `term`,
        `term_duration`
    )
    VALUES (
        #{dataCenter},
        #{lockName},
        #{owner},
        #{duration},
        NOW(3),
        NOW(3),
        1,
        0
    )
    ON DUPLICATE KEY UPDATE lock_name = #{lockName}
    ]]>
</insert>
step2:leader 每秒提交心跳,更新表
<!-- Owner heartbeat: refreshes gmt_modified and bumps term_duration, but only while the
     row still matches the caller's owner/term/term_duration and the lease has not run out. -->
<update id="ownerHeartbeat" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
    <![CDATA[
    UPDATE /*+ QUERY_TIMEOUT(2000000) */ distribute_lock
    SET
        owner = #{owner},
        gmt_modified = NOW(3),
        `term_duration` = (`term_duration` + 1)
    WHERE data_center = #{dataCenter}
      AND lock_name = #{lockName}
      AND owner = #{owner}
      AND term = #{term}
      AND `term_duration` = #{termDuration}
      AND TIMESTAMPDIFF(SECOND, gmt_modified, NOW()) < #{duration}/1000
    ]]>
</update>
step3:follower 每秒判断锁是否过期,如果过期,则 cas 竞选 leader
/**
 * Follower step: if the observed lock has expired, try to take it over and return the
 * lock record as re-read from the store; otherwise return the observed lock unchanged.
 *
 * @param lock the lock record last read by this node
 * @param myself identifier of this node, used as the new owner when competing
 * @return the re-queried lock after a compete attempt, or {@code lock} if it has not expired
 */
public DistributeLockDomain onFollowWorking(DistributeLockDomain lock, String myself) {
  // As a follower, only compete once the current lock has expired.
  if (!lock.expire()) {
    return lock;
  }

  LOG.info("lock expire: {}, meta elector start: {}", lock, myself);
  distributeLockMapper.competeLockOnUpdate(
      new FollowCompeteLockDomain(
          lock.getDataCenter(),
          lock.getLockName(),
          lock.getOwner(),
          lock.getGmtModified(),
          myself,
          lock.getDuration(),
          lock.getTerm(),
          lock.getTermDuration()));

  // The update returns nothing, so re-read the row to learn who actually won.
  DistributeLockDomain newLock =
      distributeLockMapper.queryDistLock(lock.getDataCenter(), lock.getLockName());
  // Log the re-read record rather than the stale input so the election outcome is visible.
  LOG.info("elector finish, new lock: {}", newLock);
  return newLock;
}

/**
 * Tells whether the lock lease has run out.
 *
 * @return true when the DB server time is later than gmtModified plus duration
 */
public boolean expire() {
  return gmtDbServerTime.getTime() > gmtModified.getTime() + duration;
}

<update id="competeLockOnUpdate">
    <!-- CAS update matched on dataCenter, lockName, owner, term and term_duration;
         it only applies once gmt_modified is older than the lease duration -->
    <![CDATA[
    update /*+ QUERY_TIMEOUT(2000000) */ distribute_lock
    set owner = #{newOwner},
        gmt_modified = NOW(3),
        term = (term + 1),
        `term_duration` = 0
    where data_center = #{dataCenter}
      and lock_name = #{lockName}
      and owner = #{owner}
      and term = #{term}
      and `term_duration` = #{termDuration}
      and timestampdiff(SECOND, gmt_modified, NOW()) > #{duration}/1000
    ]]>
</update>
step4:如果 leader 发生切换,通知 xxx
时序图
💡 Tips:输入/画板或点击上方工具栏,选择「画板」、绘制流程图、架构图等各种图形。
类图
主要源代码解析
public void elect() { synchronized (this) { if (isObserver) {//如果是Observer,不参与选主 leaderInfo = doQuery(); } else { leaderInfo = doElect(); } if (amILeader()) {//我是leader onIamLeader();//我从follower变成leader,通知xxx } else {//我不是leader onIamNotLeader();//我从leader变成follower,通知xxx } } } @Override protected LeaderInfo doElect() { //1、查询锁 DistributeLockDomain lock = distributeLockMapper.queryDistLock(defaultCommonConfig.getClusterId(tableName()), lockName); //2、不存在则创建锁 /** compete and return leader */ if (lock == null) { return competeLeader(defaultCommonConfig.getClusterId(tableName())); } //3、判断角色 ElectorRole role = amILeader(lock.getOwner()) ? ElectorRole.LEADER : ElectorRole.FOLLOWER; if (role == ElectorRole.LEADER) { lock = onLeaderWorking(lock, myself());//4、提交心跳 } else { lock = onFollowWorking(lock, myself());//5、判断过期与否,如过期,则cas竞争锁 } LeaderInfo result = leaderFrom(lock);//6、锁信息转换为LeaderInfo LOG.info("meta role : {}, leaderInfo: {}", role, result); return result; } Meta 集群 leader 变更
1、初始化
| ip | 角色 | 备注 |
|---|---|---|
| 10.177.41.99 | follower | |
| 10.177.41.100 | leader | |
| 10.181.152.223 | follower | |
2、kill 10.177.41.100 节点后
| ip | 角色 | 备注 |
|---|---|---|
| 10.177.41.99 | follower | competeLockOnUpdate执行失败,即 term 、term_duration匹配不上,sql没有修改行数据 |
| 10.177.41.100 | kill | |
| 10.181.160.18 | leader | competeLockOnUpdate执行成功,即 term 、term_duration匹配的上,即当选leader,之后term 递增 |
3、kill 10.181.160.18 节点后
| ip | 角色 | 备注 |
|---|---|---|
| 10.177.41.99 | leader | competeLockOnUpdate执行成功,当选leader |
| 10.177.41.100 | kill | |
| 10.181.160.18 | kill | |