Consensus in Distributed Systems: Raft, Paxos, and the CAP Theorem in Practice

December 5, 2024

The CAP Theorem

The CAP theorem, proved by Gilbert and Lynch in 2002 (based on Brewer's 2000 conjecture), states that a distributed data store can satisfy at most two of three properties simultaneously:

  • Consistency (C): Every read receives the most recent write or an error.
  • Availability (A): Every request receives a non-error response, without guarantee that it reflects the most recent write.
  • Partition Tolerance (P): The system continues to operate despite an arbitrary number of messages being dropped or delayed between nodes.

Network partitions are inevitable in any system spanning multiple machines. Hardware fails, switches get misconfigured, cables get disconnected. Partition tolerance is therefore not optional. The real engineering choice reduces to CP (consistency + partition tolerance) or AP (availability + partition tolerance).

Network Partition Between Two Groups:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Partition A    β”‚   X  X  X    β”‚   Partition B    β”‚
β”‚                  β”‚              β”‚                  β”‚
β”‚   Node 1         β”‚  link down   β”‚   Node 4         β”‚
β”‚   Node 2         β”‚              β”‚   Node 5         β”‚
β”‚   Node 3         β”‚              β”‚                  β”‚
β”‚                  β”‚              β”‚                  β”‚
β”‚   (3 nodes)      β”‚              β”‚   (2 nodes)      β”‚
β”‚   (majority)     β”‚              β”‚   (minority)     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

CP behavior (e.g., etcd):
  Partition A: continues serving reads and writes (has quorum)
  Partition B: rejects writes, returns errors to clients
  After heal:  Partition B replays the leader log and converges

AP behavior (e.g., Cassandra with CL=ONE):
  Partition A: accepts writes (e.g., SET x = 5)
  Partition B: accepts writes (e.g., SET x = 7)
  Both succeed. Both clients receive acknowledgments.
  After heal:  conflict resolution required
               (last-write-wins, vector clocks, CRDTs)

The choice between CP and AP is not necessarily system-wide. Different categories of data within the same application can use different consistency models. Financial balances typically require CP guarantees, while session caches or activity feeds can tolerate AP semantics with eventual convergence.

Eventual Consistency Windows

The term "eventually consistent" describes a spectrum rather than a single behavior. The critical variable is the duration of the inconsistency window.

Write event                                  Convergence
    |                                             |
    v                                             v
────*─────────────────────────────────────────────*────> time
    |<─────── inconsistency window ──────────────>|
    |                                             |
    |   During this interval, different nodes     |
    |   may return different values for the       |
    |   same key.                                 |
    |                                             |
    |   Typical durations:                        |
    |     Single datacenter: 1-50 ms              |
    |     Cross-datacenter:  50-500 ms            |
    |     Under load/faults: seconds to minutes   |

Acceptable staleness depends on the data type:

Data CategoryStaleness ToleranceTypical Store
Financial transactionsNoneCP (Spanner, CockroachDB)
User authentication stateNoneCP (etcd, ZooKeeper)
Social media feedsSecondsAP (Cassandra, DynamoDB)
Analytics countersMinutesAP (Redis, DynamoDB)
Search indexesMinutes to hoursAP (Elasticsearch)

PACELC: A More Complete Model

The PACELC theorem extends CAP by addressing behavior during normal (non-partitioned) operation. During a Partition, choose Availability or Consistency. Else (normal operation), choose Latency or Consistency.

This captures a tradeoff that CAP ignores: even without partitions, enforcing consistency requires coordination between nodes, which increases latency.

PACELC Classification of Production Systems:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   System     β”‚ During Partition     β”‚ Normal Operation      β”‚
β”‚              β”‚ (choose A or C)      β”‚ (choose L or C)       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ Spanner      β”‚ C (reject minority)  β”‚ C (commit-wait ~7ms)  β”‚
β”‚ CockroachDB  β”‚ C (reject minority)  β”‚ C (quorum writes)     β”‚
β”‚ etcd         β”‚ C (reject minority)  β”‚ C (Raft quorum)       β”‚
β”‚ ZooKeeper    β”‚ C (reject minority)  β”‚ C (Zab quorum)        β”‚
β”‚ Cassandra    β”‚ A (accept both)      β”‚ L (local reads)       β”‚
β”‚ DynamoDB     β”‚ A (accept both)      β”‚ L (eventual default)  β”‚
β”‚ MongoDB      β”‚ Configurable         β”‚ Configurable          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

CP/EC systems pay latency cost on every operation, not
only during partitions. This is the fundamental cost of
strong consistency.

Paxos

Leslie Lamport published the Paxos algorithm in 1998 ("The Part-Time Parliament") and simplified the presentation in 2001 ("Paxos Made Simple"). It remains the foundation of consensus in systems like Google Spanner, Chubby, and Megastore.

Single-Decree Paxos (Basic Paxos)

Basic Paxos solves the problem of getting a set of nodes to agree on a single value. Three roles are defined: Proposers, Acceptors, and Learners. In practice, a single node typically serves all three roles.

The protocol operates in two phases.

Phase 1: Prepare

The proposer selects a proposal number n and sends a Prepare(n) request to a majority of acceptors. Each acceptor that has not already promised to a higher-numbered proposal responds with a Promise(n) and includes any value it has previously accepted.

Phase 2: Accept

If the proposer receives promises from a majority, it sends an Accept(n, v) request. The value v is either the value from the highest-numbered previously accepted proposal (if any), or the proposer's own value. Acceptors accept the proposal unless they have already promised to a higher number.

Basic Paxos, Two-Phase Protocol:

PHASE 1 (Prepare):

  Proposer                   Acceptors
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”
  β”‚        β”‚        β”‚    β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚Prepare │───────>β”‚ A1 β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚  n=1   │────────────>β”‚ A2 β”‚  β”‚    β”‚
  β”‚        │─────────────────>β”‚ A3 β”‚
  β”‚        β”‚        β”‚    β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚        β”‚<───────│Promβ”‚  β”‚    β”‚  β”‚    β”‚
  β”‚        β”‚<────────────│Promβ”‚  β”‚    β”‚
  β”‚        β”‚<─────────────────│Promβ”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”˜

  Each acceptor responds:
    "I will not accept proposals numbered less than 1."
    "The highest-numbered proposal I have accepted is: (none)"

PHASE 2 (Accept):

  Proposer                   Acceptors
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”
  β”‚        β”‚        β”‚    β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚Accept  │───────>β”‚ A1 β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚ n=1    │────────────>β”‚ A2 β”‚  β”‚    β”‚
  β”‚ v="X"  │─────────────────>β”‚ A3 β”‚
  β”‚        β”‚        β”‚    β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚        β”‚<───────│Acc β”‚  β”‚    β”‚  β”‚    β”‚
  β”‚        β”‚<────────────│Acc β”‚  β”‚    β”‚
  β”‚        β”‚<─────────────────│Acc β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”˜

  Majority accepts. Value "X" is chosen.

Dueling Proposers and Livelock

When multiple proposers operate concurrently, their prepare phases can invalidate each other, preventing either from completing.

Livelock Scenario (two concurrent proposers):

Time ──────────────────────────────────────────────────────>

P1: Prepare(n=1) ──> Acceptors promise n >= 1
                          |
P2: Prepare(n=2) ──> Acceptors promise n >= 2
                          |  (invalidates P1)
                          |
P1: Accept(n=1, "X") ──> REJECTED (acceptors promised n >= 2)
                          |
P1: Prepare(n=3) ──> Acceptors promise n >= 3
                          |  (invalidates P2)
                          |
P2: Accept(n=2, "Y") ──> REJECTED (acceptors promised n >= 3)
                          |
P2: Prepare(n=4) ──> Acceptors promise n >= 4
                          |  (invalidates P1)
                          ...

Neither proposer ever gets a value accepted.
This can continue indefinitely.

Mitigations:
  1. Randomized exponential backoff before retrying
  2. Elect a distinguished proposer (leader)
  3. Use Multi-Paxos with a stable leader

Multi-Paxos

Single-decree Paxos decides one value. Real systems need to agree on a sequence of values (a replicated log). Multi-Paxos runs one Paxos instance per log slot, with an optimization: a stable leader can skip Phase 1 for consecutive slots after establishing leadership once.

Multi-Paxos Message Flow:

Single-decree (3 instances, no leader optimization):

  Client    Proposer    Acceptors (A1, A2, A3)
    |          |            |    |    |
    |  --------|-- Slot 1 --|----|----|--------
    |          |--Prepare-->|    |    |     Phase 1
    |          |--Prepare------->|    |
    |          |--Prepare------------>|
    |          |<--Promise--|    |    |
    |          |<--Promise-------|    |
    |          |--Accept--->|    |    |     Phase 2
    |          |--Accept-------->|    |
    |          |<--Accepted-|    |    |
    |          |<--Accepted------|    |
    |  --------|-- Slot 2 --|----|----|--------
    |          |   (repeat both phases)
    |  --------|-- Slot 3 --|----|----|--------
    |          |   (repeat both phases)

  Total messages per slot: 4 (2 round trips)

Multi-Paxos (with stable leader, skip Phase 1):

  Client    Leader      Acceptors (A1, A2, A3)
    |          |            |    |    |
    |  --------|-- Slot 1 --|----|----|--------
    |          |--Prepare-->|    |    |     Phase 1 (once)
    |          |--Prepare------->|    |
    |          |<--Promise--|    |    |
    |          |<--Promise-------|    |
    |          |--Accept--->|    |    |     Phase 2
    |          |--Accept-------->|    |
    |          |<--Accepted-|    |    |
    |          |<--Accepted------|    |
    |  --------|-- Slot 2 --|----|----|--------
    |          |--Accept--->|    |    |     Phase 2 only
    |          |--Accept-------->|    |
    |          |<--Accepted-|    |    |
    |          |<--Accepted------|    |
    |  --------|-- Slot 3 --|----|----|--------
    |          |--Accept--->|    |    |     Phase 2 only
    |          |--Accept-------->|    |
    |          |<--Accepted-|    |    |
    |          |<--Accepted------|    |

  Total messages per slot (after first): 2 (1 round trip)

The gap between the Paxos algorithm as described in academic papers and a production implementation is significant. Lamport's papers do not specify log compaction, membership changes, snapshotting, or crash recovery in detail. Google's "Paxos Made Live" (Chandra et al., 2007) documents many of these engineering challenges, including master leases, epoch numbering, group membership, and the need for a database layer on top of the consensus core.

Paxos vs. Multi-Paxos Comparison

AspectBasic PaxosMulti-Paxos
DecidesOne valueSequence of values
Round trips per decision21 (amortized, with stable leader)
Leader requiredNoEffectively yes
Livelock possibleYesNo (with stable leader)
Specification completenessFullPartial (implementation-dependent)

Raft

Raft was published by Ongaro and Ousterhout in 2014 with an explicit design goal of understandability. It decomposes consensus into three sub-problems: leader election, log replication, and safety. All client interactions go through a single leader, which simplifies reasoning about correctness.

Node State Machine

Every Raft node is in one of three states: Follower, Candidate, or Leader.

Raft Node State Transitions:

                  election timeout         receives majority
                  (no heartbeat             of votes
                   received)
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚          │──>β”‚              │──>β”‚              β”‚
  β”‚ Follower β”‚   β”‚  Candidate   β”‚   β”‚    Leader    β”‚
  β”‚          β”‚<──│              β”‚<──│              β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       ^          discovers current        |
       β”‚          leader or higher         |
       β”‚          term                     |
       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                discovers higher term

  Invariant: at most one leader exists per term.
  Terms are monotonically increasing integers.
  Any message with a higher term causes the receiver
  to update its term and revert to Follower.

Leader Election

Followers expect periodic heartbeats (empty AppendEntries RPCs) from the leader. When a follower's election timeout expires without receiving a heartbeat, it transitions to Candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes.

The election timeout is randomized, typically in the range of 150 to 300 ms. This randomization reduces the probability of split votes, where two candidates start elections simultaneously and neither achieves a majority.

Leader Election Sequence (5-node cluster):

Time ──────────────────────────────────────────────────>

State: All nodes are Followers in Term 1.
       Leader has crashed.

t=0     Node C: election timeout fires (t_elapsed = 223 ms)
        Node C -> Candidate, Term = 2
        Node C: votes for self (1/5)

t=1ms   Node C -> Node A: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
        Node C -> Node B: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
        Node C -> Node D: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
        Node C -> Node E: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)

t=2ms   Node A: term 2 > my term 1. Update term.
                C's log (idx=8, term=1) >= my log (idx=8, term=1).
                Have not voted in term 2.
                -> Grant vote to C.  (C: 2/5)

t=2ms   Node B: same evaluation.
                -> Grant vote to C.  (C: 3/5, MAJORITY)

t=3ms   Node C: received 3 votes (self + A + B) out of 5.
                -> Becomes Leader for Term 2.
                -> Sends heartbeat (empty AppendEntries) to all nodes.

t=4ms   Node D: receives heartbeat from C with term=2.
                -> Recognizes C as leader, remains Follower, updates term.

t=5ms   Node E: receives heartbeat from C with term=2.
                -> Recognizes C as leader, remains Follower, updates term.

RequestVote RPC contents:

RequestVote {
    term:          uint64   // candidate's current term
    candidateId:   string   // candidate requesting the vote
    lastLogIndex:  uint64   // index of candidate's last log entry
    lastLogTerm:   uint64   // term of candidate's last log entry
}

Vote is granted if and only if:
  1. candidate's term >= voter's current term
  2. voter has not already voted in this term
     (or voted for this same candidate)
  3. candidate's log is at least as up-to-date as voter's log
     (compared by lastLogTerm first, then lastLogIndex)

Election timeout constraints:

The election timeout must be significantly larger than the broadcast round-trip time. A common guideline:

broadcastTime << electionTimeout << MTBF

Where:
  broadcastTime  = network round-trip time (~0.5-20 ms within a datacenter)
  electionTimeout = 150-300 ms (typical)
  MTBF           = mean time between failures for a single node

If broadcastTime approaches electionTimeout, heartbeats may not arrive
before timeouts expire, causing spurious elections and leader instability.

Log Replication

The leader appends client requests to its local log and replicates entries to followers via AppendEntries RPCs. An entry is committed once replicated to a majority. Only committed entries are applied to the state machine.

Log Replication, Step by Step:

1. Client sends write request to Leader

   Client ──> Leader: "SET x = 42"

2. Leader appends entry to local log

   Leader's Log:
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚ idx=1 β”‚ idx=2 β”‚ idx=3 β”‚ idx=4 β”‚ idx=5         β”‚
   β”‚ t=1   β”‚ t=1   β”‚ t=1   β”‚ t=2   β”‚ t=2           β”‚
   β”‚ SET   β”‚ SET   β”‚ DEL   β”‚ SET   β”‚ SET x=42  NEW β”‚
   β”‚ a=1   β”‚ b=2   β”‚ a     β”‚ c=3   β”‚               β”‚
   β””β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                     commitIndex = 4

3. Leader sends AppendEntries to all followers

   AppendEntries {
       term:         2
       leaderId:     "leader"
       prevLogIndex: 4         // index of entry preceding new entries
       prevLogTerm:  2         // term of entry at prevLogIndex
       entries:      [{idx:5, term:2, cmd:"SET x=42"}]
       leaderCommit: 4         // leader's current commit index
   }

4. Followers validate and append

   Node B (log: [1,2,3,4]):
     prevLogIndex=4, prevLogTerm=2 matches local entry at idx=4
     -> Append entry 5. Send ACK.

   Node C (log: [1,2,3,4]):
     -> Same. Append entry 5. Send ACK.

   Node D (log: [1,2,3]):
     prevLogIndex=4: no entry at index 4
     -> REJECT. Send NACK.

   Node E (log: [1,2,3,4]):
     -> Append entry 5. Send ACK.

5. Leader counts acknowledgments

   ACKs from B, C, E + self = 4/5 (majority)
   -> Advance commitIndex to 5.
   -> Apply "SET x=42" to state machine.
   -> Respond to client: success.
   -> Include commitIndex=5 in next heartbeat so followers
      advance their own commitIndex and apply the entry.

6. For Node D (behind):

   Leader decrements nextIndex[D] from 5 to 4.
   Retries AppendEntries with prevLogIndex=3.
   On success, sends entries 4 and 5.
   Node D catches up.

Consistency check mechanism:

The prevLogIndex and prevLogTerm fields in AppendEntries form an inductive consistency check. If a follower's log matches at prevLogIndex, the Log Matching Property guarantees all preceding entries also match.

When a follower rejects an AppendEntries due to a log mismatch, the leader decrements nextIndex for that follower and retries. This backtracking can be slow if the follower is many entries behind. The etcd implementation optimizes this by including a ConflictIndex and ConflictTerm in the rejection response, allowing the leader to skip directly to the divergence point.

Log Divergence and Repair:

Leader (Term 4):  [1:t1] [2:t1] [3:t2] [4:t3] [5:t3] [6:t4] [7:t4]
Follower A:       [1:t1] [2:t1] [3:t2] [4:t3] [5:t3]
Follower B:       [1:t1] [2:t1] [3:t2] [4:t3]
Follower C:       [1:t1] [2:t1] [3:t2] [4:t2] [5:t2]
                                         ^^^^   ^^^^
                                         diverges from leader

Repairing Follower C:
  Leader sends AppendEntries(prevLogIndex=6, ...)
    C: no entry at 6. REJECT(conflictIndex=5).
  Leader sends AppendEntries(prevLogIndex=4, prevLogTerm=3, ...)
    C: entry at 4 has term=2, not term=3. REJECT(conflictIndex=4, conflictTerm=2).
  Leader sends AppendEntries(prevLogIndex=3, prevLogTerm=2, entries=[4:t3, 5:t3, 6:t4, 7:t4])
    C: entry at 3 has term=2. Match.
    C: delete entries 4 and 5 (conflicting).
    C: append entries 4,5,6,7 from leader.

Follower C after repair:
  [1:t1] [2:t1] [3:t2] [4:t3] [5:t3] [6:t4] [7:t4]
  Now matches leader.

Safety: The Election Restriction

The core safety property of Raft: if a log entry has been committed in a given term, that entry will be present in the logs of all leaders for all higher-numbered terms.

This is enforced by the election restriction. A voter rejects a RequestVote if the candidate's log is less up-to-date than its own. "Up-to-date" is determined by comparing the term and index of the last log entry.

Election Restriction Proof Sketch:

Given: Entry E at index 5, term 2 is committed
       (replicated to nodes A, B, C in a 5-node cluster)

Claim: No future leader can lack entry E.

  Nodes with E: {A, B, C}     (3 nodes, a majority)
  Any majority:                (requires >= 3 nodes)

  For any candidate to win election:
    It needs votes from >= 3 nodes.
    Any set of 3 nodes intersects {A, B, C} by at least 1 node.
    That intersecting node has entry E and will reject
    any candidate whose log does not contain E.

  Therefore, every future leader must have entry E.

  Formal basis: quorum intersection property.
  Two majorities of a set of size N must overlap by
  at least 1 element (since each has > N/2 elements).

  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚         All 5 nodes: {A,B,C,D,E}       β”‚
  β”‚                                         β”‚
  β”‚  Committed to: {A, B, C}  (majority)   β”‚
  β”‚                  ∩                      β”‚
  β”‚  Any majority:  {?, ?, ?}  (>= 3)      β”‚
  β”‚                  = at least 1 common    β”‚
  β”‚                                         β”‚
  β”‚  That common node enforces the          β”‚
  β”‚  election restriction.                  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Leader Crash During Replication

When a leader crashes mid-replication, the following cases arise depending on how many followers received the entry:

Leader Crash Scenarios:

Scenario 1: Entry replicated to majority before crash

  Leader (crashed): [1] [2] [3] [4*]    (* = uncommitted)
  Node B:           [1] [2] [3] [4*]
  Node C:           [1] [2] [3] [4*]
  Node D:           [1] [2] [3]
  Node E:           [1] [2] [3]

  Entry 4 reached B and C (majority with leader = 3/5).
  Leader crashed before advancing commitIndex.

  New election: B or C can become leader (up-to-date log).
  New leader has entry 4. It will be re-replicated and committed.
  Entry 4 is preserved.

Scenario 2: Entry replicated to minority before crash

  Leader (crashed): [1] [2] [3] [4*]
  Node B:           [1] [2] [3] [4*]
  Node C:           [1] [2] [3]
  Node D:           [1] [2] [3]
  Node E:           [1] [2] [3]

  Entry 4 reached only B (2/5 including leader, not majority).
  Entry was never committed.

  New election: C, D, or E could become leader.
  If C becomes leader with a new entry 4 in term 3:
    B's entry 4 (term 2) gets overwritten.
  Uncommitted entry is lost. This is correct behavior,
  as the client never received an acknowledgment.

Scenario 3: Leader crashes after committing but before responding

  Leader committed entry 4, applied to state machine,
  then crashed before sending response to client.

  Client does not know if write succeeded.
  Client must retry with an idempotency key.
  New leader has the committed entry.
  Write is durable. Retry is a no-op (with idempotency).

Linearizable Reads

Achieving linearizable reads in a Raft-based system requires more than simply reading from the leader's state machine. A leader that has been partitioned from the majority may not know it has been superseded.

Stale Read Problem:

  Node A: leader, term 5
  Network partition isolates A from {B, C, D, E}

  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β” β”Œβ”€β”€β”€β” β”Œβ”€β”€β”€β”
  β”‚  A      β”‚  X  X  X β”‚  B      β”‚ β”‚ C β”‚ β”‚ D β”‚ β”‚ E β”‚
  β”‚         β”‚          β”‚         β”‚ β”‚   β”‚ β”‚   β”‚ β”‚   β”‚
  β”‚ term=5  β”‚  no link β”‚ elected β”‚ β”‚   β”‚ β”‚   β”‚ β”‚   β”‚
  β”‚ "I am   β”‚          β”‚ leader  β”‚ β”‚   β”‚ β”‚   β”‚ β”‚   β”‚
  β”‚  leader"β”‚          β”‚ term=6  β”‚ β”‚   β”‚ β”‚   β”‚ β”‚   β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”˜ β””β”€β”€β”€β”˜ β””β”€β”€β”€β”˜

  Client reads from A. A returns its local state.
  Meanwhile B (actual leader, term 6) has committed new writes.
  Client received stale data. Linearizability violated.

Three approaches address this problem:

ReadIndex

Before serving a read, the leader confirms it still holds leadership by broadcasting a heartbeat and waiting for majority acknowledgment. This adds one network round trip per read (or per batch of reads).

// ReadIndex approach (used by etcd by default)
func (s *RaftServer) LinearizableRead(key string) ([]byte, error) {
    // 1. Record current commit index as the read index
    readIndex := s.raft.CommitIndex()
 
    // 2. Confirm leadership by sending a round of heartbeats
    if err := s.raft.ConfirmLeadership(); err != nil {
        return nil, ErrNotLeader
    }
 
    // 3. Wait for state machine to catch up to readIndex
    //    (committed entries may not yet be applied)
    if err := s.waitForApply(readIndex); err != nil {
        return nil, err
    }
 
    // 4. Read from state machine
    return s.stateMachine.Get(key), nil
}

Lease-Based Reads

The leader maintains a time-based lease. After receiving heartbeat acknowledgments from a majority, the leader knows that no election can succeed for at least the election timeout duration (since followers reset their election timers on heartbeat receipt). Reads can be served locally without additional RPCs as long as the lease is valid.

// Lease-based reads (used by CockroachDB, TiKV)
func (s *RaftServer) LeaseRead(key string) ([]byte, error) {
    // Check if the leader lease is still valid
    // Lease duration < election timeout
    if time.Now().Before(s.leaseExpiry) {
        return s.stateMachine.Get(key), nil
    }
    // Lease expired. Fall back to ReadIndex or step down.
    return nil, ErrLeaseExpired
}

This approach depends on bounded clock skew between nodes. If a follower's clock runs fast, it may start an election before the leader's lease has expired from the leader's perspective.

Lease-Based Read Timing:

Leader sends heartbeat ──> Followers reset election timer
  |                              |
  |  lease_start = now()         |  election_timeout starts
  |  lease_end = now() +         |
  |    (election_timeout -       |
  |     max_clock_drift)         |
  |                              |
  |<──── lease valid ───────>|   |
  |                          |   |<── earliest possible election
  |                          |   |
  ──────────────────────────────────────────> time

As long as max_clock_drift is correctly bounded,
no election can start while the lease is valid.

Follower Reads

For read-heavy workloads, directing all reads through the leader creates a bottleneck. Follower reads distribute read load across the cluster.

Follower Read Protocol:

  Client ──> Follower C: "Read key x"

  Follower C ──> Leader: "What is your current commit index?"
  Leader ──> Follower C: "commitIndex = 47"

  Follower C: wait until local applied index >= 47
  Follower C: read x from local state machine
  Follower C ──> Client: value of x

  This provides linearizability at the point in time when the
  leader responded with its commit index.

  Performance benefit:
    Read throughput scales with number of nodes.
    Write throughput remains limited by the leader.

Read Approach Comparison

ApproachExtra RTTs per ReadClock DependencyImplementation Complexity
ReadIndex1 (heartbeat round)NoneLow
Lease-based0Requires bounded clock skewMedium
Follower read1 (query leader)NoneMedium
Quorum read1 (read from majority)NoneHigh

Google Spanner: TrueTime and External Consistency

Spanner provides external consistency: if transaction T1 commits before transaction T2 starts (in absolute wall-clock time), then T1's commit timestamp is less than T2's commit timestamp. This property holds globally across all data centers.

TrueTime

Spanner's TrueTime API exposes clock uncertainty explicitly.

TrueTime API:

  TT.now() -> TTinterval { earliest, latest }

  Guarantee: earliest <= absolute_time <= latest

  The uncertainty interval (epsilon = latest - earliest)
  depends on time since last synchronization with a
  TrueTime master.

  TrueTime masters contain:
    - GPS receivers (synchronized to GPS time)
    - Atomic clocks (free-running, independent of GPS)
    GPS and atomic clocks have uncorrelated failure modes.

  Typical epsilon values:
    Immediately after sync:   ~1 ms
    Between syncs:            1-7 ms (sawtooth pattern)
    Average:                  ~4 ms

  Comparison with other time sources:
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ Time Source         β”‚ Typical Uncertainty  β”‚
  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
  β”‚ TrueTime (Spanner)  β”‚ 1-7 ms              β”‚
  β”‚ NTP (well-tuned)    β”‚ 1-10 ms             β”‚
  β”‚ NTP (default)       β”‚ 50-500 ms           β”‚
  β”‚ Unsynced clocks     β”‚ seconds to minutes  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Commit Wait Protocol

Spanner's commit protocol uses TrueTime to assign globally meaningful timestamps.

Spanner Commit Protocol:

  1. Transaction T executes (reads, writes, lock acquisition)
  2. Coordinator acquires Paxos-replicated locks
  3. Coordinator picks commit timestamp:
       s = TT.now().latest
       (guarantees s >= absolute_time at this moment)

  4. COMMIT WAIT:
       Wait until TT.now().earliest > s

       After this wait, absolute_time > s.
       Any future transaction starting after this point
       will observe TT.now().earliest > s, and will
       therefore pick a timestamp > s.

  5. Release locks and apply the commit.

  Timeline:

  ──────────────────────────────────────────────────> time
      |              |                  |
    T picks       commit-wait         T releases
    s = t1        interval            locks
                  (~7 ms avg)         commit visible
                                      |
                                      Any T2 starting here
                                      will get timestamp > t1

  The commit-wait duration equals the TrueTime uncertainty
  at the time of commit. Average: ~7 ms. This is the cost
  of external consistency.

Lock-Free Snapshot Reads

Because every committed write has a globally meaningful timestamp, Spanner supports snapshot reads at any past timestamp without locking or coordination.

Snapshot Read:

  Client: "Read accounts A, B, C as of timestamp T=1000"

  Any Spanner replica (need not be leader):
    1. Check: is my safe_time >= 1000?
       (safe_time = timestamp up to which all writes are applied)
    2. If yes: read A, B, C from local versioned storage at T=1000.
       No locks acquired. No coordination with leader.
    3. If no: wait for replication to advance safe_time past 1000.

  This enables:
    - Read-only transactions with zero lock contention
    - Consistent global snapshots for analytics
    - Time-travel queries (read data as it was N seconds ago)

  Trade-off: snapshot data is at least epsilon old
  (where epsilon is the TrueTime uncertainty).

CockroachDB: Raft-Based Distributed SQL

CockroachDB uses Raft for replication within ranges (contiguous spans of the key space). Each range (default 512 MB) is a Raft group with typically 3 or 5 replicas.

CockroachDB Architecture:

  Key Space: [min_key ────────────────────────── max_key]

  Divided into ranges:
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ Range 1 β”‚ β”‚ Range 2 β”‚ β”‚ Range 3 β”‚ β”‚ Range 4 β”‚
  β”‚ [a-f)   β”‚ β”‚ [f-m)   β”‚ β”‚ [m-s)   β”‚ β”‚ [s-z)   β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
      β”‚            β”‚            β”‚            β”‚
      β”‚  Each range is a separate Raft group β”‚
      v            v            v            v
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”
  β”‚Node 1 β”‚   β”‚Node 2 β”‚   β”‚Node 1 β”‚   β”‚Node 3 β”‚
  β”‚Node 2 β”‚   β”‚Node 3 β”‚   β”‚Node 3 β”‚   β”‚Node 1 β”‚  (replicas)
  β”‚Node 3 β”‚   β”‚Node 1 β”‚   β”‚Node 2 β”‚   β”‚Node 2 β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”˜

  Each range has its own Raft leader.
  Leaders are distributed across nodes for load balancing.
  Transactions spanning multiple ranges use parallel consensus
  with a two-phase commit (2PC) protocol across range leaders.

CockroachDB uses a hybrid logical clock (HLC) for timestamp ordering. HLCs combine a physical clock component with a logical counter, providing causality tracking without requiring specialized hardware like Spanner's TrueTime.

Hybrid Logical Clock (HLC):

  HLC = (physical_time, logical_counter)

  Comparison: (pt1, lc1) < (pt2, lc2)
    if pt1 < pt2, or (pt1 == pt2 and lc1 < lc2)

  On local event:
    pt = max(hlc.pt, wall_clock)
    if pt == hlc.pt: lc = hlc.lc + 1
    else:            lc = 0
    hlc = (pt, lc)

  On receiving message with timestamp (msg_pt, msg_lc):
    pt = max(hlc.pt, msg_pt, wall_clock)
    if pt == hlc.pt == msg_pt: lc = max(hlc.lc, msg_lc) + 1
    elif pt == hlc.pt:         lc = hlc.lc + 1
    elif pt == msg_pt:         lc = msg_lc + 1
    else:                      lc = 0
    hlc = (pt, lc)

  Trade-off vs. TrueTime:
    HLC: no special hardware, but cannot bound uncertainty
         without clock skew assumptions
    TrueTime: requires GPS/atomic clocks, but provides
              bounded uncertainty with formal guarantees

etcd: Raft Reference Implementation

etcd is widely used for service discovery, configuration management, and distributed locking. It implements Raft using the etcd/raft library in Go, which provides a clean separation between the Raft state machine and the application layer.

Key implementation details:

etcd Raft Implementation Characteristics:

  Write path:
    Client -> gRPC -> Leader -> Raft proposal
    -> WAL write (fsync) -> Replicate to followers
    -> Followers WAL write (fsync) -> ACK to leader
    -> Commit -> Apply to boltdb -> Respond to client

  Performance characteristics (3-node, NVMe SSD, same DC):
    Sequential writes:     ~10,000 ops/sec (no batching)
    Batched writes:        ~50,000-100,000 ops/sec
    Linearizable reads:    ~30,000 ops/sec (ReadIndex)
    Serializable reads:    ~100,000+ ops/sec (local follower)

  Default configuration:
    heartbeat-interval:    100 ms
    election-timeout:      1000 ms (10x heartbeat)
    snapshot-count:        100,000 (entries before snapshot)
    max-request-bytes:     1.5 MB
    quota-backend-bytes:   2 GB (default storage limit)

  WAL (Write-Ahead Log):
    Format: sequential binary records
    Sync policy: fsync after every batch
    Segment size: 64 MB
    Location: dedicated disk recommended for latency isolation

Membership Changes

Adding or removing nodes from a consensus cluster is a source of subtle bugs. The core risk: if different nodes have different views of cluster membership, they may disagree on what constitutes a majority, potentially allowing two leaders to be elected simultaneously.

Joint Consensus (Raft Dissertation)

The Raft dissertation describes a two-phase approach: the cluster transitions through a joint configuration C_old,new that requires agreement from majorities of both the old and new configurations.

Joint Consensus (two-phase membership change):

Phase 1: Transition to joint configuration C_{old,new}
  Leader replicates C_{old,new} entry.
  Decisions require majorities from BOTH old AND new configs.

  Old config: {A, B, C}         majority = 2
  New config: {A, B, C, D, E}   majority = 3
  Joint:      requires 2 from {A,B,C} AND 3 from {A,B,C,D,E}

Phase 2: Transition to new configuration C_new
  Once C_{old,new} is committed, leader replicates C_new entry.
  After C_new is committed, old-config-only nodes can be shut down.

Timeline:
  ──────────────────────────────────────────────────────> time
       |                    |                    |
    C_old only           C_{old,new}           C_new only
    majority=2           requires both         majority=3
                         majorities

Single-Node Changes (Practical Approach)

Most production systems, including etcd, use single-node membership changes. Adding or removing one node at a time guarantees that any two majorities (old and new) overlap, preventing split-brain.

Single-Node Change Safety:

  {A, B, C}       -> {A, B, C, D}
  majority = 2       majority = 3

  Any majority of {A,B,C}:   {A,B}, {A,C}, {B,C}
  Any majority of {A,B,C,D}: {A,B,C}, {A,B,D}, {A,C,D}, {B,C,D}

  Every pair intersects by at least 1 node. No split-brain possible.

  To change from 3 nodes to 5 nodes:
    Step 1: {A,B,C} -> {A,B,C,D}         (add D)
    Step 2: {A,B,C,D} -> {A,B,C,D,E}     (add E)

  Each step is individually safe.

Log Compaction and Snapshotting

The Raft log grows without bound unless compacted. Log compaction replaces a prefix of the log with a snapshot of the state machine at that point.

Log Compaction:

Before:
  Log:      [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
  Applied:   *   *   *   *   *   *   *
  Committed: *   *   *   *   *   *   *   *   *
                                      ^
                                  snapshot point

After snapshot at index 7:
  Snapshot: { state_machine_state_at_index_7, last_included_index=7, last_included_term=2 }
  Log:      [8] [9] [10]

  Entries 1-7 are discarded. The snapshot captures their
  cumulative effect on the state machine.

InstallSnapshot RPC (for slow followers or new nodes):

  Leader ──> Slow Follower: InstallSnapshot {
      term:               current leader term
      leaderId:           leader node ID
      lastIncludedIndex:  7
      lastIncludedTerm:   2
      data:               [serialized state machine snapshot]
  }

  Follower:
    1. Discard entire log (or entries up to lastIncludedIndex)
    2. Load snapshot into state machine
    3. Resume normal AppendEntries replication from index 8

Snapshot implementation strategies:

StrategyUsed ByMechanismTrade-off
fork() + serializeRedis, some etcd configsOS copy-on-write via forkSimple, but memory spike risk
LSM-tree checkpointCockroachDB (RocksDB/Pebble)Native point-in-time snapshotEfficient, no extra memory
Incremental snapshotTiKVStream changes since last snapshotLower I/O, more complex

Disk I/O and Write Performance

Consensus protocols require durable writes (fsync) before acknowledging entries. Disk I/O latency directly bounds throughput.

Write Latency Breakdown (5-node cluster, same datacenter):

Component                    NVMe SSD     SATA SSD     HDD
─────────────────────────────────────────────────────────────
Network (client to leader)   0.5 ms       0.5 ms       0.5 ms
Leader WAL fsync             0.05 ms      0.5 ms       5 ms
Network (leader to followers) 0.5 ms      0.5 ms       0.5 ms
Follower WAL fsync           0.05 ms      0.5 ms       5 ms
Network (follower ACK)       0.5 ms       0.5 ms       0.5 ms
Apply + respond              0.1 ms       0.1 ms       0.1 ms
─────────────────────────────────────────────────────────────
Total (single entry)         ~1.7 ms      ~2.6 ms      ~11.6 ms
Theoretical max ops/sec      ~590         ~385         ~86

With batching (100 entries per fsync):
Amortized per entry          ~0.17 ms     ~0.26 ms     ~1.16 ms
Theoretical max ops/sec      ~59,000      ~38,500      ~8,600

Cross-datacenter (leader to remote follower RTT = 40 ms):
Total (single entry)         ~81 ms       ~82 ms       ~91 ms

Batching is critical for production throughput.

etcd batching behavior:

etcd batches multiple proposals into a single Raft round when they arrive within the same event loop tick (~1 ms). Under high load, this naturally groups entries, reducing the number of fsyncs per second.

Monitoring Consensus Clusters

Key metrics for operating a Raft-based system:

Critical Monitoring Metrics:

1. Leader elections per hour
   Normal:    0
   Warning:   > 0 (investigate cause)
   Critical:  Continuous flapping

   Common causes of unexpected elections:
     - Disk latency spikes (fsync exceeding heartbeat interval)
     - Network partitions or packet loss
     - CPU saturation delaying heartbeat processing
     - GC pauses exceeding election timeout (JVM-based systems)

2. Raft proposal apply latency (p99)
   Normal:    < 10 ms
   Warning:   > 50 ms (state machine cannot keep up)
   Critical:  > 500 ms (risk of OOM from log growth)

3. Entries behind leader (per follower)
   Normal:    0-10
   Warning:   > 1000 (follower falling behind)
   Critical:  > snapshot-count (will require snapshot transfer)

4. WAL fsync duration (p99)
   Normal:    < 1 ms (SSD)
   Warning:   > 10 ms
   Critical:  > heartbeat-interval (will cause elections)

5. Proposal failure rate
   Normal:    0%
   Warning:   > 0% (leader may be losing quorum)

6. Network RTT between nodes (p99)
   Requirement: RTT << election-timeout
   Rule of thumb: RTT < election-timeout / 10

System Comparison

PropertyetcdCockroachDBSpannerZooKeeperTiKV
ConsensusRaftRaft (per-range)PaxosZabRaft (per-region)
Data modelKey-valueSQL (relational)SQL (relational)Hierarchical (znodes)Key-value + txn
ConsistencyLinearizableSerializable (SSI)ExternalLinearizableLinearizable
Clock mechanismNoneHLCTrueTime (GPS/atomic)NoneHLC or TSO
Replication unitEntire storeRange (512 MB)Split (~8 GB)Entire storeRegion (96 MB)
Typical write latency10-20 ms20-50 ms10-15 ms (commit-wait)10-30 ms5-15 ms
Max data size~8 GB (recommended)Terabytes+Petabytes+Hundreds of MBTerabytes+
Primary use caseCoordination, configOLTP, multi-region SQLGlobal OLTPCoordination, lockingStorage engine for TiDB

Failure Mode Reference

Common Failure Modes and System Behavior:

Failure: Single follower crashes
  Impact: None if quorum maintained
  Recovery: Node restarts, replays WAL, catches up via AppendEntries
  Duration: Seconds to minutes depending on log gap

Failure: Leader crashes
  Impact: Brief unavailability during election (~election-timeout)
  Recovery: New leader elected, uncommitted entries may be lost
  Duration: 150-300 ms (typical election timeout) + election round

Failure: Minority partition
  Impact: Partitioned nodes cannot serve writes
  Recovery: Automatic when partition heals
  Duration: Duration of partition

Failure: Majority partition (quorum lost)
  Impact: Entire cluster unavailable for writes
  Recovery: Requires partition heal or manual intervention
  Duration: Until quorum restored

Failure: Disk full on leader
  Impact: Leader cannot append to WAL, cannot send heartbeats
  Recovery: Free disk space or replace node
  Risk: Leader steps down, election occurs

Failure: Slow disk on one node
  Impact: That node falls behind. If it is the leader,
          heartbeats are delayed, causing elections.
          Cluster may thrash between leaders.
  Recovery: Replace disk or remove node from cluster

Failure: Clock skew (lease-based reads)
  Impact: Stale reads possible if skew exceeds lease margin
  Recovery: Fix NTP configuration, restart affected nodes
  Risk: Silent data staleness (no error returned to client)

Failure: Full network partition (split-brain)
  Impact: CP systems: minority side unavailable
          AP systems: divergent writes, conflicts on heal
  Recovery: CP: automatic. AP: conflict resolution required.

Raft Implementation in Go (Simplified)

The following simplified implementation illustrates the core Raft data structures and RPCs:

package raft
 
type NodeState int
 
const (
    Follower NodeState = iota
    Candidate
    Leader
)
 
type LogEntry struct {
    Term    uint64
    Index   uint64
    Command []byte
}
 
type RaftNode struct {
    mu sync.Mutex
 
    id          string
    state       NodeState
    currentTerm uint64
    votedFor    string
    log         []LogEntry
 
    commitIndex uint64
    lastApplied uint64
 
    // Leader state (reinitialized after election)
    nextIndex  map[string]uint64  // for each follower: next log index to send
    matchIndex map[string]uint64  // for each follower: highest known replicated index
 
    peers            []string
    electionTimeout  time.Duration
    heartbeatTicker  *time.Ticker
    electionTimer    *time.Timer
}
 
type AppendEntriesRequest struct {
    Term         uint64
    LeaderID     string
    PrevLogIndex uint64
    PrevLogTerm  uint64
    Entries      []LogEntry
    LeaderCommit uint64
}
 
type AppendEntriesResponse struct {
    Term          uint64
    Success       bool
    ConflictIndex uint64  // optimization: hint for faster log repair
    ConflictTerm  uint64
}
 
type RequestVoteRequest struct {
    Term         uint64
    CandidateID  string
    LastLogIndex uint64
    LastLogTerm  uint64
}
 
type RequestVoteResponse struct {
    Term        uint64
    VoteGranted bool
}
 
// AppendEntries handles incoming AppendEntries RPCs (follower side)
func (n *RaftNode) AppendEntries(req AppendEntriesRequest, resp *AppendEntriesResponse) {
    n.mu.Lock()
    defer n.mu.Unlock()
 
    resp.Term = n.currentTerm
    resp.Success = false
 
    // Reply false if term < currentTerm
    if req.Term < n.currentTerm {
        return
    }
 
    // If RPC term is higher, update term and convert to follower
    if req.Term > n.currentTerm {
        n.currentTerm = req.Term
        n.state = Follower
        n.votedFor = ""
    }
 
    // Reset election timer (leader is alive)
    n.resetElectionTimer()
 
    // Check if log contains entry at prevLogIndex with prevLogTerm
    if req.PrevLogIndex > 0 {
        if req.PrevLogIndex > uint64(len(n.log)) {
            resp.ConflictIndex = uint64(len(n.log)) + 1
            return
        }
        if n.log[req.PrevLogIndex-1].Term != req.PrevLogTerm {
            resp.ConflictTerm = n.log[req.PrevLogIndex-1].Term
            // Find first index of conflicting term
            for i := req.PrevLogIndex - 1; i > 0; i-- {
                if n.log[i-1].Term != resp.ConflictTerm {
                    resp.ConflictIndex = i + 1
                    break
                }
            }
            return
        }
    }
 
    // Append new entries, removing any conflicting entries
    for i, entry := range req.Entries {
        idx := req.PrevLogIndex + uint64(i) + 1
        if idx <= uint64(len(n.log)) {
            if n.log[idx-1].Term != entry.Term {
                n.log = n.log[:idx-1] // truncate conflicting entries
                n.log = append(n.log, req.Entries[i:]...)
                break
            }
        } else {
            n.log = append(n.log, req.Entries[i:]...)
            break
        }
    }
 
    // Update commit index
    if req.LeaderCommit > n.commitIndex {
        lastNewIndex := req.PrevLogIndex + uint64(len(req.Entries))
        if req.LeaderCommit < lastNewIndex {
            n.commitIndex = req.LeaderCommit
        } else {
            n.commitIndex = lastNewIndex
        }
    }
 
    resp.Success = true
}

References

Relevant papers and systems documentation:

  • Lamport, L. "The Part-Time Parliament." ACM TOCS, 1998.
  • Lamport, L. "Paxos Made Simple." ACM SIGACT News, 2001.
  • Ongaro, D. and Ousterhout, J. "In Search of an Understandable Consensus Algorithm." USENIX ATC, 2014.
  • Chandra, T., Griesemer, R., and Redstone, J. "Paxos Made Live: An Engineering Perspective." PODC, 2007.
  • Corbett, J. et al. "Spanner: Google's Globally-Distributed Database." OSDI, 2012.
  • Taft, R. et al. "CockroachDB: The Resilient Geo-Distributed SQL Database." SIGMOD, 2020.
  • Gilbert, S. and Lynch, N. "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services." ACM SIGACT News, 2002.
  • Abadi, D. "Consistency Tradeoffs in Modern Distributed Database System Design." IEEE Computer, 2012. (PACELC)
  • Kingsbury, K. Jepsen: Distributed Systems Safety Analysis. https://jepsen.io/