对raft算法不熟悉的可以参考我之前记录过的分布式共识算法文章
https://liusir521.github.io/p/%E5%88%86%E5%B8%83%E5%BC%8F%E5%85%B1%E8%AF%86%E7%AE%97%E6%B3%95/
源码地址:https://github.com/liusir521/mit6.5840
实验要求
Lab3实验中要我们复刻raft算法,并由易到难的分为了4个部分,对应着raft算法中的三个关键点,领导者选举、日志复制以及安全性,当前是3A部分,对应的是领导者选举的部分。
3A任务要求:实现 Raft 领导者选举和心跳机制(使用 AppendEntries RPC,不记录日志)。第 3A 部分的目标是:选举出一个领导者,在没有故障的情况下保持领导者地位,并在旧领导者发生故障或与旧领导者之间的数据包丢失时,由新的领导者接管。运行 go test -run 3A来测试你的第 3A 部分代码。
关键点:
-
要实现心跳机制,请定义一个 AppendEntries RPC 结构体(尽管您可能暂时不需要所有参数),并让领导者定期发送这些参数。编写一个 AppendEntries RPC 处理方法。
-
测试人员要求领导者每秒发送心跳 RPC 的次数不得超过 10 次。
-
测试人员要求你的 Raft 在旧领导者失效后的 5 秒钟内选出新的领导者(如果大多数同伴仍然可以沟通)。
代码设计
在写代码之前建议先结合实验要求过一下代码,加深一下对代码中定义的理解。在我第一眼看到代码的时候,我有一个很大的疑惑,就是Raft结构体是给每个节点使用的,还是说相当于是一个中间调配者的角色,在我结合着代码阅读了几遍实验要求之后,才意识到其实每个节点都是一个Raft结构体实例。(在config.go文件中也可以看出cfg.rafts = make([]*Raft, cfg.n),用于测试文件中的初始化)
在3A中,我们需要实现的功能并不复杂,主要是领导者选举。这意味着我们只需要先定义好相关的结构体,然后在Make函数中进行初始化之后(初始化默认节点是跟随者),等待领导者发送心跳包即可,如果在规定时间内未收到,就进入选举过程,当某个节点完成选举成为主节点后,再继续向其他节点发送心跳包确认即可。(其中心跳包暂时使用AppendEntries函数实现)
初始化
需要我们在Raft结构体中完善相关的定义,关键字段包括当前任期、当前节点的身份以及最后一次收到心跳的时间等等。然后在Make函数中进行初始化。
关键代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int // 当前任期
votedFor int // 投票给的节点
log []LogEntry // 日志
state int // 当前节点状态:跟随者、候选者、领导者
lastheartbeattime time.Time // 最后一次收到心跳的时间
}
type LogEntry struct {
Command any // 日志内容
Term int // 日志的任期
}
const (
FOLLOWER = iota // 跟随者
CANDIDATE // 候选者
LEADER // 领导者
)
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.state == LEADER
}
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (3A, 3B, 3C).
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]LogEntry, 0)
rf.state = FOLLOWER // 初始状态都为跟随者
rf.lastheartbeattime = time.Now()
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// 计时器守护进程,检测到超时后,开始选举,投自己
go rf.ticker()
return rf
}
|
选举
在初始化时,起初应该都是跟随者,当某个跟随者等待心跳包超时之后就会发起选举,进入选举流程,然后通过rpc请求向除了自己之外的在peers数组中的其他节点发送选举请求,如果获得了多数派的同意,就成为领导者,成为领导者之后向其他跟随者发送心跳包,如果收到了任期比自己大的则变为跟随者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
// 发送心跳的sendHeartbeat函数忘记粘贴了,建议自己实现一下。
func (rf *Raft) ticker() {
for rf.killed() == false {
// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
rf.mu.Lock()
// Your code here (3A)
// Check if a leader election should be started.
// 只有 follower 和 candidate 才需要检查选举超时
if rf.state == FOLLOWER || rf.state == CANDIDATE {
electionTimeout := 150 + rand.Int63n(150) // 150-300ms 随机超时
if time.Since(rf.lastheartbeattime) > time.Duration(electionTimeout)*time.Millisecond {
// 超时,开始选举
rf.startElection()
// startElection 会重置 electionStart,所以这里不需要重复设置
}
}
rf.mu.Unlock()
}
}
func (rf *Raft) startElection() {
// 任期+1,状态为候选者
rf.currentTerm++
rf.state = CANDIDATE
rf.votedFor = rf.me // 投给自己
rf.lastheartbeattime = time.Now() // 重置选举超时时间
rf.persist()
// 获取最后一条日志的索引和任期
lastLogIndex := len(rf.log) - 1
lastLogTerm := 0
if lastLogIndex >= 0 {
lastLogTerm = rf.log[lastLogIndex].Term
}
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
votesReceived := 1 // 记录收到的投票数
var mu sync.Mutex
// 向除了自己之外的所有节点发送投票请求
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(server int) {
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(server, args, reply)
mu.Lock()
defer mu.Unlock()
if !ok {
return
}
// 如果成功且收到投票,则记录
if reply.VoteGranted {
rf.mu.Lock()
if args.Term == rf.currentTerm && rf.state == CANDIDATE {
votesReceived++
// 获得大多数的投票,转换成领导者
if votesReceived > len(rf.peers)/2 {
rf.state = LEADER
rf.lastheartbeattime = time.Now()
go rf.sendHeartbeat()
}
}
rf.mu.Unlock()
}
// 如果收到新的任期大于当前任期,则转换成跟随者
if reply.Term > rf.currentTerm {
rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = FOLLOWER
rf.votedFor = -1
rf.lastheartbeattime = time.Now()
rf.persist()
}
rf.mu.Unlock()
return
}
}(i)
}
}
// 共用发送投票请求
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
// 选举rpc函数
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.VoteGranted = false
rf.lastheartbeattime = time.Now()
// 如果请求中的任期小于当前任期,拒绝投票
if args.Term < rf.currentTerm {
return
}
// 如果当前任期小于请求的任期,更新任期并转为跟随者
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.state = FOLLOWER
}
// 如果尚未投票,或者是已经投给CandidateId,则校验日志进行投票
if args.Term == rf.currentTerm &&
(rf.votedFor == -1 || rf.votedFor == args.CandidateId) {
// 检查日志
lastLogIndex := len(rf.log) - 1
lastLogTerm := 0
if lastLogIndex >= 0 {
lastLogTerm = rf.log[lastLogIndex].Term
}
candidateLogUpToDate := (args.LastLogTerm > lastLogTerm) ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
if candidateLogUpToDate {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
}
}
rf.persist()
}
|
测试结果
测试如图所示
