The Raft Consensus Algorithm

The Raft Consensus Algorithm




  1. 任何一个服务器都可以成为一个候选者Candidate,它向其他服务器Follower发出要求选举自己的请求:

  2. 其他服务器同意了,发出OK。
    如果在这个过程中,有一个Follower当机,没有收到请求选举的要求,也不影响选举:候选者会给自己投一票,只要获得N/2 + 1 的大多数票,候选者还是可以成为Leader的。

  3. 这样这个候选者就成为了Leader领导人,它可以向选民也就是Follower们发出指令,比如进行日志复制。

  4. 以后通过心跳进行日志复制的通知

  5. 如果一旦这个Leader当机崩溃了,那么Follower中有一个成为候选者,发出邀票选举。

  6. Follower同意后,其成为Leader,继续承担日志复制等指导工作:


自增currentTerm,由Follower转换为Candidate,设置votedFor为自身,并行发起RequestVote RPC,不断重试,直至满足以下任一条件:

  1. 获得超过半数Server的投票,转换为Leader,广播Heartbeat
  2. 接收到合法Leader的AppendEntries RPC,转换为Follower
  3. 选举超时,没有Server选举成功,自增currentTerm,重新选举


  1. Candidate在等待投票结果的过程中,可能会接收到来自其它Leader的AppendEntries RPC。如果该Leader的Term不小于本地的currentTerm,则认可该Leader身份的合法性,主动降级为Follower;反之,则维持Candidate身份,继续等待投票结果
  2. Candidate既没有选举成功,也没有收到其它Leader的RPC,这种情况一般出现在多个节点同时发起选举(如图Split Vote),最终每个Candidate都将超时。为了减少冲突,这里采取“随机退让”策略,每个Candidate重启选举定时器(随机值),大大降低了冲突概率



  1. 假设Leader领导人已经选出,这时客户端发出增加一个日志的要求,比如日志是“sally”:

  2. Leader要求Follower遵从他的指令,都将这个新的日志内容追加到他们各自日志中:

  3. 大多数Follower服务器将日志写入磁盘文件后,确认追加成功,发出Committed OK:

  4. 在下一个心跳heartbeat中,Leader会通知所有Follower更新committed项目。




  1. Client发送command给Leader
  2. Leader追加command至本地log
  3. Leader广播AppendEntriesRPC至Follower
  4. 一旦日志项committed成功:

    4.1 Leader应用对应的command至本地StateMachine,并返回结果至Client

    4.2 Leader通过后续AppendEntriesRPC将committed日志项通知到Follower

    4.3 Follower收到committed日志项后,将其应用至本地StateMachine




最上面这个是新leader,a~f是follower,每个格子代表一条log entry,格子内的数字代表这个log entry是在哪个term上产生的。




animated show

Source code analysis

referenced impl


State Machine in Event Loop

//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                    ^          higher term/ |                         higher term |
//                    |            new leader |                                     |
//                    |_______________________|____________________________________ |

// loop runs the server's main event loop. It reads the current state via
// s.State(), dispatches on it, and repeats until the state is Stopped;
// "server.loop.end" is logged on exit through the deferred debugln.
//
// NOTE(review): this excerpt elides the body of every case as well as the
// closing braces of the switch, the for loop and the function.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	state := s.State()

	for state != Stopped {
		s.debugln(" ", state)
		switch state {
		case Follower:
		case Candidate:
		case Leader:
		case Snapshotting:
		// Re-read the state before the next iteration.
		state = s.State()

Log Replication

append entries processing in event loop

// processAppendEntriesRequest handles an "append entries" request.
//
// It returns the response to send back together with a bool. In the code
// shown, the bool is false only on the stale-term path and true on every
// other path.
// NOTE(review): the bool presumably tells the caller whether the request came
// from a valid leader (e.g. to reset the election timer) — confirm at the
// call site.
// NOTE(review): this excerpt elides closing braces and the statement that
// performs the candidate step-down.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {

	// Reject requests whose term is older than ours.
	if req.Term < s.currentTerm {
		s.debugln(" stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false

	if req.Term == s.currentTerm {
		// Receiving an append-entries request for our own term while we are
		// the leader is treated as an invariant violation.
		_assert(s.State() != Leader, "\n", s.currentTerm)

		// A candidate that sees a request for the current term steps down.
		if s.state == Candidate {
			// Change state to follower (statement elided in this excerpt).

		// Record the sender as the leader; this covers both a candidate
		// discovering the new leader and a follower saving its name.
		s.leader = req.LeaderName
	} else {
		// The request carries a newer term: update term and leader.
		s.updateCurrentTerm(req.Term, req.LeaderName)

	// Reject if log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln(" ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln(" ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true

	// Commit up to the commit index supplied by the request.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln(" ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true

	// Truncate, append and commit all succeeded: report success along with
	// the current log index and commit index.

	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true

// processAppendEntriesResponse handles an "append entries" response from a
// peer. Per the original note, this is only processed when the server is a
// leader; responses received during other states are dropped.
//
// NOTE(review): this excerpt elides closing braces and the bodies of several
// if statements (including what look like early returns).
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// A response with a higher term means our term is out of date: adopt the
	// newer term with no known leader.
	// NOTE(review): the original comment says the server becomes a follower
	// and exits here; the return is not visible in this excerpt.
	if resp.Term() > s.Term() {
		s.updateCurrentTerm(resp.Term(), "")

	// Handling for an unsuccessful response (body elided in this excerpt).
	if !resp.Success() {

	// If the peer successfully appended a log entry from the leader's term,
	// record it in the synced-peer set.
	if resp.append == true {
		s.syncedPeer[resp.peer] = true

	// Compare the number of synced peers with the quorum size before
	// committing (body elided; presumably returns when below quorum).
	if len(s.syncedPeer) < s.QuorumSize() {

	// Collect the leader's own current index plus each peer's prevLogIndex.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())

	// We can commit up to the index which the majority of the members have appended.
	// NOTE(review): indexing at QuorumSize()-1 only yields a majority-held
	// index if indices is sorted in descending order; no sort is visible in
	// this excerpt — confirm against the full source.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		// Leader needs to do an fsync before committing log entries
		// (the sync and commit calls are elided in this excerpt).
		s.debugln("commit index ", commitIndex)

append entries processing in heartbeat

// Heartbeat
//
// leaderLoop is the event loop that is run when the server is in the Leader
// state.
// NOTE(review): only the start of the function is shown in this excerpt; the
// loop body and everything after it are elided.
func (s *server) leaderLoop() {
    // Update each peer's prevLogIndex to the leader's lastLogIndex and start
    // its heartbeat (loop body elided in this excerpt).
    for _, peer := range s.peers {

// heartbeat listens to the heartbeat ticker and, per the original note,
// flushes an AppendEntries RPC on every tick. After each tick it dispatches a
// HeartbeatEventType event carrying the measured duration.
//
// NOTE(review): parameter c is not used in the visible excerpt; other select
// cases and the closing braces are elided.
func (p *Peer) heartbeat(c chan bool) {

	// The ticker fires once per p.heartbeatInterval.
	ticker := time.Tick(p.heartbeatInterval)

	debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)

	for {
		select {
		case <-ticker:
			// NOTE(review): nothing runs between start and duration in this
			// excerpt, so duration is effectively zero as shown; the call
			// being timed (presumably the flush) appears to have been elided.
			start := time.Now()
			duration := time.Now().Sub(start)
			p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))

// flush sends the peer whatever it is missing: the log entries after the
// peer's prevLogIndex as an AppendEntries request, or a snapshot request when
// no entries can be retrieved.
//
// NOTE(review): the closing braces are elided in this excerpt.
func (p *Peer) flush() {
	debugln("peer.heartbeat.flush: ", p.Name)
	prevLogIndex := p.getPrevLogIndex()
	log.Print("heartbeat flush,prev log index is:",prevLogIndex)
	term := p.server.currentTerm
	// Retrieve the entries after prevLogIndex (the second argument is
	// p.server.maxLogEntriesPerRequest) together with the term of the entry
	// at prevLogIndex. Per the original note, a nil list is returned if the
	// index no longer exists because a snapshot was made.
	entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

	if entries != nil {
		// NOTE(review): an argument is missing between the two commas below
		// (elided in this excerpt); it must be restored before this compiles.
		p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(),, entries))
	} else {
		// NOTE(review): the first argument is missing below (elided in this
		// excerpt); it must be restored before this compiles.
		p.sendSnapshotRequest(newSnapshotRequest(, p.server.snapshot))

// Append Entries

// sendAppendEntriesRequest sends an AppendEntries request to the peer through
// the server's transporter and adjusts p.prevLogIndex according to the
// response.
//
// NOTE(review): this excerpt elides closing braces, the early return after a
// nil response, and the final statement that hands the response to the
// server.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
	tracef("peer.append.send: %s->%s [prevLog:%v length: %v]\n",
		p.server.Name(), p.Name, req.PrevLogIndex, len(req.Entries))

	resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
	// A nil response is reported as a timeout via a
	// HeartbeatIntervalEventType event.
	// NOTE(review): the code below dereferences resp, so a return is
	// presumably elided here — confirm.
	if resp == nil {
		p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
		debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name)
	traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name)

	// If successful then update the previous log index to the index of the
	// last entry that was sent; an empty request leaves it unchanged.
	if resp.Success() {
		if len(req.Entries) > 0 {
			p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
		traceln("peer.append.resp.success: ", p.Name, "; idx =", p.prevLogIndex)
	} else {
		if resp.Term() > p.server.Term() {
			// A new leader has come up that this *leader* does not know about
			// yet. This server will learn of it either when the new leader
			// sends an append-entries request with a higher term or when this
			// server finishes processing this response. prevLogIndex is left
			// untouched.
			debugln("peer.append.resp.not.update: new.leader.found")
		} else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex {
			// We may have missed a response from the peer: the peer may have
			// committed the entries we just sent, but we never received the
			// successful reply and so did not advance prevLogIndex.
			//
			// This time the peer failed to truncate its log and sent a
			// failure reply, so we only need to move the peer's prevLogIndex
			// up to its commit index.

			p.prevLogIndex = resp.CommitIndex()
			debugln("peer.append.resp.update: ", p.Name, "; idx =", p.prevLogIndex)

		} else if p.prevLogIndex > 0 {
			// The request was unsuccessful: walk prevLogIndex back until a
			// match is found and retry next time. The original note warns
			// not to let it go below the peer's commit index.
			// NOTE(review): the decrement statement itself is elided in this
			// excerpt.
			// If that is not enough, jump directly to the index of the
			// peer's log.
			if p.prevLogIndex > resp.Index() {
				p.prevLogIndex = resp.Index()

			debugln("peer.append.resp.decrement: ", p.Name, "; idx =", p.prevLogIndex)

	// Attach the peer to resp, thus server can know where it comes from
	resp.peer = p.Name
	// Send response to server for processing (statement elided in this excerpt).