MIT-6.824 MapReduce
代码结构
coordinator 和 worker入口:
1 | /main/mrcoordinator |
具体实现:
1 | /mr/coordinator.go |
用来跑的应用:
1 | /mrapps/* |
代码思路
首先启动一个 coordinator:
1 | go run -race mrcoordinator.go pg-*.txt |
然后多开几个终端,启动一些 worker,这里跑一个 word count:
1 | go run -race mrworker.go wc.so |
大致流程
coordinator 在初始化时,把要干的任务先分好(简单起见,令一个文本文件为一个任务),启动一个 server,接收 worker 的任务请求。worker 向 coordinator 轮询请求任务。worker 既可以做 map 也可以做 reduce,由 coordinator 下发的任务决定。
map 的任务由 coordinator 设一个任务号,reduce任务则是有 nReduce 个,每个 map 任务产出nReduce个临时文件,比如 mr-mapId-reduceId,mapId 就是 coordinator 设置的任务号,reduceId 在这里是对 key(word count里就是word)做一个哈希函数算出来的 0~nReduce-1 的值。
每个 map 任务就是不断地从一个文件里读入,产出 word : 1这样的 pair,以 json 的形式存入一个个 mr-mapId-reduceId-tmp 临时文件中,worker 做完了之后通知 coordinator,coordinator 把这个mapId的所有 mr-mapId-reduceId-tmp文件改成 mr-mapId-reduceId,表示是已经完成的(如果超过了 ddl 或者 crash 导致通知失败,那么这些带 -tmp 的文件就视为无效了,coordinator 把挂了的任务重新分配给别的 worker,并把之前的 -tmp 文件删掉)。
等到所有 map 任务都结束了,开始 reduce。每个 reduce 任务处理自己 reduceId 的所有 mr-mapId-reduceId 文件,把它们的结果聚合到 mr-out-reduceId-tmp 文件里,然后通知 coordinator,通知同上,超过 ddl 则失效,没超过则让 coordinator 把 -tmp 去掉,成为正式的结果。
所有 reduce 任务结束后,coordinator 关闭,worker 们请求不到了也就自己去死了。
各种标记的设计
1 | type Coordinator struct { |
Map 任务数,Reduce 任务数,结束标志,任务列表,分配任务的锁。
1 | type MapTask struct { |
任务情况:Phase 记录了这个任务的当前状况。TaskNumber 记录这个任务被派发的次数(从第0次开始)。mux 是任务上的锁,避免这个任务被同时修改,产生有风险的 data race。
如果一个任务发出去,ddl 内没回来,这个任务被重新派给了另一个人,结果老的那个人过了一会先返回了,那我怎么知道这是新的返回还是老的返回?老的返回应该被判定为无效。
我的策略:派发任务时,coordinator 同时发出一个 taskNumber,worker 完成后也把这个taskNumber 传回来。每次 ddl 到达时 coordinator 处记录的 taskNumber 递增。只有回来的号和现在的号相符,才是正确的结果,否则无视。
RPC 传参和返回:
1 | type Args struct { |
Args:
AskForTask,是否是来找新任务的
IsMap,如果是来报道任务完成的,是 map 还是 reduce,用到下面的 MapTaskId 或ReduceTaskId
Reply:
IsFinished,是不是全部完成了,可以关闭了
IsMap,分配的是 map 还是 reduce
锁的设计和使用
- 上一条说的 ddl 处理和接收到 worker 的返回,这两者是有冲突的,不能让它们同时开始,否则错误五花八门。需要加锁,设计为加在单个任务上的锁。
- 分配任务时,map 和 reduce 分别用一个 coordinator 的全局锁,防止分出去同样的任务。
DDL实现
1 | // 派发任务后启动一个线程 |
1 | func (c *Coordinator) MapTaskChecker(MapTaskId int) { |
实现效果与测试
1 | ./test-mr.sh |
为什么 job count 挂了捏?
job count 这个测试的目的是检查在没有 failure 的情况下,一个 map 任务是否会被派发多次。在我的逻辑中,这是不可能的。那为什么挂呢?理论上 map 的任务个数等于输入的文件个数,这里是8,最后结果却比8大,经过仔细的检查,我确实每个任务只派发了一次(在没有失败的情况下),但是 jobcount.go 中是依靠统计文件夹内的文件来判定任务次数的,而当文件夹正在被写时,ioutil.ReadDir 可能会发生错乱(我发现了同一个文件被计入两次的情况),如果要解决,可以通过加一个 directory.lock 锁文件的方式,每次修改/读取文件夹内文件时,如果锁文件已经存在,阻塞,如果不存在,创建锁文件,任务结束后删除锁文件。(用锁文件的原因是分布式系统中内存不共享,我们需要一个能共享的锁,那就是本地存储)
并不影响其他计算型测试的正确性。