The CAP Theorem
The CAP theorem, proved by Gilbert and Lynch in 2002 (based on Brewer's 2000 conjecture), states that a distributed data store can satisfy at most two of three properties simultaneously:
- Consistency (C): Every read receives the most recent write or an error.
- Availability (A): Every request receives a non-error response, without guarantee that it reflects the most recent write.
- Partition Tolerance (P): The system continues to operate despite an arbitrary number of messages being dropped or delayed between nodes.
Network partitions are inevitable in any system spanning multiple machines. Hardware fails, switches get misconfigured, cables get disconnected. Partition tolerance is therefore not optional. The real engineering choice reduces to CP (consistency + partition tolerance) or AP (availability + partition tolerance).
Network Partition Between Two Groups:
+-------------------+                  +-------------------+
|    Partition A    |      X X X       |    Partition B    |
|                   |                  |                   |
|  Node 1           |    link down     |  Node 4           |
|  Node 2           |                  |  Node 5           |
|  Node 3           |                  |                   |
|                   |                  |                   |
|  (3 nodes)        |                  |  (2 nodes)        |
|  (majority)       |                  |  (minority)       |
+-------------------+                  +-------------------+
CP behavior (e.g., etcd):
Partition A: continues serving reads and writes (has quorum)
Partition B: rejects writes, returns errors to clients
After heal: Partition B replays the leader log and converges
AP behavior (e.g., Cassandra with CL=ONE):
Partition A: accepts writes (e.g., SET x = 5)
Partition B: accepts writes (e.g., SET x = 7)
Both succeed. Both clients receive acknowledgments.
After heal: conflict resolution required
(last-write-wins, vector clocks, CRDTs)
The choice between CP and AP is not necessarily system-wide. Different categories of data within the same application can use different consistency models. Financial balances typically require CP guarantees, while session caches or activity feeds can tolerate AP semantics with eventual convergence.
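The AP scenario above leaves two divergent values (x = 5 and x = 7) to reconcile after the partition heals. A minimal sketch of last-write-wins merging, assuming an illustrative `Versioned` type with timestamps standing in for whatever clock or version scheme the store actually uses:

```go
package main

import "fmt"

// Versioned is a hypothetical last-write-wins register: each replica
// tags a write with a timestamp; on merge, the higher timestamp wins.
type Versioned struct {
	Value     int
	Timestamp int64 // illustrative write timestamp
}

// mergeLWW resolves two divergent copies after a partition heals.
func mergeLWW(a, b Versioned) Versioned {
	if b.Timestamp > a.Timestamp {
		return b
	}
	// Ties favor a here; real systems break ties deterministically
	// (e.g., by node ID) so all replicas converge to the same value.
	return a
}

func main() {
	// Partition A wrote x=5 at t=100; partition B wrote x=7 at t=120.
	a := Versioned{Value: 5, Timestamp: 100}
	b := Versioned{Value: 7, Timestamp: 120}
	fmt.Println(mergeLWW(a, b).Value) // the later write survives
}
```

Last-write-wins is the simplest of the three strategies listed above; it silently discards the losing write, which vector clocks and CRDTs avoid at the cost of more bookkeeping.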
Eventual Consistency Windows
The term "eventually consistent" describes a spectrum rather than a single behavior. The critical variable is the duration of the inconsistency window.
Write event                                    Convergence
    |                                               |
    v                                               v
----*-----------------------------------------------*----> time
    |<----------- inconsistency window ------------>|
    |                                               |
    |  During this interval, different nodes        |
    |  may return different values for the          |
    |  same key.                                    |
    |                                               |
    |  Typical durations:                           |
    |    Single datacenter:  1-50 ms                |
    |    Cross-datacenter:   50-500 ms              |
    |    Under load/faults:  seconds to minutes     |
Acceptable staleness depends on the data type:
| Data Category | Staleness Tolerance | Typical Store |
|---|---|---|
| Financial transactions | None | CP (Spanner, CockroachDB) |
| User authentication state | None | CP (etcd, ZooKeeper) |
| Social media feeds | Seconds | AP (Cassandra, DynamoDB) |
| Analytics counters | Minutes | AP (Redis, DynamoDB) |
| Search indexes | Minutes to hours | AP (Elasticsearch) |
PACELC: A More Complete Model
The PACELC theorem extends CAP by also addressing behavior during normal (non-partitioned) operation: during a Partition (P), choose Availability (A) or Consistency (C); Else (E), choose Latency (L) or Consistency (C).
This captures a tradeoff that CAP ignores: even without partitions, enforcing consistency requires coordination between nodes, which increases latency.
PACELC Classification of Production Systems:
+---------------+----------------------+------------------------+
| System        | During Partition     | Normal Operation       |
|               | (choose A or C)      | (choose L or C)        |
+---------------+----------------------+------------------------+
| Spanner       | C (reject minority)  | C (commit-wait ~7ms)   |
| CockroachDB   | C (reject minority)  | C (quorum writes)      |
| etcd          | C (reject minority)  | C (Raft quorum)        |
| ZooKeeper     | C (reject minority)  | C (Zab quorum)         |
| Cassandra     | A (accept both)      | L (local reads)        |
| DynamoDB      | A (accept both)      | L (eventual default)   |
| MongoDB       | Configurable         | Configurable           |
+---------------+----------------------+------------------------+
CP/EC systems pay latency cost on every operation, not
only during partitions. This is the fundamental cost of
strong consistency.
Paxos
Leslie Lamport published the Paxos algorithm in 1998 ("The Part-Time Parliament") and simplified the presentation in 2001 ("Paxos Made Simple"). It remains the foundation of consensus in systems like Google Spanner, Chubby, and Megastore.
Single-Decree Paxos (Basic Paxos)
Basic Paxos solves the problem of getting a set of nodes to agree on a single value. Three roles are defined: Proposers, Acceptors, and Learners. In practice, a single node typically serves all three roles.
The protocol operates in two phases.
Phase 1: Prepare
The proposer selects a proposal number n and sends a Prepare(n) request to a majority of acceptors. Each acceptor that has not already promised to a higher-numbered proposal responds with a Promise(n) and includes any value it has previously accepted.
Phase 2: Accept
If the proposer receives promises from a majority, it sends an Accept(n, v) request. The value v is either the value from the highest-numbered previously accepted proposal (if any), or the proposer's own value. Acceptors accept the proposal unless they have already promised to a higher number.
Basic Paxos, Two-Phase Protocol:
PHASE 1 (Prepare):
  Proposer                A1      A2      A3    (acceptors)
     |                    |       |       |
     |---Prepare(n=1)---->|       |       |
     |---Prepare(n=1)------------>|       |
     |---Prepare(n=1)-------------------->|
     |<------Promise------|       |       |
     |<------Promise--------------|       |
     |<------Promise----------------------|
Each acceptor responds:
"I will not accept proposals numbered less than 1."
"The highest-numbered proposal I have accepted is: (none)"
PHASE 2 (Accept):
  Proposer                A1      A2      A3    (acceptors)
     |                    |       |       |
     |--Accept(n=1,"X")-->|       |       |
     |--Accept(n=1,"X")---------->|       |
     |--Accept(n=1,"X")------------------>|
     |<-----Accepted------|       |       |
     |<-----Accepted--------------|       |
     |<-----Accepted----------------------|
Majority accepts. Value "X" is chosen.
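The two phases above can be sketched from the acceptor's side. This is an illustrative single-decree acceptor under the stated rules, not any particular implementation; the `Acceptor` type and method shapes are assumptions:

```go
package main

import "fmt"

// Acceptor holds the state a Basic Paxos acceptor must persist:
// the highest proposal number promised, and any accepted proposal.
type Acceptor struct {
	promised      int    // highest proposal number promised
	acceptedNum   int    // number of the accepted proposal (0 = none)
	acceptedValue string // value of the accepted proposal
}

// Prepare handles Phase 1. On success it returns the previously
// accepted proposal (if any) so the proposer can adopt its value.
func (a *Acceptor) Prepare(n int) (ok bool, prevNum int, prevVal string) {
	if n <= a.promised {
		return false, 0, "" // already promised an equal or higher number
	}
	a.promised = n
	return true, a.acceptedNum, a.acceptedValue
}

// Accept handles Phase 2. The acceptor accepts unless it has since
// promised a higher-numbered proposal.
func (a *Acceptor) Accept(n int, v string) bool {
	if n < a.promised {
		return false
	}
	a.promised = n
	a.acceptedNum = n
	a.acceptedValue = v
	return true
}

func main() {
	acc := &Acceptor{}
	ok, _, _ := acc.Prepare(1)
	fmt.Println(ok)                 // promise granted
	fmt.Println(acc.Accept(1, "X")) // value "X" accepted
	ok, prevN, prevV := acc.Prepare(2)
	fmt.Println(ok, prevN, prevV)   // a later proposer learns "X"
}
```

The returned `prevNum`/`prevVal` pair is what forces a later proposer to re-propose "X" rather than its own value, which is how a chosen value stays chosen.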
Dueling Proposers and Livelock
When multiple proposers operate concurrently, their prepare phases can invalidate each other, preventing either from completing.
Livelock Scenario (two concurrent proposers):
Time ----------------------------------------------------->

P1: Prepare(n=1) --> Acceptors promise n >= 1
      |
      P2: Prepare(n=2) --> Acceptors promise n >= 2
      |                    (invalidates P1)
      |
      P1: Accept(n=1, "X") --> REJECTED (acceptors promised n >= 2)
      |
      P1: Prepare(n=3) --> Acceptors promise n >= 3
      |                    (invalidates P2)
      |
      P2: Accept(n=2, "Y") --> REJECTED (acceptors promised n >= 3)
      |
      P2: Prepare(n=4) --> Acceptors promise n >= 4
      |                    (invalidates P1)
...
Neither proposer ever gets a value accepted.
This can continue indefinitely.
Mitigations:
1. Randomized exponential backoff before retrying
2. Elect a distinguished proposer (leader)
3. Use Multi-Paxos with a stable leader
Multi-Paxos
Single-decree Paxos decides one value. Real systems need to agree on a sequence of values (a replicated log). Multi-Paxos runs one Paxos instance per log slot, with an optimization: a stable leader can skip Phase 1 for consecutive slots after establishing leadership once.
Multi-Paxos Message Flow:
Single-decree (3 instances, no leader optimization):
Client Proposer Acceptors (A1, A2, A3)
| | | | |
| --------|-- Slot 1 --|----|----|--------
| |--Prepare-->| | | Phase 1
| |--Prepare------->| |
| |--Prepare------------>|
| |<--Promise--| | |
| |<--Promise-------| |
| |--Accept--->| | | Phase 2
| |--Accept-------->| |
| |<--Accepted-| | |
| |<--Accepted------| |
| --------|-- Slot 2 --|----|----|--------
| | (repeat both phases)
| --------|-- Slot 3 --|----|----|--------
| | (repeat both phases)
Total messages per slot: 4 (2 round trips)
Multi-Paxos (with stable leader, skip Phase 1):
Client Leader Acceptors (A1, A2, A3)
| | | | |
| --------|-- Slot 1 --|----|----|--------
| |--Prepare-->| | | Phase 1 (once)
| |--Prepare------->| |
| |<--Promise--| | |
| |<--Promise-------| |
| |--Accept--->| | | Phase 2
| |--Accept-------->| |
| |<--Accepted-| | |
| |<--Accepted------| |
| --------|-- Slot 2 --|----|----|--------
| |--Accept--->| | | Phase 2 only
| |--Accept-------->| |
| |<--Accepted-| | |
| |<--Accepted------| |
| --------|-- Slot 3 --|----|----|--------
| |--Accept--->| | | Phase 2 only
| |--Accept-------->| |
| |<--Accepted-| | |
| |<--Accepted------| |
Total messages per slot (after first): 2 (1 round trip)
The gap between the Paxos algorithm as described in academic papers and a production implementation is significant. Lamport's papers do not specify log compaction, membership changes, snapshotting, or crash recovery in detail. Google's "Paxos Made Live" (Chandra et al., 2007) documents many of these engineering challenges, including master leases, epoch numbering, group membership, and the need for a database layer on top of the consensus core.
Paxos vs. Multi-Paxos Comparison
| Aspect | Basic Paxos | Multi-Paxos |
|---|---|---|
| Decides | One value | Sequence of values |
| Round trips per decision | 2 | 1 (amortized, with stable leader) |
| Leader required | No | Effectively yes |
| Livelock possible | Yes | No (with stable leader) |
| Specification completeness | Full | Partial (implementation-dependent) |
Raft
Raft was published by Ongaro and Ousterhout in 2014 with an explicit design goal of understandability. It decomposes consensus into three sub-problems: leader election, log replication, and safety. All client interactions go through a single leader, which simplifies reasoning about correctness.
Node State Machine
Every Raft node is in one of three states: Follower, Candidate, or Leader.
Raft Node State Transitions:
        election timeout              receives majority
        (no heartbeat                 of votes
         received)
+----------+        +-------------+        +-------------+
|          |------->|             |------->|             |
| Follower |        |  Candidate  |        |   Leader    |
|          |<-------|             |        |             |
+----------+        +-------------+        +-------------+
     ^          discovers current                |
     |          leader or higher term            |
     |                                           |
     +-------------------------------------------+
                 discovers higher term
Invariant: at most one leader exists per term.
Terms are monotonically increasing integers.
Any message with a higher term causes the receiver
to update its term and revert to Follower.
Leader Election
Followers expect periodic heartbeats (empty AppendEntries RPCs) from the leader. When a follower's election timeout expires without receiving a heartbeat, it transitions to Candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes.
The election timeout is randomized, typically in the range of 150 to 300 ms. This randomization reduces the probability of split votes, where two candidates start elections simultaneously and neither achieves a majority.
Leader Election Sequence (5-node cluster):
Time -------------------------------------------------->
State: All nodes are Followers in Term 1.
Leader has crashed.
t=0 Node C: election timeout fires (t_elapsed = 223 ms)
Node C -> Candidate, Term = 2
Node C: votes for self (1/5)
t=1ms Node C -> Node A: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
Node C -> Node B: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
Node C -> Node D: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
Node C -> Node E: RequestVote(term=2, lastLogIdx=8, lastLogTerm=1)
t=2ms Node A: term 2 > my term 1. Update term.
C's log (idx=8, term=1) >= my log (idx=8, term=1).
Have not voted in term 2.
-> Grant vote to C. (C: 2/5)
t=2ms Node B: same evaluation.
-> Grant vote to C. (C: 3/5, MAJORITY)
t=3ms Node C: received 3 votes (self + A + B) out of 5.
-> Becomes Leader for Term 2.
-> Sends heartbeat (empty AppendEntries) to all nodes.
t=4ms Node D: receives heartbeat from C with term=2.
-> Recognizes C as leader, remains Follower, updates term.
t=5ms Node E: receives heartbeat from C with term=2.
-> Recognizes C as leader, remains Follower, updates term.
RequestVote RPC contents:
RequestVote {
term: uint64 // candidate's current term
candidateId: string // candidate requesting the vote
lastLogIndex: uint64 // index of candidate's last log entry
lastLogTerm: uint64 // term of candidate's last log entry
}
Vote is granted if and only if:
1. candidate's term >= voter's current term
2. voter has not already voted in this term
(or voted for this same candidate)
3. candidate's log is at least as up-to-date as voter's log
(compared by lastLogTerm first, then lastLogIndex)
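The three rules above can be sketched as a single check. The `RequestVote` fields mirror the struct shown; the `VoterState` type is an assumption for illustration:

```go
package main

import "fmt"

// VoterState is the illustrative per-node state a voter consults.
type VoterState struct {
	currentTerm  uint64
	votedFor     string // "" if no vote cast this term
	lastLogIndex uint64
	lastLogTerm  uint64
}

type RequestVote struct {
	term         uint64
	candidateId  string
	lastLogIndex uint64
	lastLogTerm  uint64
}

func grantVote(v *VoterState, req RequestVote) bool {
	if req.term < v.currentTerm {
		return false // rule 1: stale candidate
	}
	if req.term > v.currentTerm {
		v.currentTerm = req.term // higher term: adopt it, clear vote
		v.votedFor = ""
	}
	if v.votedFor != "" && v.votedFor != req.candidateId {
		return false // rule 2: already voted for someone else this term
	}
	// rule 3: candidate's log must be at least as up-to-date,
	// compared by last term first, then last index.
	upToDate := req.lastLogTerm > v.lastLogTerm ||
		(req.lastLogTerm == v.lastLogTerm && req.lastLogIndex >= v.lastLogIndex)
	if !upToDate {
		return false
	}
	v.votedFor = req.candidateId
	return true
}

func main() {
	v := &VoterState{currentTerm: 1, lastLogIndex: 8, lastLogTerm: 1}
	fmt.Println(grantVote(v, RequestVote{term: 2, candidateId: "C", lastLogIndex: 8, lastLogTerm: 1})) // true
	fmt.Println(grantVote(v, RequestVote{term: 2, candidateId: "D", lastLogIndex: 9, lastLogTerm: 1})) // false: already voted for C
}
```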
Election timeout constraints:
The election timeout must be significantly larger than the broadcast round-trip time. A common guideline:
broadcastTime << electionTimeout << MTBF
Where:
broadcastTime = network round-trip time (~0.5-20 ms within a datacenter)
electionTimeout = 150-300 ms (typical)
MTBF = mean time between failures for a single node
If broadcastTime approaches electionTimeout, heartbeats may not arrive
before timeouts expire, causing spurious elections and leader instability.
Log Replication
The leader appends client requests to its local log and replicates entries to followers via AppendEntries RPCs. An entry is committed once replicated to a majority. Only committed entries are applied to the state machine.
Log Replication, Step by Step:
1. Client sends write request to Leader
Client --> Leader: "SET x = 42"
2. Leader appends entry to local log
Leader's Log:
+-------+-------+-------+-------+----------------+
| idx=1 | idx=2 | idx=3 | idx=4 | idx=5          |
| t=1   | t=1   | t=1   | t=2   | t=2            |
| SET   | SET   | DEL   | SET   | SET x=42  NEW  |
| a=1   | b=2   | a     | c=3   |                |
+-------+-------+-------+-------+----------------+
commitIndex = 4
3. Leader sends AppendEntries to all followers
AppendEntries {
term: 2
leaderId: "leader"
prevLogIndex: 4 // index of entry preceding new entries
prevLogTerm: 2 // term of entry at prevLogIndex
entries: [{idx:5, term:2, cmd:"SET x=42"}]
leaderCommit: 4 // leader's current commit index
}
4. Followers validate and append
Node B (log: [1,2,3,4]):
prevLogIndex=4, prevLogTerm=2 matches local entry at idx=4
-> Append entry 5. Send ACK.
Node C (log: [1,2,3,4]):
-> Same. Append entry 5. Send ACK.
Node D (log: [1,2,3]):
prevLogIndex=4: no entry at index 4
-> REJECT. Send NACK.
Node E (log: [1,2,3,4]):
-> Append entry 5. Send ACK.
5. Leader counts acknowledgments
ACKs from B, C, E + self = 4/5 (majority)
-> Advance commitIndex to 5.
-> Apply "SET x=42" to state machine.
-> Respond to client: success.
-> Include commitIndex=5 in next heartbeat so followers
advance their own commitIndex and apply the entry.
6. For Node D (behind):
Leader decrements nextIndex[D] from 5 to 4.
Retries AppendEntries with prevLogIndex=3.
On success, sends entries 4 and 5.
Node D catches up.
Consistency check mechanism:
The prevLogIndex and prevLogTerm fields in AppendEntries form an inductive consistency check. If a follower's log matches at prevLogIndex, the Log Matching Property guarantees all preceding entries also match.
When a follower rejects an AppendEntries due to a log mismatch, the leader decrements nextIndex for that follower and retries. This backtracking can be slow if the follower is many entries behind. The etcd implementation optimizes this by including a ConflictIndex and ConflictTerm in the rejection response, allowing the leader to skip directly to the divergence point.
Log Divergence and Repair:
Leader (Term 4): [1:t1] [2:t1] [3:t2] [4:t3] [5:t3] [6:t4] [7:t4]
Follower A: [1:t1] [2:t1] [3:t2] [4:t3] [5:t3]
Follower B: [1:t1] [2:t1] [3:t2] [4:t3]
Follower C: [1:t1] [2:t1] [3:t2] [4:t2] [5:t2]
^^^^ ^^^^
diverges from leader
Repairing Follower C:
Leader sends AppendEntries(prevLogIndex=6, ...)
C: no entry at 6. REJECT(conflictIndex=5).
Leader sends AppendEntries(prevLogIndex=4, prevLogTerm=3, ...)
C: entry at 4 has term=2, not term=3. REJECT(conflictIndex=4, conflictTerm=2).
Leader sends AppendEntries(prevLogIndex=3, prevLogTerm=2, entries=[4:t3, 5:t3, 6:t4, 7:t4])
C: entry at 3 has term=2. Match.
C: delete entries 4 and 5 (conflicting).
C: append entries 4,5,6,7 from leader.
Follower C after repair:
[1:t1] [2:t1] [3:t2] [4:t3] [5:t3] [6:t4] [7:t4]
Now matches leader.
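The rejection-with-hint behavior in the repair sequence above can be sketched from the follower's side. This follows the conflictIndex convention used in the diagram; the `Entry` and `AppendResult` types are illustrative, not etcd's actual API:

```go
package main

import "fmt"

type Entry struct {
	Index uint64
	Term  uint64
}

// AppendResult is the follower's reply to the consistency check.
type AppendResult struct {
	Success       bool
	ConflictIndex uint64 // hint telling the leader where to back up to
	ConflictTerm  uint64 // term of the conflicting entry (0 if log too short)
}

func checkAppend(log []Entry, prevLogIndex, prevLogTerm uint64) AppendResult {
	if prevLogIndex == 0 {
		return AppendResult{Success: true} // appending from the very start
	}
	if prevLogIndex > uint64(len(log)) {
		// Log too short: point the leader at our last index.
		return AppendResult{ConflictIndex: uint64(len(log))}
	}
	local := log[prevLogIndex-1]
	if local.Term != prevLogTerm {
		// Term mismatch: report the index and the conflicting term so
		// the leader can skip the divergent run instead of decrementing
		// nextIndex one entry at a time.
		return AppendResult{ConflictIndex: prevLogIndex, ConflictTerm: local.Term}
	}
	return AppendResult{Success: true}
}

func main() {
	// Follower C from the diagram: [1:t1] [2:t1] [3:t2] [4:t2] [5:t2]
	log := []Entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}, {5, 2}}
	fmt.Println(checkAppend(log, 6, 4)) // too short: conflictIndex=5
	fmt.Println(checkAppend(log, 4, 3)) // term mismatch at index 4
	fmt.Println(checkAppend(log, 3, 2)) // matching prefix: success
}
```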
Safety: The Election Restriction
The core safety property of Raft: if a log entry has been committed in a given term, that entry will be present in the logs of all leaders for all higher-numbered terms.
This is enforced by the election restriction. A voter rejects a RequestVote if the candidate's log is less up-to-date than its own. "Up-to-date" is determined by comparing the term and index of the last log entry.
Election Restriction Proof Sketch:
Given: Entry E at index 5, term 2 is committed
(replicated to nodes A, B, C in a 5-node cluster)
Claim: No future leader can lack entry E.
Nodes with E: {A, B, C} (3 nodes, a majority)
Any majority: (requires >= 3 nodes)
For any candidate to win election:
It needs votes from >= 3 nodes.
Any set of 3 nodes intersects {A, B, C} by at least 1 node.
That intersecting node has entry E and will reject
any candidate whose log does not contain E.
Therefore, every future leader must have entry E.
Formal basis: quorum intersection property.
Two majorities of a set of size N must overlap by
at least 1 element (since each has > N/2 elements).
+------------------------------------------+
|  All 5 nodes: {A,B,C,D,E}                |
|                                          |
|  Committed to:  {A, B, C}   (majority)   |
|        ∩                                 |
|  Any majority:  {?, ?, ?}   (>= 3)       |
|        = at least 1 common               |
|                                          |
|  That common node enforces the           |
|  election restriction.                   |
+------------------------------------------+
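The overlap arithmetic in the proof sketch can be checked directly; `minOverlap` is an illustrative helper, not production code:

```go
package main

import "fmt"

// minOverlap returns the guaranteed intersection size of any two
// majorities of an N-node cluster: each has n/2 + 1 members, so
// together they exceed N by at least 2*(n/2 + 1) - n nodes.
func minOverlap(n int) int {
	q := n/2 + 1 // majority size
	return 2*q - n
}

func main() {
	for _, n := range []int{3, 5, 7} {
		fmt.Printf("N=%d  majority=%d  min overlap=%d\n", n, n/2+1, minOverlap(n))
	}
}
```

For every odd cluster size the guaranteed overlap is exactly one node, which is all the election restriction needs.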
Leader Crash During Replication
When a leader crashes mid-replication, the following cases arise depending on how many followers received the entry:
Leader Crash Scenarios:
Scenario 1: Entry replicated to majority before crash
Leader (crashed): [1] [2] [3] [4*] (* = uncommitted)
Node B: [1] [2] [3] [4*]
Node C: [1] [2] [3] [4*]
Node D: [1] [2] [3]
Node E: [1] [2] [3]
Entry 4 reached B and C (majority with leader = 3/5).
Leader crashed before advancing commitIndex.
New election: B or C can become leader (up-to-date log).
New leader has entry 4. It will be re-replicated and committed.
Entry 4 is preserved.
Scenario 2: Entry replicated to minority before crash
Leader (crashed): [1] [2] [3] [4*]
Node B: [1] [2] [3] [4*]
Node C: [1] [2] [3]
Node D: [1] [2] [3]
Node E: [1] [2] [3]
Entry 4 reached only B (2/5 including leader, not majority).
Entry was never committed.
New election: C, D, or E could become leader.
If C becomes leader with a new entry 4 in term 3:
B's entry 4 (term 2) gets overwritten.
Uncommitted entry is lost. This is correct behavior,
as the client never received an acknowledgment.
Scenario 3: Leader crashes after committing but before responding
Leader committed entry 4, applied to state machine,
then crashed before sending response to client.
Client does not know if write succeeded.
Client must retry with an idempotency key.
New leader has the committed entry.
Write is durable. Retry is a no-op (with idempotency).
Linearizable Reads
Achieving linearizable reads in a Raft-based system requires more than simply reading from the leader's state machine. A leader that has been partitioned from the majority may not know it has been superseded.
Stale Read Problem:
Node A: leader, term 5
Network partition isolates A from {B, C, D, E}
+---------+           +---------+  +-----+  +-----+  +-----+
|    A    |   X X X   |    B    |  |  C  |  |  D  |  |  E  |
|         |           |         |  |     |  |     |  |     |
| term=5  |  no link  | elected |  |     |  |     |  |     |
| "I am   |           | leader  |  |     |  |     |  |     |
|  leader"|           | term=6  |  |     |  |     |  |     |
+---------+           +---------+  +-----+  +-----+  +-----+
Client reads from A. A returns its local state.
Meanwhile B (actual leader, term 6) has committed new writes.
Client received stale data. Linearizability violated.
Three approaches address this problem:
ReadIndex
Before serving a read, the leader confirms it still holds leadership by broadcasting a heartbeat and waiting for majority acknowledgment. This adds one network round trip per read (or per batch of reads).
// ReadIndex approach (used by etcd by default)
func (s *RaftServer) LinearizableRead(key string) ([]byte, error) {
// 1. Record current commit index as the read index
readIndex := s.raft.CommitIndex()
// 2. Confirm leadership by sending a round of heartbeats
if err := s.raft.ConfirmLeadership(); err != nil {
return nil, ErrNotLeader
}
// 3. Wait for state machine to catch up to readIndex
// (committed entries may not yet be applied)
if err := s.waitForApply(readIndex); err != nil {
return nil, err
}
// 4. Read from state machine
return s.stateMachine.Get(key), nil
}

Lease-Based Reads
The leader maintains a time-based lease. After receiving heartbeat acknowledgments from a majority, the leader knows that no election can succeed for at least the election timeout duration (since followers reset their election timers on heartbeat receipt). Reads can be served locally without additional RPCs as long as the lease is valid.
// Lease-based reads (used by CockroachDB, TiKV)
func (s *RaftServer) LeaseRead(key string) ([]byte, error) {
// Check if the leader lease is still valid
// Lease duration < election timeout
if time.Now().Before(s.leaseExpiry) {
return s.stateMachine.Get(key), nil
}
// Lease expired. Fall back to ReadIndex or step down.
return nil, ErrLeaseExpired
}

This approach depends on bounded clock skew between nodes. If a follower's clock runs fast, it may start an election before the leader's lease has expired from the leader's perspective.
Lease-Based Read Timing:
Leader sends heartbeat --> Followers reset election timer
   |                            |
   | lease_start = now()        | election_timeout starts
   | lease_end = now() +        |
   |   (election_timeout -      |
   |    max_clock_drift)        |
   |                            |
   |<------ lease valid ------->|           |
   |                            |           |<-- earliest possible election
   |                            |           |
   ---------------------------------------------------------> time
As long as max_clock_drift is correctly bounded,
no election can start while the lease is valid.
Follower Reads
For read-heavy workloads, directing all reads through the leader creates a bottleneck. Follower reads distribute read load across the cluster.
Follower Read Protocol:
Client --> Follower C: "Read key x"
Follower C --> Leader: "What is your current commit index?"
Leader --> Follower C: "commitIndex = 47"
Follower C: wait until local applied index >= 47
Follower C: read x from local state machine
Follower C --> Client: value of x
This provides linearizability at the point in time when the
leader responded with its commit index.
Performance benefit:
Read throughput scales with number of nodes.
Write throughput remains limited by the leader.
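The follower-read steps can be sketched with in-memory stand-ins for the leader RPC and the local state machine (all names here are illustrative, not etcd's or CockroachDB's API):

```go
package main

import (
	"errors"
	"fmt"
)

// Leader stands in for the one RPC the protocol needs: asking the
// leader for its current commit index.
type Leader struct{ commitIndex uint64 }

func (l *Leader) CommitIndex() uint64 { return l.commitIndex }

// Follower holds the locally applied index and a toy state machine.
type Follower struct {
	appliedIndex uint64
	kv           map[string]string
}

var ErrNotCaughtUp = errors.New("applied index behind leader commit index")

// FollowerRead queries the leader's commit index (one RTT), then
// serves the read locally once the follower has applied that far.
func (f *Follower) FollowerRead(l *Leader, key string) (string, error) {
	readIndex := l.CommitIndex()
	if f.appliedIndex < readIndex {
		// A real implementation would block until apply catches up;
		// this sketch just reports the lag.
		return "", ErrNotCaughtUp
	}
	return f.kv[key], nil
}

func main() {
	l := &Leader{commitIndex: 47}
	f := &Follower{appliedIndex: 47, kv: map[string]string{"x": "42"}}
	v, err := f.FollowerRead(l, "x")
	fmt.Println(v, err) // served locally, linearizable as of index 47

	f.appliedIndex = 40
	_, err = f.FollowerRead(l, "x")
	fmt.Println(err) // lagging follower must wait, not serve stale data
}
```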
Read Approach Comparison
| Approach | Extra RTTs per Read | Clock Dependency | Implementation Complexity |
|---|---|---|---|
| ReadIndex | 1 (heartbeat round) | None | Low |
| Lease-based | 0 | Requires bounded clock skew | Medium |
| Follower read | 1 (query leader) | None | Medium |
| Quorum read | 1 (read from majority) | None | High |
Google Spanner: TrueTime and External Consistency
Spanner provides external consistency: if transaction T1 commits before transaction T2 starts (in absolute wall-clock time), then T1's commit timestamp is less than T2's commit timestamp. This property holds globally across all data centers.
TrueTime
Spanner's TrueTime API exposes clock uncertainty explicitly.
TrueTime API:
TT.now() -> TTinterval { earliest, latest }
Guarantee: earliest <= absolute_time <= latest
The uncertainty interval (epsilon = latest - earliest)
depends on time since last synchronization with a
TrueTime master.
TrueTime masters contain:
- GPS receivers (synchronized to GPS time)
- Atomic clocks (free-running, independent of GPS)
GPS and atomic clocks have uncorrelated failure modes.
Typical epsilon values:
Immediately after sync: ~1 ms
Between syncs: 1-7 ms (sawtooth pattern)
Average: ~4 ms
Comparison with other time sources:
+---------------------+----------------------+
| Time Source         | Typical Uncertainty  |
+---------------------+----------------------+
| TrueTime (Spanner)  | 1-7 ms               |
| NTP (well-tuned)    | 1-10 ms              |
| NTP (default)       | 50-500 ms            |
| Unsynced clocks     | seconds to minutes   |
+---------------------+----------------------+
Commit Wait Protocol
Spanner's commit protocol uses TrueTime to assign globally meaningful timestamps.
Spanner Commit Protocol:
1. Transaction T executes (reads, writes, lock acquisition)
2. Coordinator acquires Paxos-replicated locks
3. Coordinator picks commit timestamp:
s = TT.now().latest
(guarantees s >= absolute_time at this moment)
4. COMMIT WAIT:
Wait until TT.now().earliest > s
After this wait, absolute_time > s.
Any future transaction starting after this point
will observe TT.now().earliest > s, and will
therefore pick a timestamp > s.
5. Release locks and apply the commit.
Timeline:
--------------------------------------------------> time
| | |
T picks commit-wait T releases
s = t1 interval locks
(~7 ms avg) commit visible
|
Any T2 starting here
will get timestamp > t1
The commit-wait duration equals the TrueTime uncertainty
at the time of commit. Average: ~7 ms. This is the cost
of external consistency.
Lock-Free Snapshot Reads
Because every committed write has a globally meaningful timestamp, Spanner supports snapshot reads at any past timestamp without locking or coordination.
Snapshot Read:
Client: "Read accounts A, B, C as of timestamp T=1000"
Any Spanner replica (need not be leader):
1. Check: is my safe_time >= 1000?
(safe_time = timestamp up to which all writes are applied)
2. If yes: read A, B, C from local versioned storage at T=1000.
No locks acquired. No coordination with leader.
3. If no: wait for replication to advance safe_time past 1000.
This enables:
- Read-only transactions with zero lock contention
- Consistent global snapshots for analytics
- Time-travel queries (read data as it was N seconds ago)
Trade-off: snapshot data is at least epsilon old
(where epsilon is the TrueTime uncertainty).
CockroachDB: Raft-Based Distributed SQL
CockroachDB uses Raft for replication within ranges (contiguous spans of the key space). Each range (default 512 MB) is a Raft group with typically 3 or 5 replicas.
CockroachDB Architecture:
Key Space: [min_key ------------------------------ max_key]
Divided into ranges:
+---------+  +---------+  +---------+  +---------+
| Range 1 |  | Range 2 |  | Range 3 |  | Range 4 |
|  [a-f)  |  |  [f-m)  |  |  [m-s)  |  |  [s-z)  |
+---------+  +---------+  +---------+  +---------+
     |            |            |            |
     |   Each range is a separate Raft group|
     v            v            v            v
+---------+  +---------+  +---------+  +---------+
| Node 1  |  | Node 2  |  | Node 1  |  | Node 3  |
| Node 2  |  | Node 3  |  | Node 3  |  | Node 1  |  (replicas)
| Node 3  |  | Node 1  |  | Node 2  |  | Node 2  |
+---------+  +---------+  +---------+  +---------+
Each range has its own Raft leader.
Leaders are distributed across nodes for load balancing.
Transactions spanning multiple ranges use parallel consensus
with a two-phase commit (2PC) protocol across range leaders.
CockroachDB uses a hybrid logical clock (HLC) for timestamp ordering. HLCs combine a physical clock component with a logical counter, providing causality tracking without requiring specialized hardware like Spanner's TrueTime.
Hybrid Logical Clock (HLC):
HLC = (physical_time, logical_counter)
Comparison: (pt1, lc1) < (pt2, lc2)
if pt1 < pt2, or (pt1 == pt2 and lc1 < lc2)
On local event:
pt = max(hlc.pt, wall_clock)
if pt == hlc.pt: lc = hlc.lc + 1
else: lc = 0
hlc = (pt, lc)
On receiving message with timestamp (msg_pt, msg_lc):
pt = max(hlc.pt, msg_pt, wall_clock)
if pt == hlc.pt == msg_pt: lc = max(hlc.lc, msg_lc) + 1
elif pt == hlc.pt: lc = hlc.lc + 1
elif pt == msg_pt: lc = msg_lc + 1
else: lc = 0
hlc = (pt, lc)
Trade-off vs. TrueTime:
HLC: no special hardware, but cannot bound uncertainty
without clock skew assumptions
TrueTime: requires GPS/atomic clocks, but provides
bounded uncertainty with formal guarantees
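The update rules above translate directly to Go. Physical time is passed in explicitly so the logic is deterministic and testable; this is a sketch of the HLC rules as stated, not CockroachDB's implementation:

```go
package main

import "fmt"

// HLC is a hybrid logical clock: a physical component plus a
// logical counter for ordering events within the same tick.
type HLC struct {
	pt int64 // physical component
	lc int64 // logical counter
}

// LocalEvent applies the "on local event" rule.
func (h *HLC) LocalEvent(wallClock int64) {
	pt := max64(h.pt, wallClock)
	if pt == h.pt {
		h.lc++ // same physical tick: bump the counter
	} else {
		h.lc = 0 // physical clock advanced: reset the counter
	}
	h.pt = pt
}

// Receive applies the "on receiving message" rule, which folds the
// sender's timestamp into the local clock to preserve causality.
func (h *HLC) Receive(msgPt, msgLc, wallClock int64) {
	pt := max64(max64(h.pt, msgPt), wallClock)
	switch {
	case pt == h.pt && pt == msgPt:
		h.lc = max64(h.lc, msgLc) + 1
	case pt == h.pt:
		h.lc++
	case pt == msgPt:
		h.lc = msgLc + 1
	default:
		h.lc = 0
	}
	h.pt = pt
}

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	h := &HLC{}
	h.LocalEvent(100)      // wall clock ahead: (100, 0)
	h.LocalEvent(100)      // same tick: (100, 1)
	h.Receive(105, 3, 100) // message from a faster node: (105, 4)
	fmt.Println(h.pt, h.lc)
}
```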
etcd: Raft Reference Implementation
etcd is widely used for service discovery, configuration management, and distributed locking. It implements Raft using the etcd/raft library in Go, which provides a clean separation between the Raft state machine and the application layer.
Key implementation details:
etcd Raft Implementation Characteristics:
Write path:
Client -> gRPC -> Leader -> Raft proposal
-> WAL write (fsync) -> Replicate to followers
-> Followers WAL write (fsync) -> ACK to leader
-> Commit -> Apply to boltdb -> Respond to client
Performance characteristics (3-node, NVMe SSD, same DC):
Sequential writes: ~10,000 ops/sec (no batching)
Batched writes: ~50,000-100,000 ops/sec
Linearizable reads: ~30,000 ops/sec (ReadIndex)
Serializable reads: ~100,000+ ops/sec (local follower)
Default configuration:
heartbeat-interval: 100 ms
election-timeout: 1000 ms (10x heartbeat)
snapshot-count: 100,000 (entries before snapshot)
max-request-bytes: 1.5 MB
quota-backend-bytes: 2 GB (default storage limit)
WAL (Write-Ahead Log):
Format: sequential binary records
Sync policy: fsync after every batch
Segment size: 64 MB
Location: dedicated disk recommended for latency isolation
Membership Changes
Adding or removing nodes from a consensus cluster is a source of subtle bugs. The core risk: if different nodes have different views of cluster membership, they may disagree on what constitutes a majority, potentially allowing two leaders to be elected simultaneously.
Joint Consensus (Raft Dissertation)
The Raft dissertation describes a two-phase approach: the cluster transitions through a joint configuration C_old,new that requires agreement from majorities of both the old and new configurations.
Joint Consensus (two-phase membership change):
Phase 1: Transition to joint configuration C_{old,new}
Leader replicates C_{old,new} entry.
Decisions require majorities from BOTH old AND new configs.
Old config: {A, B, C} majority = 2
New config: {A, B, C, D, E} majority = 3
Joint: requires 2 from {A,B,C} AND 3 from {A,B,C,D,E}
Phase 2: Transition to new configuration C_new
Once C_{old,new} is committed, leader replicates C_new entry.
After C_new is committed, old-config-only nodes can be shut down.
Timeline:
-----------------------------------------------------> time
| | |
C_old only C_{old,new} C_new only
majority=2 requires both majority=3
majorities
Single-Node Changes (Practical Approach)
Most production systems, including etcd, use single-node membership changes. Adding or removing one node at a time guarantees that any two majorities (old and new) overlap, preventing split-brain.
Single-Node Change Safety:
{A, B, C} -> {A, B, C, D}
majority = 2 majority = 3
Any majority of {A,B,C}: {A,B}, {A,C}, {B,C}
Any majority of {A,B,C,D}: {A,B,C}, {A,B,D}, {A,C,D}, {B,C,D}
Every pair intersects by at least 1 node. No split-brain possible.
To change from 3 nodes to 5 nodes:
Step 1: {A,B,C} -> {A,B,C,D} (add D)
Step 2: {A,B,C,D} -> {A,B,C,D,E} (add E)
Each step is individually safe.
Log Compaction and Snapshotting
The Raft log grows without bound unless compacted. Log compaction replaces a prefix of the log with a snapshot of the state machine at that point.
Log Compaction:
Before:
Log: [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
Applied: * * * * * * *
Committed: * * * * * * * * *
^
snapshot point
After snapshot at index 7:
Snapshot: { state_machine_state_at_index_7, last_included_index=7, last_included_term=2 }
Log: [8] [9] [10]
Entries 1-7 are discarded. The snapshot captures their
cumulative effect on the state machine.
InstallSnapshot RPC (for slow followers or new nodes):
Leader --> Slow Follower: InstallSnapshot {
term: current leader term
leaderId: leader node ID
lastIncludedIndex: 7
lastIncludedTerm: 2
data: [serialized state machine snapshot]
}
Follower:
1. Discard entire log (or entries up to lastIncludedIndex)
2. Load snapshot into state machine
3. Resume normal AppendEntries replication from index 8
Snapshot implementation strategies:
| Strategy | Used By | Mechanism | Trade-off |
|---|---|---|---|
| fork() + serialize | Redis, some etcd configs | OS copy-on-write via fork | Simple, but memory spike risk |
| LSM-tree checkpoint | CockroachDB (RocksDB/Pebble) | Native point-in-time snapshot | Efficient, no extra memory |
| Incremental snapshot | TiKV | Stream changes since last snapshot | Lower I/O, more complex |
Disk I/O and Write Performance
Consensus protocols require durable writes (fsync) before acknowledging entries. Disk I/O latency directly bounds throughput.
Write Latency Breakdown (5-node cluster, same datacenter):
Component NVMe SSD SATA SSD HDD
─────────────────────────────────────────────────────────────
Network (client to leader) 0.5 ms 0.5 ms 0.5 ms
Leader WAL fsync 0.05 ms 0.5 ms 5 ms
Network (leader to followers) 0.5 ms 0.5 ms 0.5 ms
Follower WAL fsync 0.05 ms 0.5 ms 5 ms
Network (follower ACK) 0.5 ms 0.5 ms 0.5 ms
Apply + respond 0.1 ms 0.1 ms 0.1 ms
─────────────────────────────────────────────────────────────
Total (single entry) ~1.7 ms ~2.6 ms ~11.6 ms
Theoretical max ops/sec ~590 ~385 ~86
With batching (100 entries per fsync):
Amortized per entry ~0.17 ms ~0.26 ms ~1.16 ms
Theoretical max ops/sec ~59,000 ~38,500 ~8,600
Cross-datacenter (leader to remote follower one-way latency = 40 ms, i.e., RTT = 80 ms):
Total (single entry) ~81 ms ~82 ms ~91 ms
Batching is critical for production throughput.
etcd batching behavior:
etcd batches multiple proposals into a single Raft round when they arrive within the same event loop tick (~1 ms). Under high load, this naturally groups entries, reducing the number of fsyncs per second.
Monitoring Consensus Clusters
Key metrics for operating a Raft-based system:
Critical Monitoring Metrics:
1. Leader elections per hour
Normal: 0
Warning: > 0 (investigate cause)
Critical: Continuous flapping
Common causes of unexpected elections:
- Disk latency spikes (fsync exceeding heartbeat interval)
- Network partitions or packet loss
- CPU saturation delaying heartbeat processing
- GC pauses exceeding election timeout (JVM-based systems)
2. Raft proposal apply latency (p99)
Normal: < 10 ms
Warning: > 50 ms (state machine cannot keep up)
Critical: > 500 ms (risk of OOM from log growth)
3. Entries behind leader (per follower)
Normal: 0-10
Warning: > 1000 (follower falling behind)
Critical: > snapshot-count (will require snapshot transfer)
4. WAL fsync duration (p99)
Normal: < 1 ms (SSD)
Warning: > 10 ms
Critical: > heartbeat-interval (will cause elections)
5. Proposal failure rate
Normal: 0%
Warning: > 0% (leader may be losing quorum)
6. Network RTT between nodes (p99)
Requirement: RTT << election-timeout
Rule of thumb: RTT < election-timeout / 10
System Comparison
| Property | etcd | CockroachDB | Spanner | ZooKeeper | TiKV |
|---|---|---|---|---|---|
| Consensus | Raft | Raft (per-range) | Paxos | Zab | Raft (per-region) |
| Data model | Key-value | SQL (relational) | SQL (relational) | Hierarchical (znodes) | Key-value + txn |
| Consistency | Linearizable | Serializable (SSI) | External consistency | Linearizable | Linearizable |
| Clock mechanism | None | HLC | TrueTime (GPS/atomic) | None | HLC or TSO |
| Replication unit | Entire store | Range (512 MB) | Split (~8 GB) | Entire store | Region (96 MB) |
| Typical write latency | 10-20 ms | 20-50 ms | 10-15 ms (commit-wait) | 10-30 ms | 5-15 ms |
| Max data size | ~8 GB (recommended) | Terabytes+ | Petabytes+ | Hundreds of MB | Terabytes+ |
| Primary use case | Coordination, config | OLTP, multi-region SQL | Global OLTP | Coordination, locking | Storage engine for TiDB |
Failure Mode Reference
Common Failure Modes and System Behavior:
Failure: Single follower crashes
Impact: None if quorum maintained
Recovery: Node restarts, replays WAL, catches up via AppendEntries
Duration: Seconds to minutes depending on log gap
Failure: Leader crashes
Impact: Brief unavailability during election (~election-timeout)
Recovery: New leader elected, uncommitted entries may be lost
Duration: 150-300 ms (typical election timeout) + election round
Failure: Minority partition
Impact: Partitioned nodes cannot serve writes
Recovery: Automatic when partition heals
Duration: Duration of partition
Failure: Majority partition (quorum lost)
Impact: Entire cluster unavailable for writes
Recovery: Requires partition heal or manual intervention
Duration: Until quorum restored
Failure: Disk full on leader
Impact: Leader cannot append to WAL, cannot send heartbeats
Recovery: Free disk space or replace node
Risk: Leader steps down, election occurs
Failure: Slow disk on one node
Impact: That node falls behind. If it is the leader,
heartbeats are delayed, causing elections.
Cluster may thrash between leaders.
Recovery: Replace disk or remove node from cluster
Failure: Clock skew (lease-based reads)
Impact: Stale reads possible if skew exceeds lease margin
Recovery: Fix NTP configuration, restart affected nodes
Risk: Silent data staleness (no error returned to client)
Failure: Full network partition (split-brain)
Impact: CP systems: minority side unavailable
AP systems: divergent writes, conflicts on heal
Recovery: CP: automatic. AP: conflict resolution required.
Raft Implementation in Go (Simplified)
The following simplified implementation illustrates the core Raft data structures and RPCs:
package raft

import (
	"sync"
	"time"
)
type NodeState int
const (
Follower NodeState = iota
Candidate
Leader
)
type LogEntry struct {
Term uint64
Index uint64
Command []byte
}
type RaftNode struct {
mu sync.Mutex
id string
state NodeState
currentTerm uint64
votedFor string
log []LogEntry
commitIndex uint64
lastApplied uint64
// Leader state (reinitialized after election)
nextIndex map[string]uint64 // for each follower: next log index to send
matchIndex map[string]uint64 // for each follower: highest known replicated index
peers []string
electionTimeout time.Duration
heartbeatTicker *time.Ticker
electionTimer *time.Timer
}
type AppendEntriesRequest struct {
Term uint64
LeaderID string
PrevLogIndex uint64
PrevLogTerm uint64
Entries []LogEntry
LeaderCommit uint64
}
type AppendEntriesResponse struct {
Term uint64
Success bool
ConflictIndex uint64 // optimization: hint for faster log repair
ConflictTerm uint64
}
type RequestVoteRequest struct {
Term uint64
CandidateID string
LastLogIndex uint64
LastLogTerm uint64
}
type RequestVoteResponse struct {
Term uint64
VoteGranted bool
}
// AppendEntries handles incoming AppendEntries RPCs (follower side)
func (n *RaftNode) AppendEntries(req AppendEntriesRequest, resp *AppendEntriesResponse) {
n.mu.Lock()
defer n.mu.Unlock()
resp.Term = n.currentTerm
resp.Success = false
// Reply false if term < currentTerm
if req.Term < n.currentTerm {
return
}
// If RPC term is higher, update term and convert to follower
if req.Term > n.currentTerm {
n.currentTerm = req.Term
n.state = Follower
n.votedFor = ""
}
// Reset election timer (leader is alive)
n.resetElectionTimer()
// Check if log contains entry at prevLogIndex with prevLogTerm
if req.PrevLogIndex > 0 {
if req.PrevLogIndex > uint64(len(n.log)) {
resp.ConflictIndex = uint64(len(n.log)) + 1
return
}
if n.log[req.PrevLogIndex-1].Term != req.PrevLogTerm {
resp.ConflictTerm = n.log[req.PrevLogIndex-1].Term
// Find first index of conflicting term; default to 1 in case the
// term extends all the way to the start of the log
resp.ConflictIndex = 1
for i := req.PrevLogIndex - 1; i > 0; i-- {
if n.log[i-1].Term != resp.ConflictTerm {
resp.ConflictIndex = i + 1
break
}
}
return
}
}
// Append new entries, removing any conflicting entries
for i, entry := range req.Entries {
idx := req.PrevLogIndex + uint64(i) + 1
if idx <= uint64(len(n.log)) {
if n.log[idx-1].Term != entry.Term {
n.log = n.log[:idx-1] // truncate conflicting entries
n.log = append(n.log, req.Entries[i:]...)
break
}
} else {
n.log = append(n.log, req.Entries[i:]...)
break
}
}
// Update commit index
if req.LeaderCommit > n.commitIndex {
lastNewIndex := req.PrevLogIndex + uint64(len(req.Entries))
if req.LeaderCommit < lastNewIndex {
n.commitIndex = req.LeaderCommit
} else {
n.commitIndex = lastNewIndex
}
}
resp.Success = true
}
References
Relevant papers and systems documentation:
- Lamport, L. "The Part-Time Parliament." ACM TOCS, 1998.
- Lamport, L. "Paxos Made Simple." ACM SIGACT News, 2001.
- Ongaro, D. and Ousterhout, J. "In Search of an Understandable Consensus Algorithm." USENIX ATC, 2014.
- Chandra, T., Griesemer, R., and Redstone, J. "Paxos Made Live: An Engineering Perspective." PODC, 2007.
- Corbett, J. et al. "Spanner: Google's Globally-Distributed Database." OSDI, 2012.
- Taft, R. et al. "CockroachDB: The Resilient Geo-Distributed SQL Database." SIGMOD, 2020.
- Gilbert, S. and Lynch, N. "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services." ACM SIGACT News, 2002.
- Abadi, D. "Consistency Tradeoffs in Modern Distributed Database System Design." IEEE Computer, 2012. (PACELC)
- Kingsbury, K. Jepsen: Distributed Systems Safety Analysis. https://jepsen.io/