温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用golang etcd raft协议

发布时间:2021-10-09 16:08:26 来源:亿速云 阅读:193 作者:iii 栏目:编程语言

本篇内容介绍了“如何使用golang etcd raft协议”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错, 以提高系统的可用性。 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性? Raft算法把问题分解成了四个子问题: 1. 领袖选举(leader election)、 2. 日志复制(log replication)、 3. 安全性(safety) 4. 成员关系变化(membership changes) 这几个子问题。 源码gitee地址: https://gitee.com/ioly/learning.gooop

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 10)

  • 添加put/get/del kv键值对的rpc接口

  • 继续完善Leader状态的raft协议响应

设计

  • rpc/IKVStoreRPC: kv操作的rpc接口

  • store/IKVStore: kv操作的持久化接口

  • stoer/ILogStore: 从IKVStore继承,以支持kv持久化

  • lsm/IRaftState: 继承rpc.IKVStoreRPC接口,以支持kv操作

  • lsm/tLeaderState: 初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。

rpc/IKVStoreRPC.go

kv操作的rpc接口

package rpc type IKVStoreRPC interface {	ExecuteKVCmd(cmd *KVCmd, ret *KVRet) error } type KVCmd struct {	OPCode KVOPCode	Key []byte	Content []byte } type KVOPCode int const (	KVGet KVOPCode = iota	KVPut KVOPCode = iota	KVDel KVOPCode = iota ) type KVRet struct {	Code KVRetCode	Key []byte	Content []byte } type KVRetCode int const (	KVOk KVRetCode = iota	KVKeyNotFound KVRetCode = iota	KVInternalError KVRetCode = iota )

store/IKVStore.go

kv操作的持久化接口

package store type IKVStore interface {	Get(key []byte) (error, []byte)	Put(key []byte, content []byte) error	Del(key []byte) error }

stoer/ILogStore.go

从IKVStore继承,以支持kv持久化

package store import (	"learning/gooop/etcd/raft/model" ) type ILogStore interface {	IKVStore	LastAppendedTerm() int64	LastAppendedIndex() int64	LastCommittedTerm() int64	LastCommittedIndex() int64	Append(entry *model.LogEntry) error	Commit(index int64) error	GetLog(index int64) (error, *model.LogEntry) }

lsm/IRaftState.go

继承rpc.IKVStoreRPC接口,以支持kv操作

package lsm import (	"learning/gooop/etcd/raft/roles"	"learning/gooop/etcd/raft/rpc" ) type IRaftState interface {	rpc.IRaftRPC	rpc.IKVStoreRPC	Role() roles.RaftRole	Start() }

lsm/tLeaderState.go

初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。

package lsm import (	"errors"	"learning/gooop/etcd/raft/config"	"learning/gooop/etcd/raft/model"	"learning/gooop/etcd/raft/roles"	"learning/gooop/etcd/raft/rpc"	"learning/gooop/etcd/raft/store"	"learning/gooop/etcd/raft/timeout"	"sync"	"time" ) // tLeaderState presents a leader node type tLeaderState struct {	tEventDrivenModel	context    iRaftStateContext	mInitOnce  sync.Once	mStartOnce sync.Once	// update: leInit / leLeaderHeartbeat	mTerm int64	// update: leInit / leDisposing	mDisposedFlag bool	// update: leVoteToCandidate	mVotedTerm int64	mVotedCandidateID string	mVotedTimestamp int64 } // trigger: init() // args: empty const leInit = "leader.init" // trigger: Start() // args: empty const leStart = "leader.Start" // trigger: whenNewLeaderAnnouncedThenSwitchToFollower // args: empty const leDiposing = "leader.Disposing" // trigger : Heartbeat() / AppendLog() // args: term int64 const leNewLeaderAnnounced = "leader.NewLeaderAnnounced" // trigger: RequestVote() // args: *rpc.RequestVoteCmd const leBeforeRequestVote = "leader.BeforeRequestVote" // trigger: // args: *rpc.RequestVoteCmd const leVoteToCandidate = "leader.VoteToCandidate" // trigger: handleHeartbeat() // args: term int64 const leHeartbeatRejected = "leader.HeartbeatRejected" func newLeaderState(ctx iRaftStateContext, term int64) IRaftState {	it := new(tLeaderState)	it.init(ctx, term)	return it } func (me *tLeaderState) init(ctx iRaftStateContext, term int64) {	me.mInitOnce.Do(func() {	me.context = ctx	me.mTerm = term	me.initEventHandlers()	me.raise(leInit)	}) } func (me *tLeaderState) initEventHandlers() {	// write only logic	me.hookEventsForDisposedFlag()	me.hookEventsForVotedTerm()	// read only logic	me.hook(leStart,	me.whenStartThenBeginHeartbeatToOthers)	me.hook(leNewLeaderAnnounced,	me.whenNewLeaderAnnouncedThenSwitchToFollower)	me.hook(leHeartbeatRejected,	me.whenHeartbeatRejectedThenSwitchToFollower) } func (me *tLeaderState) hookEventsForDisposedFlag() {	me.hook(leInit, func(e string, args ...interface{}) {	me.mDisposedFlag = false	})	me.hook(leDiposing, func(e string, args ...interface{}) {	me.mDisposedFlag = true	}) } func (me *tLeaderState) hookEventsForVotedTerm() {	me.hook(leBeforeRequestVote, func(e string, args ...interface{}) {	// check last vote timeout	if me.mVotedTerm == 0 {	return	}	if time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)*time.Nanosecond >= timeout.ElectionTimeout {	me.mVotedTerm = 0	me.mVotedTimestamp = 0	me.mVotedCandidateID = ""	}	})	me.hook(leVoteToCandidate, func(e string, args ...interface{}) {	// after vote to candidate	cmd := args[0].(*rpc.RequestVoteCmd)	me.mVotedTerm = cmd.Term	me.mVotedCandidateID = cmd.CandidateID	me.mVotedTimestamp = time.Now().UnixNano()	}) } func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {	// check term	if cmd.Term <= me.mTerm {	ret.Code = rpc.HBTermMismatch	return nil	}	// new leader	me.raise(leNewLeaderAnnounced, cmd.Term)	// return ok	ret.Code = rpc.HBOk	return nil } func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {	// check term	if cmd.Term <= me.mTerm {	ret.Code = rpc.ALTermMismatch	return nil	}	// new leader	me.raise(leNewLeaderAnnounced, cmd.Term)	// return ok	ret.Code = rpc.ALInternalError	return nil } func (me *tLeaderState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {	// just ignore	ret.Code = rpc.CLInternalError	return nil } func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {	me.raise(leBeforeRequestVote, cmd)	// check voted term	if cmd.Term < me.mVotedTerm {	ret.Code = rpc.RVTermMismatch	return nil	}	if cmd.Term == me.mVotedTerm {	if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {	// already vote another	ret.Code = rpc.RVVotedAnother	return nil	} else {	// already voted	ret.Code = rpc.RVOk	return nil	}	}	if cmd.Term > me.mVotedTerm {	// new term, check log	if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() {	// good log	me.raise(leVoteToCandidate, cmd)	ret.Code = rpc.RVOk	} else {	// bad log	ret.Code = rpc.RVLogMismatch	}	return nil	}	// should not reach here	ret.Code = rpc.RVTermMismatch	return nil } func (me *tLeaderState) Role() roles.RaftRole {	return roles.Leader } func (me *tLeaderState) Start() {	me.mStartOnce.Do(func() {	me.raise(leStart)	}) } func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) {	go func() {	for !me.mDisposedFlag {	_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {	return me.handleHeartbeat(client)	})	time.Sleep(timeout.HeartbeatInterval)	}	}() } func (me *tLeaderState) boardcast(action func(config.IRaftNodeConfig, rpc.IRaftRPC) error) error {	for _,it := range me.context.Config().Nodes() {	if it.ID() == me.context.Config().ID() {	continue	}	e := me.context.RaftClientService().Using(it.ID(), func(client rpc.IRaftRPC) error {	return action(it, client)	})	if e != nil {	return e	}	}	return nil } func (me *tLeaderState) handleHeartbeat(client rpc.IRaftRPC) error {	cmd := new(rpc.HeartbeatCmd)	cmd.Term = me.mTerm	cmd.LeaderID = me.context.Config().ID()	ret := new(rpc.HeartbeatRet)	e := client.Heartbeat(cmd, ret)	if e != nil {	return e	}	switch ret.Code {	case rpc.HBTermMismatch:	me.raise(leHeartbeatRejected, ret.Term)	break	}	return nil } func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) {	me.raise(leDiposing)	term := args[0].(int64)	me.context.HandleStateChanged(newFollowerState(me.context, term)) } func (me *tLeaderState) whenHeartbeatRejectedThenSwitchToFollower(_ string, args ...interface{}) {	me.raise(leDiposing)	term := args[0].(int64)	me.context.HandleStateChanged(newFollowerState(me.context, term)) } func (me *tLeaderState) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {	switch cmd.OPCode {	case rpc.KVGet:	return me.handleKVGet(cmd, ret)	case rpc.KVPut:	return me.handleKVPut(cmd, ret)	case rpc.KVDel:	return me.handleKVDel(cmd, ret)	}	return nil } func (me *tLeaderState) handleKVGet(cmd *rpc.KVCmd, ret *rpc.KVRet) error {	e, v := me.context.Store().Get(cmd.Key)	if e != nil {	ret.Code = rpc.KVInternalError	return e	}	ret.Code = rpc.KVOk	ret.Content = v	return nil } func (me *tLeaderState) handleKVPut(cmd *rpc.KVCmd, ret *rpc.KVRet) error {	kvcmd := new(store.PutCmd)	kvcmd.Key = cmd.Key	kvcmd.Value = cmd.Content	// create/append/commit log	e := me.broadcastKVCmd(kvcmd, ret)	if e != nil {	return e	}	// apply cmd	return me.context.Store().Put(cmd.Key, cmd.Content) } func (me *tLeaderState) handleKVDel(cmd *rpc.KVCmd, ret *rpc.KVRet) error {	kvcmd := new(store.DelCmd)	kvcmd.Key = cmd.Key	// create/append/commit log	e := me.broadcastKVCmd(kvcmd, ret)	if e != nil {	return e	}	// apply cmd	return me.context.Store().Put(cmd.Key, cmd.Content) } func (me *tLeaderState) broadcastKVCmd(cmd store.IKVCmd, ret *rpc.KVRet) error {	// create log	st := me.context.Store()	log := new(model.LogEntry)	log.Term = me.mTerm	log.Index = st.LastCommittedIndex() + 1	log.PrevTerm = st.LastCommittedTerm()	log.PrevIndex = st.LastCommittedIndex()	log.Command = cmd.Marshal()	// append log	e := st.Append(log)	if e != nil {	ret.Code = rpc.KVInternalError	return e	}	// ask other nodes to append log	alcmd := new(rpc.AppendLogCmd)	alcmd.Term = me.mTerm	alcmd.LeaderID = me.context.Config().ID()	alcmd.Entry = log	sumOk := []int{ 0 }	_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {	alret := new(rpc.AppendLogRet)	e := client.AppendLog(alcmd, alret)	if e != nil {	return e	}	switch alret.Code {	case rpc.ALOk:	sumOk[0]++	break	case rpc.ALTermMismatch:	// todo: fixme	break	case rpc.ALIndexMismatch:	// todo: fixme	break	}	return nil	})	// wait for most nodes	if sumOk[0] >= len(me.context.Config().Nodes()) / 2 {	// commit log	clcmd := new(rpc.CommitLogCmd)	clcmd.LeaderID = me.context.Config().ID()	clcmd.Term = me.mTerm	clcmd.Index = log.Index	_ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {	ret := new(rpc.CommitLogRet)	e := client.CommitLog(clcmd, ret)	if e != nil {	return e	}	switch ret.Code {	case rpc.CLInternalError:	// todo: fixme	break	case rpc.CLLogNotFound:	// todo: fixme	break	case rpc.CLOk:	return nil	}	return nil	})	// ok	return nil	} else {	return gErrorCannotReachAgreement	} } var gErrorCannotReachAgreement = errors.New("cannot reach agreement")

“如何使用golang etcd raft协议”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

go
AI