当前位置: 移动技术网 > IT编程>数据库>其他数据库 > MIT-6.824 lab1-MapReduce

MIT-6.824 lab1-MapReduce

2019年03月21日  | 移动技术网IT编程  | 我要评论
概述 本lab将用go完成一个MapReduce框架,完成后将大大加深对MapReduce的理解。 Part I: Map/Reduce input and output 这部分需要我们实现common_map.go中的doMap()和common_reduce.go中的doReduce()两个函数 ...

概述

本lab将用go完成一个mapreduce框架,完成后将大大加深对mapreduce的理解。

part i: map/reduce input and output

这部分需要我们实现common_map.go中的domap()和common_reduce.go中的doreduce()两个函数。
可以先从测试用例下手:

func testsequentialsingle(t *testing.t) {
    mr := sequential("test", makeinputs(1), 1, mapfunc, reducefunc)
    mr.wait()
    check(t, mr.files)
    checkworker(t, mr.stats)
    cleanup(mr)
}

从sequential()开始调用链如下:
调用链
现在要做的是完成domap()和doreduce()。

domap():

func domap(
    jobname string, // the name of the mapreduce job
    maptask int, // which map task this is
    infile string,
    nreduce int, // the number of reduce task that will be run ("r" in the paper)
    mapf func(filename string, contents string) []keyvalue,
) {
    //打开infile文件,读取全部内容
    //调用mapf,将内容转换为键值对
    //根据reducename()返回的文件名,打开nreduce个中间文件,然后将键值对以json的格式保存到中间文件

    inputcontent, err := ioutil.readfile(infile)
    if err != nil {
        panic(err)
    }

    keyvalues := mapf(infile, string(inputcontent))

    var intermediatefileencoders []*json.encoder
    for reducetasknumber := 0; reducetasknumber < nreduce; reducetasknumber++ {
        intermediatefile, err := os.create(reducename(jobname, maptask, reducetasknumber))
        if err != nil {
            panic(err)
        }
        defer intermediatefile.close()
        enc := json.newencoder(intermediatefile)
        intermediatefileencoders = append(intermediatefileencoders, enc)
    }
    for _, kv := range keyvalues {
        err := intermediatefileencoders[ihash(kv.key) % nreduce].encode(kv)
        if err != nil {
            panic(err)
        }
    }
}

总结来说就是:

  1. 读取输入文件内容
  2. 将内容交个用户定义的map函数执行,生成键值对
  3. 保存键值对

doreduce:

func doreduce(
    jobname string, // the name of the whole mapreduce job
    reducetask int, // which reduce task this is
    outfile string, // write the output here
    nmap int, // the number of map tasks that were run ("m" in the paper)
    reducef func(key string, values []string) string,
) {
    //读取当前reducetasknumber对应的中间文件中的键值对,将相同的key的value进行并合
    //调用reducef
    //将reducef的结果以json形式保存到mergename()返回的文件中

    kvs := make(map[string][]string)
    for maptasknumber := 0; maptasknumber < nmap; maptasknumber++ {
        middatafilename := reducename(jobname, maptasknumber, reducetask)
        file, err := os.open(middatafilename)
        if err != nil {
            panic(err)
        }
        defer file.close()

        dec := json.newdecoder(file)
        for {
            var kv keyvalue
            err = dec.decode(&kv)
            if err != nil {
                break
            }
            values, ok := kvs[kv.key]
            if ok {
                kvs[kv.key] = append(values, kv.value)
            } else {
                kvs[kv.key] = []string{kv.value}
            }
        }
    }

    outputfile, err := os.create(outfile)
    if err != nil {
        panic(err)
    }
    defer outputfile.close()
    enc := json.newencoder(outputfile)
    for key, values := range kvs {
        enc.encode(keyvalue{key, reducef(key, values)})
    }
}

总结:

  1. 读取中间数据
  2. 执行reducef
  3. 保存结果

文件转换的过程大致如下:
文件转换

part ii: single-worker word count

这部分将用一个简单的实例展示如何使用mr框架。需要我们实现main/wc.go中的mapf()和reducef()来统计单词的词频。

mapf:

func mapf(filename string, contents string) []mapreduce.keyvalue {
    // your code here (part ii).
    words := strings.fieldsfunc(contents, func(r rune) bool {
        return !unicode.isletter(r)
    })
    var kvs []mapreduce.keyvalue
    for _, word := range words {
        kvs = append(kvs, mapreduce.keyvalue{word, "1"})
    }
    return kvs
}

将文本内容分割成单词,每个单词对应一个<word, "1">键值对。

reducef:

func reducef(key string, values []string) string {
    // your code here (part ii).
    return strconv.itoa(len(values))
}

value中有多少个"1",就说明这个word出现了几次。

part iii: distributing mapreduce tasks

目前实现的版本都是执行完一个map然后在执行下一个map,也就是说没有并行,这恰恰是mapreduce最大的买点。这部分需要实现schedule(),该函数将任务分配给worker去执行。当然这里并没有真正的多机部署,而是使用多线程进行模拟。
master和worker的关系大致如下:
master&worker
在创建worker对象的时候会调用register() rpc,master收到rpc后,将该worker的id保存在数组中,执行shedule()是可以根据该id,通过dotask() rpc调用该worker的dotask()执行map或reduce任务。

schedule.go

func schedule(jobname string, mapfiles []string, nreduce int, phase jobphase, registerchan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapphase:
        ntasks = len(mapfiles)
        n_other = nreduce
    case reducephase:
        ntasks = nreduce
        n_other = len(mapfiles)
    }

    fmt.printf("schedule: %v %v tasks (%d i/os)\n", ntasks, phase, n_other)

    //总共有ntasks个任务,registerchan中保存着空闲的workers
    taskchan := make(chan int)
    var wg sync.waitgroup
    go func() {
        for tasknumber := 0; tasknumber < ntasks; tasknumber++ {
            taskchan <- tasknumber
            fmt.printf("taskchan <- %d in %s\n", tasknumber, phase)
            wg.add(1)

        }

        wg.wait()                           //ntasks个任务执行完毕后才能通过
        close(taskchan)
    }()


    for task := range taskchan {            //所有任务都处理完后跳出循环
        worker := <- registerchan         //消费worker
        fmt.printf("given task %d to %s in %s\n", task, worker, phase)

        var arg dotaskargs
        arg.jobname = jobname
        arg.phase = phase
        arg.tasknumber = task
        arg.numotherphase = n_other

        if phase == mapphase {
            arg.file = mapfiles[task]
        }

        go func(worker string, arg dotaskargs) {
            if call(worker, "worker.dotask", arg, nil) {
                //执行成功后,worker需要执行其它任务
                //注意:需要先掉wg.done(),然后调register<-worker,否则会出现死锁
                //fmt.printf("worker %s run task %d success in phase %s\n", worker, task, phase)
                wg.done()
                registerchan <- worker  //回收worker
            } else {
                //如果失败了,该任务需要被重新执行
                //注意:这里不能用taskchan <- task,因为task这个变量在别的地方可能会被修改。比如task 0执行失败了,我们这里希望
                //将task 0重新加入到taskchan中,但是因为执行for循环的那个goroutine,可能已经修改task这个变量为1了,我们错误地
                //把task 1重新执行了一遍,并且task 0没有得到执行。
                taskchan <- arg.tasknumber
            }
        }(worker, arg)

    }
    fmt.printf("schedule: %v done\n", phase)

}

这里用到了两个channel,分别是registerchan和taskchan。
registerchan中保存了可用的worker id。
生产:

  1. worker调用register()进行注册,往里添加
  2. worker成功执行dotask()后,该worker需要重新加入registerchan

消费:

  1. schedule()拿到一个任务后,消费registerchan

taskchan中保存了任务号。任务执行失败需要重新加入taskchan。

part iv: handling worker failures

之前的代码已经体现了,对于失败的任务重新执行。

part v: inverted index generation

这是mapreduce的一个应用,生成倒排索引,比如想查某个单词出现在哪些文本中,就可以建立倒排索引来解决。

func mapf(document string, value string) (res []mapreduce.keyvalue) {
    // your code here (part v).
    words := strings.fieldsfunc(value, func(r rune) bool {
        return !unicode.isletter(r)
    })
    var kvs []mapreduce.keyvalue
    for _, word := range words {
        kvs = append(kvs, mapreduce.keyvalue{word, document})
    }
    return kvs
}

func reducef(key string, values []string) string {
    // your code here (part v).
    values = removeduplicationandsort(values)
    return strconv.itoa(len(values)) + " " + strings.join(values, ",")
}

func removeduplicationandsort(values []string) []string {
    kvs := make(map[string]struct{})
    for _, value := range values {
        _, ok := kvs[value]
        if !ok {
            kvs[value] = struct{}{}
        }
    }
    var ret []string
    for k := range kvs {
        ret = append(ret, k)
    }
    sort.strings(ret)
    return ret
}

mapf()生成<word, document>的键值对,reducef()处理word对应的所有document,去重并且排序,然后拼接到一起。

具体代码在:
如有错误,欢迎指正:
15313676365

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网