Featured image of post Mit lab1个人笔记

Mit lab1个人笔记

Mit 6.824 lab1个人笔记

源码地址:https://github.com/liusir521/mit6.5840

前言

久闻mit6.824大名,一直没找到合适的时机进行实操,最近在工作之余对此课程进行了学习。这篇文章总结一下个人学习lab1时的一些心得体会。

lab1简介

在本实验中,你将构建一个 MapReduce 系统。你将实现一个工作进程,该进程调用应用程序的 Map 和 Reduce 函数并处理文件的读写操作;以及一个协调器进程,该进程将任务分配给工作进程并处理故障的工作进程。

实验中给出了很多细节上的方案与提示,建议先多看几遍实验任务。

流程介绍:我们需要通过多个Map任务去读取系统给出的文件中的单词,并根据单词的key的hash值将其归入指定的中间文件中(mr-X-Y),当所有的Map任务均处理完毕之后,启动Reduce任务,每个Reduce任务会找到自己对应的中间文件并对内部的单词进行统计,最后输出到对应的文件中(mr-out-Y)。

流程图如下:

流程图

其中X代表的是map任务的编号,Y代表的是reduce任务的编号

总体设计

总体需要我们来实现的其实并不是关于KeyValue键值对的处理,这部分可以参考示例中给出的代码,无需我们再次实现,只需在原本单机场景之下改写为多任务模式即可。主要需要我们实现的是关于任务的分配处理代码。主要包括三部分:

  • rpc:定义实验中所需要的关于rpc通讯用到的相关结构体

  • Worker:定义工作进程相关的代码

  • Coordinator:起到协调作用,定义关于任务分配等相关的具体实现

具体实现

Coordinator

Coordinator中需要我们定义整体任务分配相关的结构体,从原有的函数中可以看出还需要我们实现rpc被调用方的相关函数,包括获取任务、完成任务以及Coordinator初始化。同时官方给出需要进行超时校验(10s),我们还需要在获取任务时进行超时校验。

关于Coordinator结构体我们需要定义的关键内容有:

  • files:文件数组,用于分配给各个map任务执行,每个map对应一个文件,所以map的总任务数其实就是文件总数

  • nReduce:系统指定的reduce任务的总数

  • mapTasks:map的任务列表

  • reduceTasks:reduce的任务列表

  • isMapFinished:map任务是否全部完成的标志

  • isReduceFinished:reduce任务是否全部完成的标志,只有map任务全部完成之后才会启动reduce任务,reduce任务全部完成之后也就标志的整体流程结束

  • sync.Mutex:同理需要一个锁对象来控制流程(也可以对map和reduce分别设置,这里简便实现只设置一个)

关于Task任务结构体我们需要定义的关键内容有:

  • TaskType:任务类型,map or reduce

  • TaskStatus:任务状态,等待中、运行中、已完成

  • NReduce:系统NReduce变量,用于map任务中对应的key流向哪个文件(hash之后对NReduce取余)

  • MapTaskIndex:如果是map任务,当前map任务的编号

  • ReduceTaskIndex:如果是reduce任务,当前reduce任务的编号

  • FileName:如果是map任务,当前map任务要处理的文件名

我这里只是一个粗略的设计,有很多地方可以进行优化,比如可以添加map、reduce还未完成的数量,通过原子类操作数量的方式会比我这里使用遍历的方式更加的合理。锁也可以设计为两个,在超时校验时或许可以再节省一些时间。

获取任务

在获取任务时,需要首先判断是否存在超时文件,然后接着首先判断是否需要分配map任务,先把map任务分配完毕,然后才是reduce任务,当reduce任务也执行完毕,就返回退出标志。(当map任务都在执行,reduce任务尚未开启,此时返回等待标志。reduce任务都在执行时也是)

关键代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
type Coordinator struct {
	// Your definitions here.
	mu               sync.Mutex // 共用锁
	files            []string
	nReduce          int
	mapTasks         []Task // map任务列表
	reduceTasks      []Task // reduce任务列表
	isMapFinished    bool   // map任务是否完成
	isReduceFinished bool   // reduce任务是否完成
	nextTaskId       int
}

// 任务结构体
type Task struct {
	TaskID          int
	TaskType        int
	TaskStatus      int // 当前任务状态
	NReduce         int // map任务需要知道分配到多少个reduce任务
	MapTaskIndex    int
	ReduceTaskIndex int
	FileName        string
	StartTime       time.Time // 用于判断超时
}

// Your code here -- RPC handlers for the worker to call.

// 获取任务
func (c *Coordinator) GetTask(args *GetTaskReq, reply *GetTaskReply) error {

	c.mu.Lock()
	defer c.mu.Unlock()

	// 判断是否所有任务均完成
	if c.isMapFinished && c.isReduceFinished {
		reply.TaskType = ExitTask
		return nil
	}

	// 判断是否有超时任务
	c.checkTimeoutTask()

	// 判断map任务是否都完成
	if !c.isMapFinished {
		for i, task := range c.mapTasks {
			// 如果存在map waiting任务,则将其改为running,并返回给worker执行
			if c.mapTasks[i].TaskStatus == Waiting {
				reply.TaskID = task.TaskID
				reply.TaskType = task.TaskType
				reply.FileName = task.FileName
				reply.NReduce = task.NReduce
				reply.ReduceID = task.ReduceTaskIndex
				c.mapTasks[i].TaskStatus = Running
				c.mapTasks[i].StartTime = time.Now()
				return nil
			}
		}
		reply.TaskType = WaitingTask
		return nil
	}

	// 到这里说明map任务已经完成,判断reduce任务是否都完成
	if !c.isReduceFinished {
		for i, task := range c.reduceTasks {
			// 如果存在reduce waiting任务,则将其改为running,并返回给worker执行
			if c.reduceTasks[i].TaskStatus == Waiting {
				reply.TaskID = task.TaskID
				reply.TaskType = task.TaskType
				reply.AllMapNum = len(c.mapTasks)
				reply.ReduceID = task.ReduceTaskIndex
				c.reduceTasks[i].TaskStatus = Running
				c.reduceTasks[i].StartTime = time.Now()
				return nil
			}
		}

	}

	reply.TaskType = WaitingTask
	return nil
}

// 查看是否存在任务超时
func (c *Coordinator) checkTimeoutTask() {
	// 超时时间 10s
	timeout := 10 * time.Second

	// 如果 map 任务已经完成,则返回
	// if c.isMapFinished {
	// 	return
	// }
	// 这里不可以直接return,会导致下面的reduce任务无法判断

	now := time.Now()

	if !c.isMapFinished {
		for i := 0; i < len(c.mapTasks); i++ {
			task := c.mapTasks[i]
			// 判断正在执行的任务是否存在超时的
			if task.TaskStatus == Running && now.Sub(task.StartTime) > timeout {
				// 如果存在,将状态改为 waiting,等待其他 worker 来执行
				c.mapTasks[i].TaskStatus = Waiting
				// 这里不需要修改任务的time,后面检测到waiting时,会重新标记时间
				// return 不 return,继续判断下一个任务,否则每次只标记到了一个任务就返回了
			}
		}
	}

	// 如果 reduce 任务已经完成,则返回
	// 这里可以直接return,代表全部任务执行完毕
	if c.isReduceFinished {
		return
	}
	for i := 0; i < len(c.reduceTasks); i++ {
		task := c.reduceTasks[i]
		// 存在超时任务
		if task.TaskStatus == Running && now.Sub(task.StartTime) > timeout {
			// 将状态改为 waiting,等待其他 worker 来执行
			c.reduceTasks[i].TaskStatus = Waiting
			// 这里不需要修改任务的time,后面检测到waiting时,会重新标记时间
			// return
		}
	}
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {

	c := Coordinator{
		files:            files,
		nReduce:          nReduce,
		mapTasks:         make([]Task, len(files)),
		reduceTasks:      make([]Task, nReduce),
		nextTaskId:       0,
		isMapFinished:    false,
		isReduceFinished: false,
	}

	// map任务初始化
	for i, file := range files {
		c.mapTasks[i] = Task{
			TaskID:       i,
			TaskType:     MapTask,
			TaskStatus:   Waiting,
			NReduce:      nReduce,
			MapTaskIndex: i,
			FileName:     file,
			StartTime:    time.Now(),
		}
		c.nextTaskId++
	}

	// reduce任务初始化
	for i := 0; i < nReduce; i++ {
		c.reduceTasks[i] = Task{
			TaskID:          i,
			TaskType:        ReduceTask,
			TaskStatus:      Waiting,
			ReduceTaskIndex: i,
		}
		c.nextTaskId++
	}

	c.server()
	return &c
}

刚开始时,我在校验超时时间中犯了个逻辑错误,我先校验了map任务是否都已经完成了,完成之后我直接return了,这个操作就导致了后面超时函数无法校验到reduce任务的状态(是否存在超时任务),就导致最后一个测验一直无法通过。

完成任务

在完成任务中,我们需要判断当前完成的任务的类型,然后判断是否当前所有类型的任务都完成了。关键代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 完成任务
func (c *Coordinator) CompleteTask(args *CompleteTaskReq, reply *CompleteTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 完成map任务
	switch args.TaskType {
	case MapTask:
		// map任务完成
		// 判断所有map任务是否完成

		allmapfinished := true
		for i, task := range c.mapTasks {
			if args.TaskID == task.TaskID {
				c.mapTasks[i].TaskStatus = Finished
			} else if task.TaskStatus != Finished {
				allmapfinished = false
			}
		}
		// 所有map任务完成
		c.isMapFinished = allmapfinished
	case ReduceTask:
		// 完成reduce任务
		// 检测reduce任务是否完成
		if c.isReduceFinished {
			return nil
		}

		allreducefinished := true
		for i, task := range c.reduceTasks {
			if args.TaskID == task.TaskID {
				c.reduceTasks[i].TaskStatus = Finished
			} else if task.TaskStatus != Finished {
				allreducefinished = false
			}
		}
		c.isReduceFinished = allreducefinished
	}

	return nil
}

rpc

rpc.go文件中主要是对实验中需要用到的rpc相关的请求和响应的相关结构体,关键代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 任务类型枚举定义
const (
	MapTask = iota
	ReduceTask
	WaitingTask // 任务正在执行,此时需要task等待
	ExitTask    // map、reduce任务已经执行完毕,退出信号
)

// task任务状态枚举定义
const (
	Waiting = iota
	Running
	Finished
)

// 获取任务请求
type GetTaskReq struct {
	WorkerID int
}

// 获取任务响应
type GetTaskReply struct {
	TaskID     int
	TaskType   int // 当收到ExitTask时,结束标志
	FileName   string
	AllMapNum  int // 一共有多少map任务,用于reduce过程,其实相当于map处理文件的个数,一个map任务对应一个文件
	ReduceID   int // 当前reduce任务负责的分区编号
	NReduce    int
	TaskStatus int
}

// 完成任务请求
type CompleteTaskReq struct {
	TaskType int
	TaskID   int
}

// 完成任务响应
type CompleteTaskReply struct {
}

其中,GetTaskReply就是获取task任务的响应结构体,根据tasktype字段判断当前任务是map任务还是reduce任务,然后再从里面获取需要使用到的字段。

Worker

在worker中,需要我们通过rpc向调度器获取任务并执行具体的任务,当任务执行完毕时通过rpc向调度器汇报。其中具体的执行任务流程可以参考示例中给出的代码。关于文件的操作,可以使用课程中提示的中间文件的形式。

关键代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// 请求任务并执行
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {

	// Your worker implementation here.
	for {

		workerid := os.Getpid()
		reply := getWorkerTask(workerid)

		switch reply.TaskType {
		case MapTask:
			startMapTask(workerid, reply, mapf)
		case ReduceTask:
			startReduceTask(workerid, reply, reducef)
		case WaitingTask:
			// 休眠0.5秒
			time.Sleep(500 * time.Millisecond)
			continue
		case ExitTask:
			return
		}
	}
	// uncomment to send the Example RPC to the coordinator.
	// CallExample()

}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func startReduceTask(workerid int, reply GetTaskReply, reducef func(string, []string) string) {
	// 根据传入的reduceNum确定所负责的任务index
	// 然后去所有map产出的文件中找到对应index的文件进行reduce

	kva := []KeyValue{}

	for i := 0; i < reply.AllMapNum; i++ {
		curfilename := fmt.Sprintf("mr-%d-%d", i, reply.ReduceID)

		file, err := os.Open(curfilename)
		if err != nil {
			log.Fatalf("reduce cannot open %v", file)
		}

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}

	// 对结果进行排序
	sort.Sort(ByKey(kva))

	tempfile, err := os.CreateTemp("", "mr-out-tmp-*")
	if err != nil {
		log.Fatal("create tempfile err", err)
	}

	// 参考示例代码处理键值对
	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tempfile, "%v %v\n", kva[i].Key, output)

		i = j
	}

	tempfile.Close()

	err = os.Rename(tempfile.Name(), fmt.Sprintf("mr-out-%d", reply.ReduceID))

	if err != nil {
		log.Fatal("rename err", err)
	}

	completeTask(reply.TaskID, reply.TaskType)
}

// Map任务
func startMapTask(workerid int, reply GetTaskReply, mapf func(string, string) []KeyValue) {
	// 首先读取文件kv,并根据kv的hash分配到不同的切片中
	// 然后使用临时文件的方式,将这些kv存入对应的临时文件,最后写入对应的reduce文件中

	// 读取文件内容参考示例文件
	filename := reply.FileName

	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("map cannot open %v %v", filename, err)
	}
	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))
	// intermediate = append(intermediate, kva...)
	// 示例文件给出的是单机版本,这里需要实现并发版本

	intermediate := make([][]KeyValue, reply.NReduce)

	for _, kv := range kva {
		// 计算hash并分配
		index := ihash(kv.Key) % reply.NReduce
		intermediate[index] = append(intermediate[index], kv)
	}

	// 临时文件方式存储
	for i := 0; i < reply.NReduce; i++ {
		tempfile, err := os.CreateTemp("", "mr-tmp-*")
		if err != nil {
			log.Fatal("create tempfile err", err)
		}

		// 使用推荐的json包处理kv
		encoder := json.NewEncoder(tempfile)
		for _, kv := range intermediate[i] {
			err := encoder.Encode(&kv)
			if err != nil {
				log.Fatal("json write tempfile err", err)
			}
		}
		tempfile.Close()
		reducename := fmt.Sprintf("mr-%d-%d", reply.TaskID, i)
		os.Rename(tempfile.Name(), reducename)
	}
	// 汇报任务执行完成
	completeTask(reply.TaskID, MapTask)
}

// 通过rpc获取任务
func getWorkerTask(workerID int) GetTaskReply {
	// rpc获取任务
	req := GetTaskReq{
		WorkerID: workerID,
	}

	reply := GetTaskReply{}

	call("Coordinator.GetTask", &req, &reply)

	return reply
}

// 汇报任务完成
func completeTask(taskid, tasktype int) {
	req := CompleteTaskReq{
		TaskID:   taskid,
		TaskType: tasktype,
	}

	reply := CompleteTaskReply{}

	call("Coordinator.CompleteTask", &req, &reply)
}

总结

个人感觉,实验主要还是考验的思维逻辑能力,而不是一些细节实现,毕竟一些关键实现实验中已经提供了。

代码执行结果如下:

执行结果

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计