Zookeeper数据一致性

在分布式的环境中,存在多个值,需要从中选定出一个值,达成共识

Paxos

Prepare 阶段

proposer 在提出议案 N 之前,N 是该议案的编号,需要向至少多数的 acceptor 发送 Prepare(N)请求

acceptor 收到 Prepare(N)请求后

  1. 如果之前已经接受了其他议案,那么返回接受过的议案。
  2. 如果之前收到过Prepare(M)请求,并且 M > N,那么可以忽略此次请求,也可以返回一个错误
  3. 如果之前收到过Prepare(M)请求,并且 M < N,那么 acceptor 会保证以后不会响应议案编号小于 N 的任何请求,并且返回成功响应
Accept 阶段

proposer 在收到多数 acceptor 的成功响应后,,如果这些acceptor中有返回议案,那么就从中选出那个编号最大的议案的值,作为接下来要提

交的.如果没有返回议案,那么将本身的议案作为提交议案

在确定好提交议案后,proposer 会向至少多数的 acceptor 发起 ACCEPT 请求,提交议案.注意下这里的多数 acceptor 可以不和 prepare 阶

段的多数 acceptor 相同

acceptor 当收到 ACCEPT 请求后,会检查议案编号.因为acceptor 在 prepare 阶段,已经承诺过不会议案编号小于 N 的请求,如果 ACCEPT

请求的议案编号小于 N,那么就会拒绝接受此议案.否则,就会接受该议案

当 proposer 收到多数 acceptor 的成功接收议案的响应后,就会认为该议案被选定了

Learn 阶段

每当 acceptor 接受了一个议案,就会立即通知 learner。learner 会记录每个 acceptor 接收的议案,如果一个议案被多数 acceptor 接受了

那么就决定选定该议案.(假设有两个 proposer , proposer 2的议案被接收但是 proposer 1 不能保证 proposer 2 能够正常完成提交议

案,因为 proposer 2 有可能中途会挂掉,所以 proposer 1 只能继续提交议案,直到确认有一个议案被选定)

每个议案的编号都是唯一的,但是存在着多个 proposer,如何能保证它们提出的议案编号都不相同

质数算法:每个 proposer 对应着唯一的质数,每个新增议案,就是该质数的倍数.比如有两个 proposer 对应着 2,3 两个质数,第一个

proposer 提出第 i 个议案时,它的编号时 2 * i

等差算法:假设有 n 个 proposer,那么第 m 个 proposer第 i 次新增的议案编号是 m + i * n

Zab

Zab 协议保证了集群在任何时刻,要么只有一个 leader,要么没有 leader.这两种情况称为广播模式和恢复模式:

  1. 处于广播模式的集群,能够正常对外提供服务,客户端可以正常的读写
  2. 处于恢复模式的集群,不能对外提供服务,这时节点有可能处于选举过程,同步过程

广播模式比较简单,就是和二阶段提交类型,但是允许不超过半数的服务挂掉.客户端的写入请求都会转发给 leader,然后 leader 将消息发送给 follower,如果超过半数的follower响应成功,那么就提交 commit 操作,将数据持久化到磁盘

消息广播过程:

  1. 客户端发起写请求
  2. Leader将客户端请求信息转化为事务Proposal,同时为每个Proposal分配一个事务ID(Zxid)
  3. Leader为每个Follower单独分配一个FIFO的队列,将需要广播的Proposal依次放入到队列中
  4. Follower接收到Proposal后,首先将其以事务日志的方式写入到本地磁盘中,写入成功后给Leader反馈一个ACK响应
  5. Leader接收到半数以上Follower的ACK响应后,即认为消息发送成功,可以发送Commit消息
  6. Leader向所有Follower广播Commit消息,同时自身也会完成事务提交.Follower接收到Commit消息后也会完成事务的提交

崩溃恢复过程:

恢复模式有一点复杂,它会从集群中选举出一个节点作为 leader,这个节点需要具有最新的数据.而且这个 leader 需要被过半节点认同.我们

回想下 zookeeper 的写操作需要过半的节点完成才能认为成功,而选举也需要过半的节点都同意,那么两个集合之间必定有交叉,就可以保证

选举出来的是有最新数据的节点

加载过程: zookeeper 的节点会在本地磁盘持久化数据,启动时会加载这些本地数据,这些数据都是根据事务 id 顺序存储的,这样就可以根据

最大的事务 id 来判断数据的新旧

选举过程: 各个节点相同通信,选举出一个共同的 leader 节点.这个 leader 节点的选票必须过半,并且有着最新的数据

同步过程:当成为 follower 节点后,需要向 leader 节点同步数据,因为 follower 节点的数据可能落后于leader

处理请求:当同步过程完成后,集群就可以对外提供读写服务了

Zab协议规定了 如果⼀个事务Proposal在⼀台机器上被处理成功,那么应该在所有的机器上都被处理成功,哪怕机器出现故障崩溃. 针对这些情况ZAB协议需要保证以下条件

  1. 已经在Leader服务器上提交的事务最终被所有服务器都提交

假设⼀个事务在 Leader 服务器上被提交了,并且已经得到过半 Folower 服务器的Ack反馈,但是在它将Commit消息发送给所有Follower机器之前,Leader服务器挂了

  1. 丢弃只在Leader服务器上被提出(未提交)的事务

假设初始的 Leader 服务器 Server1 在提出了⼀个事务Proposal3 之后就崩溃退出了,从⽽导致集群中的其他服务器都没有收到这个事务Proposal3.于是.当 Server1 恢复过来再次加入到集群中的时候.Zab 协议需要确保丢弃Proposal3这个事务

综上所述,ZAB的选举出来的Leader必须满足以下条件:

能够确保提交已经被 Leader 提交的事务 Proposal,同时丢弃已经被跳过的事务 Proposal.即:

  1. 新选举出来的 Leader 不能包含未提交的 Proposal.
  2. 新选举的 Leader 节点中含有最大的 zxid
FastLeaderElection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class FastLeaderElection implements Election {
AtomicLong logicalclock = new AtomicLong(); // 选举轮数

// 自身节点的投票信息
long proposedLeader; // 哪个节点为leader
long proposedZxid; // 选举的leader节点的最大事务id
long proposedEpoch; // 选举的leader节点的最大epoch

public Vote lookForLeader() throws InterruptedException {
// 存储着来自同选举轮数,每个节点对应的投票,投票里包含了选举哪个节点作为leader
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
// 存储着来自leader或者follower的投票
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
// 每次选举都会增加选举轮数
logicalclock.incrementAndGet();
// 设置自身的投票信息
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
// 发送投票给其余的节点
sendNotifications();

// 循环,一直到选出leader为止
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// 从队列里获取其余节点的投票信息
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
// 根据发送投票的节点的状态,做不同的处理
switch (n.state) {
case LOOKING:
// 发送节点的状态为LOOKING
// 先比较选举轮数,再比较epoch和事务id
if (n.electionEpoch > logicalclock.get()) {
// 如果自己的选举轮数落后,更新自己的选举轮数
logicalclock.set(n.electionEpoch);
// 还需要清空旧的投票信息,因为这些都已经过时了
recvset.clear();
// 比较谁更适合当选leader,如果是别的节点,那么就更新自己的投票信息
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// 需要重新发送投票,因为之前的已经过时了
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
// 如果发送投票的节点,选举轮数落后,那么就不用理睬
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
// 如果别的节点更适合当选leader,那么就更新自己的投票信息,并且重新发送
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

// 记录投票信息
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

// 如果有超过半数的投票一致,那么有可能新的leader被选举出来了
if (voteSet.hasAllQuorums()) {
// 检测队列里的剩余投票请求,并且直到等待finalizeWait时间,仍然没有新的请求
// 如果有别的节点更适合当选leader,那么就跳到循环开始,来处理投票信息
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// 如果仍然没有新请求,那么就认为leader选定出来了
// 根据比较节点id,判断是follower角色还是leader角色
if (n == null) {
// 这里会更新该节点的状态为follower或者leader
setPeerState(proposedLeader, voteSet);
// 返回最终的选票
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
return endVote;
}
}
break;

case FOLLOWING:
case LEADING:
if(n.electionEpoch == logicalclock.get()){
// 在同一选举轮数中,已经有leader产生并且开始运行了
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
// 这里会更新该节点的状态为follower或者leader
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
return endVote;
}
}
// 这里不用区分选举轮数,因为follower和leader角色已经稳定运行了
outofelection.put(n.sid, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state));
// 如果多数节点的投票相同,那么检查该leader节点是否处于leader状态
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
// 设置选举轮数
logicalclock.set(n.electionEpoch);
// 这里会更新该节点的状态为follower或者leader
setPeerState(n.leader, voteSet);
// 返回最终的选票
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
return endVote;
}
break;
}
}
}

//问题:

1.leader事务什么时候提交,等所有的follower事务提交完成还是发送commit之后还是等一半follower提交事务之后再提交leader的事务

2.leader提交事务之后会通知客户端吗

3.如果leader发送commit给follower那时,follower挂了怎么办?