lab3B实验中主要实现的功能是raft的日志复制功能,建议多看几遍关于raft日志复制功能的过程。
原文pdf:http://nil.csail.mit.edu/6.5840/2024/papers/raft-extended.pdf
源码地址:https://github.com/liusir521/mit6.5840
实验要求
实现领导者和追随者代码以追加新的日志条目,以便go test -run 3B测试通过。
实验中的要求看似就一句话并不多,但是我们需要考虑很多细节上的东西,以及针对日志不同步的情况的处理等等。个人感觉3B难就难在细节太多,不是一下子就可以想全面的,所以这里才说需要多看几遍raft的日志复制功能的讲解。
我之前的关于分布式共识算法中也有关于raft日志复制相关的描述,这里再粘贴一下:
Raft 算法中,领导者通过广播消息(AppendEntries RPC)将日志条目复制到所有跟随者。AppendEntries RPC 的示例如下:
1
2
3
4
5
6
7
8
9
10
|
{
"term": 5, // 领导者的任期号
"leaderId": "leader-123",
"prevLogIndex": 8, // 前一日志条目的索引
"prevLogTerm": 4, // 前一日志条目的任期
"entries": [
{ "index": 9, "term": 5, "command": "set x=4" }, // 要复制的日志条目
],
"leaderCommit": 7// Leader 的“已提交”状态的日志条目索引号
}
|
当 Raft 集群收到客户端请求(例如 set x=4)时,日志复制的过程如下:

领导者向客户端返回结果,并不意味着日志复制过程已完全结束,跟随者尚不清楚日志条目是否已被大多数节点确认。Raft 的设计通过心跳或后续日志复制请求中携带更新的提交索引(leaderCommit),通知跟随者提交日志。此机制将“达成共识的过程”优化为一个阶段,减少了客户端约一半的等待时间。
我们来看日志复制的另一种情况。在上述例子中,只有 follower-1 成功追加日志,follower-2 因为日志不连续,追加失败。日志的连续性至关重要,如果日志条目没有按正确顺序应用到状态机,各个 follower 节点的状态肯定不一致。
日志不连续的问题是这样解决的:follower-2 收到日志复制请求后,它会通过 prevLogIndex 和 prevLogTerm 检查本地日志的连续性。如果日志缺失或存在冲突,follower-2 返回失败响应,指明与领导者日志不一致的部分。
1
2
3
4
5
6
|
{
"success": false,
"term": 4,
"conflictIndex": 4, // 表示发生缺失的日志索引,Follower 的日志中最大索引为 3,所以缺失的索引是 4。
"conflictTerm": 3//缺失日志的“上一个有效日志条目”的任期号
}
|
当领导者收到失败响应,根据 conflictIndex 和 conflictTerm 找到与跟随者日志的最大匹配索引(例如,6)。随后,领导者从该索引开始重新向跟随者(如 follower-2)发送日志条目,逐步修复日志的不一致性,直至同步完成。
OK,再看一遍raft的日志复制流程,才发现需要实现的功能确实不少。感觉3B确实可以算hard。
代码设计
好的,接下来讲讲我的代码设计。
首先,我本来准备延用3A中的设计思路,但是思考之下发现了一些问题,比如心跳机制,我在3A中设计的是当节点成为领导者之后再启动这个协程,但是在3B中,心跳函数中还需要实现一些额外的功能,如推进日志的应用等,再把心跳放到选举之后反而会增加代码的复杂度。
所以,我在Make函数初始化时便启动了这个心跳函数,如果当前节点是领导者时,会进行日志检测、同步日志的功能。
初始化
在3B中,也需要在raft结构体中添加一些额外的字段,来满足日志同步的需求,对领导者和跟随者来说,二者都需要已提交的日志的索引字段和已应用的日志的索引字段,对领导者来说,还需要额外定义一个切片来记录对每个跟随者下一个要发送的日志的索引,同时还需要另外一个切片来实现记录每个跟随者已经匹配的最大日志的索引。
初始化时便启动计时器和心跳函数。选举流程基本和3A没什么变动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
currentTerm int // 当前任期
votedFor int // 投票给的节点
log []LogEntry // 日志
state int // 当前节点状态:跟随者、候选者、领导者
applyCh chan ApplyMsg // 应用通道
lastAppliedID int // 已应用的 ID
commitIndex int // 已提交的索引
lastheartbeattime time.Time // 最后一次收到心跳的时间
nextIndex []int // 对每个 follower,下一个要发送的 log index
matchIndex []int // 对每个 follower,已经匹配的最大 index
}
type LogEntry struct {
Command any // 命令
Term int // 日志的任期
}
const (
FOLLOWER = iota // 跟随者
CANDIDATE // 候选者
LEADER // 领导者
)
const (
UnCommitted = iota // 未提交
Committed // 已提交
Applied // 已应用
)
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (3A, 3B, 3C).
rf.currentTerm = 0
rf.votedFor = -1
rf.state = FOLLOWER // 初始状态都为跟随者
rf.lastheartbeattime = time.Now()
rf.applyCh = applyCh
rf.lastAppliedID = 0 // 已应用的索引,从0开始(表示还没有应用任何日志)
rf.commitIndex = 0 // 已提交的索引,从0开始(表示还没有提交任何日志)
rf.log = make([]LogEntry, 1)
rf.log[0] = LogEntry{Term: 0}
// 初始化领导者状态
rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))
for i := range rf.nextIndex {
rf.nextIndex[i] = len(rf.log) // 初始下一个要发送的索引是当前日志长度
rf.matchIndex[i] = 0 // 初始已匹配的索引是0
}
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// 计时器守护进程,检测到超时后,开始选举,投自己
go rf.ticker()
go rf.heartbeatLoop()
return rf
}
// 心跳循环
func (rf *Raft) heartbeatLoop() {
for !rf.killed() {
time.Sleep(100 * time.Millisecond)
rf.mu.Lock()
if rf.state == LEADER {
term := rf.currentTerm
// 持续推进 commitIndex
rf.updateCommitIndex()
rf.mu.Unlock()
for i := range rf.peers {
if i != rf.me {
go rf.sendAppendEntriesToPeer(i, term)
}
}
} else {
rf.mu.Unlock()
}
}
}
|
日志同步
设置日志的入口函数其实是Start函数,在Start函数中,如果当前节点是领导者,就将新日志写入到日志存储字段中,然后触发广播复制日志的函数,将复制日志的消息广播给跟随者,根据返回消息的情况进行处理。
不过触发日志复制的时机在心跳函数中也存在,所以其实是否在Start函数中推进commitIndex都可以,当时在这里推进是想到可能刚好写完日志之后遇到心跳检测将这个日志进行了同步,然后这里直接推进并应用能更省事,其实这里是否推进并没有太大的关系,因为这个刚好写完就遇到心跳检测毕竟是小概率,两种情况测试都能通过的。
在日志同步的过程中,会有许多细节上的问题,首先,需要进行二次状态检测,防止信息发送未执行时发生了选举,其次在构建entry切片时,如果当前没有新的日志,就发送空切片,代表心跳。然后在发送append rpc请求前是需要释放锁的,防止rpc阻塞导致的死锁。然后根据响应查看是否存在缺失日志,存在则修改下标,在下次发送时进行同步。
关键代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
// 领导者向其他节点发送 AppendEntries 请求,server代表跟随者节点
func (rf *Raft) sendAppendEntriesToPeer(server int, term int) {
rf.mu.Lock()
// 二次检查状态,不是领导者直接退出
if rf.state != LEADER || term != rf.currentTerm {
rf.mu.Unlock()
return
}
// 确保 nextIndex 不越界
if rf.nextIndex[server] > len(rf.log) {
rf.nextIndex[server] = len(rf.log)
}
nextIdx := rf.nextIndex[server]
prevLogIndex := nextIdx - 1
// 获取 prevLogTerm
prevLogTerm := 0
if prevLogIndex >= 0 && prevLogIndex < len(rf.log) {
prevLogTerm = rf.log[prevLogIndex].Term
}
// 构建 entries 切片
var entries []LogEntry
if nextIdx < len(rf.log) {
entries = append([]LogEntry{}, rf.log[nextIdx:]...)
} else {
entries = []LogEntry{}
}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()
// 在发送rpc请求时,需要先解锁,避免死锁
reply := AppendEntriesReply{}
ok := rf.sendAppendEntries(server, &args, &reply)
if !ok {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
// 再次检查状态
if rf.state != LEADER || term != rf.currentTerm {
return
}
// term 过期
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = FOLLOWER
rf.votedFor = -1
rf.persist()
return
}
if reply.Success {
rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = rf.matchIndex[server] + 1
} else {
if reply.ConflictIndex != -1 {
rf.nextIndex[server] = reply.ConflictIndex
} else {
rf.nextIndex[server]--
}
if rf.nextIndex[server] < 1 {
rf.nextIndex[server] = 1
}
}
// 更新 commitIndex
rf.updateCommitIndex()
}
// 更新 commitIndex
func (rf *Raft) updateCommitIndex() {
for N := len(rf.log) - 1; N > rf.commitIndex; N-- {
count := 1
// 统计日志索引 N 是否被大多数节点提交
for i := range rf.peers {
if i != rf.me && rf.matchIndex[i] >= N {
count++
}
}
// 如果大多数节点都已提交到 N 索引位置,且 N 索引位置的日志任期与当前任期相同,则推进 commitIndex
if count > len(rf.peers)/2 && rf.log[N].Term == rf.currentTerm {
rf.commitIndex = N
go rf.applyLogs()
break
}
}
}
// 日志进行应用
func (rf *Raft) applyLogs() {
for {
rf.mu.Lock()
// 如果 lastAppliedID 已经大于等于 commitIndex,则没有日志需要应用
if rf.lastAppliedID >= rf.commitIndex {
rf.mu.Unlock()
return
}
rf.lastAppliedID++
msg := ApplyMsg{
CommandValid: true,
Command: rf.log[rf.lastAppliedID].Command,
CommandIndex: rf.lastAppliedID,
}
rf.mu.Unlock()
rf.applyCh <- msg
}
}
// 领导者发送日志请求结构体
// AppendEntriesreq
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry // 日志条目
LeaderCommit int // Leader 的“已提交”状态的日志条目索引号
}
// AppendEntriesReply
type AppendEntriesReply struct {
Term int
Success bool
ConflictIndex int // 表示发生缺失的日志索引
ConflictTerm int // 缺失日志的“上一个有效日志条目”的任期号
}
// 领导者发送日志请求
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
// 在3A中只需要实现心跳功能即可,3B中需要实现日志复制
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.Success = false
// 初始化
reply.ConflictIndex = -1
reply.ConflictTerm = -1
// term 检查
if args.Term < rf.currentTerm {
return
}
// 更新为 follower
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = FOLLOWER
}
rf.lastheartbeattime = time.Now()
// prevLogIndex 不存在,返回缺失的日志索引
if args.PrevLogIndex >= len(rf.log) {
reply.ConflictIndex = len(rf.log)
return
}
// term 不匹配,寻找错误日志的索引和任期
if args.PrevLogIndex >= 0 &&
rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
i := args.PrevLogIndex
for i > 0 && rf.log[i-1].Term == reply.ConflictTerm {
i--
}
reply.ConflictIndex = i
return
}
// append entries
index := args.PrevLogIndex + 1 // 跟随者开始对齐的位置
i := 0 // 领导者发送的日志条目下标
// 寻找冲突日志的索引位置
for ; i < len(args.Entries); i++ {
if index+i < len(rf.log) {
if rf.log[index+i].Term != args.Entries[i].Term {
// 如果存在任期不匹配的日志,进行截断
rf.log = rf.log[:index+i]
break
}
} else {
break
}
}
// 将剩余的日志条目添加到跟随者日志中
if i < len(args.Entries) {
rf.log = append(rf.log, args.Entries[i:]...)
}
// 更新 commitIndex 和 lastAppliedID 索引并应用日志
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
go rf.applyLogs()
}
reply.Success = true
rf.persist()
}
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != LEADER {
return -1, rf.currentTerm, false
}
index := len(rf.log)
term := rf.currentTerm
rf.log = append(rf.log, LogEntry{
Command: command,
Term: term,
})
// leader 自己的 matchIndex 必须更新
rf.matchIndex[rf.me] = len(rf.log) - 1
rf.persist()
// 立即尝试 commit
// rf.updateCommitIndex()
// 触发复制
go rf.broadcastAppendEntries()
return index, term, true
}
// 触发复制
func (rf *Raft) broadcastAppendEntries() {
rf.mu.Lock()
term := rf.currentTerm
rf.mu.Unlock()
for i := range rf.peers {
if i == rf.me {
continue
}
go rf.sendAppendEntriesToPeer(i, term)
}
}
|
其中需要对某个节点发送的日志的下标是在rf.nextIndex[server]中,包括如果说某个server的日志不匹配,修改的也是rf.nextIndex[server]的值,在发送时根据此下标进行获取相关日志。
运行结果
测试结果如下:
