计划过很多次,终于开始了6.824的征程;
希望一鼓作气!
一: MapReduce逻辑
二: 实验任务
- 完成用户端
map()
+reduce()
函数。调用MapReduce接口,用于单词统计,倒排索引生成; - 完成MapReduce端任务处理函数
doMap()
+doReduce()
,一个Map/Reduce的任务具体的执行流程; - 完成MapReduce端调度函数
schedule()
,master如何调度worker进行分布式作业; - 完成MapReduce端fault tolerance,处理worker failures;
三: 实验问题
Part1:Map/Reduce input and output
json编解码需要一致
//merge阶段,读取每个reduce任务输出的文件,一个个解析;
dec := json.NewDecoder(file)
for {
var kv KeyValue
err = dec.Decode(&kv)
if err != nil {
break
}
kvs[kv.Key] = kv.Value
}
//原始版本:reduce阶段,写入文件,将切片整体写入,导致merge无法解析,最终check不正确;
var values []KeyValue
enc := json.NewEncoder(file)
err:=enc.Encode(&values)
//修改版本:reduce阶段,写入文件,一个个写入;
enc := json.NewEncoder(file)
for key,values := range keys {
enc.Encode(KeyValue{key, reduceF(key,values)})
}
Part3: Distributing MapReduce tasks
//workers资源池
workers := make(chan string, ntasks)
go func() {
for workerAdress := range registerChan {
workers <- workerAdress
}
//原有错误代码
// workerAdress := <-registerChan
// workers <- workerAdress
}()
//错误原因:map/reduce阶段只接受一个worker;导致另一个worker工作任务为0;
//当map阶段和reduce阶段接受的worker不同时,偶尔返回正确答案,但也是错误的,因为每个阶段仍然是只有一个worker在处理;
注意并发参数
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i)
wg.Done()
}
}
wg.Wait()
}
10
6
6
10
10
10
10
10
10
4
Part4: Handling worker failures
(1)
func work(workers chan string, doTaskArgs DoTaskArgs) {
defer wg.Done()
Here:
workerAdress := <-workers //获取工作线程
if call(workerAdress, "Worker.DoTask", doTaskArgs, nil) {
workers <- workerAdress //工作正常完成
}else{
workers <- workerAdress //工作非正常完成
goto Here
}
}
(2)
func work(workers chan string, doTaskArgs DoTaskArgs) {
defer wg.Done()
Here:
workerAdress := <-workers //获取工作线程
if call(workerAdress, "Worker.DoTask", doTaskArgs, nil) {//工作正常完成
workers <- workerAdress
}else{//工作非正常完成
// workers <- workerAdress
goto Here
}
}
崩溃的worker不需要再放入workers资源池中,worker的nRPC为0时,会断开链接;
for {
wk.Lock()
if wk.nRPC == 0 {
wk.Unlock()
break
}
wk.Unlock()
conn, err := wk.l.Accept()
if err == nil {
wk.Lock()
wk.nRPC--
wk.Unlock()
go rpcs.ServeConn(conn)
} else {
break
}
}
wk.l.Close()
debug("RunWorker %s exit\n", me)
Part5: Inverted index generation
题目的文件输出是按照文件名排序,并没有按照文件的次数排序;
func reduceF(key string, values []string) string {
// TODO: you should complete this to do the inverted index challenge
//每篇文章的数目
documentNumMap := make(map[string]int)
for _, document := range values {
documentNumMap[document]++
}
// //相同次数的文章集合
// numDocumentMap := make(map[int][]string)
// for document, number := range documentNumMap {
// numDocumentMap[number] = append(numDocumentMap[number], document)
// }
// //根据key排序
// documentNums := make([]int, 0)
// for number, _ := range numDocumentMap {
// documentNums = append(documentNums, number)
// }
// sort.Ints(documentNums)
// //输出
// documents:=make([]string,0)
// for index := 0; index < len(documentNums); index++ {
// documentNum := documentNums[index]
// documents = append(documents,numDocumentMap[documentNum]...)
// }
// return strconv.Itoa(len(documentNumMap)) + " " + strings.Join(documents, ",")
documents:=make([]string,0)
for document,_:=range documentNumMap{
documents = append(documents,document)
}
sort.Strings(documents)
return strconv.Itoa(len(documentNumMap)) + " " + strings.Join(documents, ",")
}
Test
实验每个part单独做测试时均正常运行,但是在最后整体测试时,part3、part4偶尔会出现报错:
error: listen unix /var/tmp/824-502/mr47307-worker13: socket: too many open files in system
//1:读取每个临时文件中的数据并整合到一起
keyValues := make([]KeyValue, 0)
for index := 0; index < nMap; index++ {
fileName := reduceName(jobName, index, reduceTaskNumber)
file, errFile := os.Open(fileName)
if errFile != nil {
log.Fatal(errFile)
}
defer file.Close()
var tempKeyValues []KeyValue //一个文件中的数据,临时,否则会被覆盖
errDecode := json.NewDecoder(file).Decode(&tempKeyValues)
keyValues = append(keyValues, tempKeyValues...)
if errDecode != nil {
log.Fatal(errDecode)
}
}
keyValues := make([]KeyValue, 0)
for index := 0; index < nMap; index++ {
fileName := reduceName(jobName, index, reduceTaskNumber)
file, err := os.Open(fileName)
if err != nil {
log.Fatal("doReduce: ",err)
}
dec:=json.NewDecoder(file)
for{
var kv KeyValue
err=dec.Decode(&kv)
if err!=nil {
break;
}
keyValues = append(keyValues,kv)
}
file.Close()
}
defer 导致函数结束时才释放文件,nMap个文件全部open状态,这可能是报错的原因;
修改完毕,全部测试pass;
四: 总结
- MapReduce
- 概念理解
- 实现理解
- 熟悉go语言
- 实验需求-》学习动力
- 调试BUG
- 学习提供代码
[2017.9 梦工厂]