当前位置: 移动技术网 > IT编程>脚本编程>Go语言 > golang如何实现mapreduce单进程版本详解

golang如何实现mapreduce单进程版本详解

2018年01月25日  | 移动技术网IT编程  | 我要评论

前言

  mapreduce作为hadoop的编程框架,是工程师最常接触的部分,也是除去了网络环境和集群配 置之外对整个job执行效率影响很大的部分,所以很有必要深入了解整个过程。元旦放假的第一天,在家没事干,用golang实现了一下mapreduce的单进程版本,。处理对大文件统计最高频的10个单词,因为功能比较简单,所以设计没有解耦合。

  本文先对mapreduce大体概念进行介绍,然后结合代码介绍一下,如果接下来几天有空,我会实现一下分布式高可用的mapreduce版本。下面话不多说了,来一起看看详细的介绍吧。

1. mapreduce大体架构

  上图是论文中mapreduce的大体架构。总的来说mapreduce的思想就是分治思想:对数据进行分片,然后用mapper进行处理,以key-value形式输出中间文件;然后用reducer进行对mapper输出的中间文件进行合并:将key一致的合到一块,并输出结果文件;如果有需要,采用combiner进行最后的合并。

  归纳来说主要分为5部分:用户程序、master、mapper、reducer、combiner(上图未给出)。

  • 用户程序。用户程序主要对输入数据进行分割,制定mapper、reducer、combiner的代码。
  • master:中控系统。控制分发mapper、reduer的个数,比如生成m个进程处理mapper,n个进程处理reducer。其实对master来说,mapper和reduer都属于worker,只不过跑的程序不一样,mapper跑用户输入的map代码,reduer跑用户输入的reduce代码。master还作为管道负责中间路径传递,比如将mapper生成的中间文件传递给reduer,将reduer生成的结果文件返回,或者传递给combiner(如果有需要的话)。由于master是单点,性能瓶颈,所以可以做集群:主备模式或者分布式模式。可以用zookeeper进行选主,用一些消息中间件进行数据同步。master还可以进行一些策略处理:比如某个worker执行时间特别长,很有可能卡住了,对分配给该worker的数据重新分配给别的worker执行,当然需要对多份数据返回去重处理。
  • mapper:负责将输入数据切成key-value格式。mapper处理完后,将中间文件的路径告知master,master获悉后传递给reduer进行后续处理。如果mapper未处理完,或者已经处理完但是reduer未读完其中间输出文件,分配给该mapper的输入将重新被别的mapper执行。
  • reducer: 接受master发送的mapper输出文件的消息,rpc读取文件并处理,并输出结果文件。n个reduer将产生n个输出文件。
  • combiner: 做最后的归并处理,通常不需要。

  总的来说,架构不复杂。组件间通信用啥都可以,比如rpc、http或者私有协议等。

2. 实现代码介绍

  该版本代码实现了单机单进程版本,mapper、reducer和combiner的实现用协程goroutine实现,通信采用channel。代码写的比较随意,没有解耦合。

  • 功能:统计给定文件中出现的最高频的10个单词
  • 输入:大文件
  • 输出:最高频的10个单词
  • 实现:5个mapper协程、2个reducer、1个combiner。

  为了方便起见,combiner对最高频的10个单词进行堆排序处理,按规范来说应该放在用户程序处理。

  文件目录如下,其中bin文件夹下的big_input_file.txt为输入文件,可以调用generate下的main文件生成,caller文件为入口的用户程序,master目录下分别存放master、mapper、reducer、combiner代码:

.
├── readme.md
├── bin
│ └── file-store
│  └── big_input_file.txt
└── src
 ├── caller
 │ └── main.go
 ├── generate
 │ └── main.go
 └── master
  ├── combiner.go
  ├── mapper.go
  ├── master.go
  └── reducer.go

6 directories, 8 files 

2.1 caller

  用户程序,读入文件并按固定行数进行划分;然后调用master.handle进行处理。

package main
import ( 
 "os"
 "path"
 "path/filepath"
 "bufio"
 "strconv"
 "master"
 "github.com/vinllen/go-logger/logger"
)
const ( 
 limit int = 10000 // the limit line of every file
)
func main() { 
 curdir, err := filepath.abs(filepath.dir(os.args[0]))
 if err != nil {
  logger.error("read path error: ", err.error())
  return
 }
 filedir := path.join(curdir, "file-store")
 _ = os.mkdir(filedir, os.modeperm)
 // 1. read file
 filename := "big_input_file.txt"
 inputfile, err := os.open(path.join(filedir, filename))
 if err != nil {
  logger.error("read inputfile error: ", err.error())
  return
 }
 defer inputfile.close()
 // 2. split inputfile into several pieces that every piece hold 100,000 lines
 filepiecearr := []string{}
 scanner := bufio.newscanner(inputfile)
 piece := 1
outter: 
 for {
  outputfilename := "input_piece_" + strconv.itoa(piece)
  outputfilepos := path.join(filedir, outputfilename)
  filepiecearr = append(filepiecearr, outputfilepos)
  outputfile, err := os.create(outputfilepos)
  if err != nil {
   logger.error("split inputfile error: ", err.error())
   continue
  }
  defer outputfile.close()
  for cnt := 0; cnt < limit; cnt++ {
   if !scanner.scan() {
    break outter
   }
   _, err := outputfile.writestring(scanner.text() + "\n")
   if err != nil {
    logger.error("split inputfile writting error: ", err.error())
    return
   }
  }
  piece++
 }
 // 3. pass to master
 res := master.handle(filepiecearr, filedir)
 logger.warn(res)
}

2.2 master

  master程序,依次生成combiner、reducer、mapper,处理消息中转,输出最后结果。

package master
import (
 "github.com/vinllen/go-logger/logger"
)
var ( 
 mapchanin chan mapinput // channel produced by master while consumed by mapper
 mapchanout chan string // channel produced by mapper while consumed by master
 reducechanin chan string // channel produced by master while consumed by reducer
 reducechanout chan string // channel produced by reducer while consumed by master
 combinechanin chan string // channel produced by master while consumed by combiner
 combinechanout chan []item // channel produced by combiner while consumed by master
)
func handle(inputarr []string, filedir string) []item { 
 logger.info("handle called")
 const(
  mappernumber int = 5
  reducernumber int = 2
 )
 mapchanin = make(chan mapinput)
 mapchanout = make(chan string)
 reducechanin = make(chan string)
 reducechanout = make(chan string)
 combinechanin = make(chan string)
 combinechanout = make(chan []item)
 reducejobnum := len(inputarr)
 combinejobnum := reducernumber
 // start combiner
 go combiner()
 // start reducer
 for i := 1; i <= reducernumber; i++ {
  go reducer(i, filedir)
 }
 // start mapper
 for i := 1; i <= mappernumber; i++ {
  go mapper(i, filedir)
 }
 go func() {
  for i, v := range(inputarr) {
   mapchanin <- mapinput{
    filename: v,
    nr: i + 1,
   } // pass job to mapper
  }
  close(mapchanin) // close map input channel when no more job
 }()
 var res []item
outter: 
 for {
  select {
   case v := <- mapchanout:
    go func() {
     reducechanin <- v
     reducejobnum--
     if reducejobnum <= 0 {
      close(reducechanin)
     }
    }()
   case v := <- reducechanout:
    go func() {
     combinechanin <- v
     combinejobnum--
     if combinejobnum <= 0 {
      close(combinechanin)
     }
    }()
   case v := <- combinechanout:
    res = v
    break outter
  }
 }
 close(mapchanout)
 close(reducechanout)
 close(combinechanout)
 return res
}

2.3 mapper

  mapper程序,读入并按key-value格式生成中间文件,告知master。

package master
import ( 
 "fmt"
 "path"
 "os"
 "bufio"
 "strconv"

 "github.com/vinllen/go-logger/logger"
)
type mapinput struct { 
 filename string
 nr int
}
func mapper(nr int, filedir string) { 
 for {
  val, ok := <- mapchanin // val: filename
  if !ok { // channel close
   break
  }
  inputfilename := val.filename
  nr := val.nr
  file, err := os.open(inputfilename)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in mapper(%d)", inputfilename, nr)
   logger.error(errmsg)
   mapchanout <- ""
   continue
  }
  mp := make(map[string]int)
  scanner := bufio.newscanner(file)
  scanner.split(bufio.scanwords)
  for scanner.scan() {
   str := scanner.text()
   //logger.info(str)
   mp[str]++
  }
  outputfilename := path.join(filedir, "mapper-output-" + strconv.itoa(nr))
  outputfilehandler, err := os.create(outputfilename)
  if err != nil {
   errmsg := fmt.sprintf("write file(%s) error in mapper(%d)", outputfilename, nr)
   logger.error(errmsg)
  } else {
   for k, v := range mp {
    str := fmt.sprintf("%s %d\n", k, v)
    outputfilehandler.writestring(str)
   }
   outputfilehandler.close()
  }
  mapchanout <- outputfilename
 }
}

2.4 reducer

  reducer程序,读入master传递过来的中间文件并归并。

package master
import ( 
 "fmt"
 "bufio"
 "os"
 "strconv"
 "path"
 "strings"
 "github.com/vinllen/go-logger/logger"
)
func reducer(nr int, filedir string) { 
 mp := make(map[string]int) // store the frequence of words
 // read file and do reduce
 for {
  val, ok := <- reducechanin
  if !ok {
   break
  }
  logger.debug("reducer called: ", nr)
  file, err := os.open(val)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in reducer", val)
   logger.error(errmsg)
   continue
  }
  scanner := bufio.newscanner(file)
  for scanner.scan() {
   str := scanner.text()
   arr := strings.split(str, " ")
   if len(arr) != 2 {
    errmsg := fmt.sprintf("read file(%s) error that len of line(%s) != 2(%d) in reducer", val, str, len(arr))
    logger.warn(errmsg)
    continue
   }
   v, err := strconv.atoi(arr[1])
   if err != nil {
    errmsg := fmt.sprintf("read file(%s) error that line(%s) parse error in reduer", val, str)
    logger.warn(errmsg)
    continue
   }
   mp[arr[0]] += v
  }
  if err := scanner.err(); err != nil {
   logger.error("reducer: reading standard input:", err)
  }
  file.close()
 }
 outputfilename := path.join(filedir, "reduce-output-" + strconv.itoa(nr))
 outputfilehandler, err := os.create(outputfilename)
 if err != nil {
  errmsg := fmt.sprintf("write file(%s) error in reducer(%d)", outputfilename, nr)
  logger.error(errmsg)
 } else {
  for k, v := range mp {
   str := fmt.sprintf("%s %d\n", k, v)
   outputfilehandler.writestring(str)
  }
  outputfilehandler.close()
 }
 reducechanout <- outputfilename
}

2.5 combiner

  combiner程序,读入master传递过来的reducer结果文件并归并成一个,然后堆排序输出最高频的10个词语。

package master
import ( 
 "fmt"
 "strings"
 "bufio"
 "os"
 "container/heap"
 "strconv"

 "github.com/vinllen/go-logger/logger"
)
type item struct { 
 key string
 val int
}
type priorityqueue []*item
func (pq priorityqueue) len() int { 
 return len(pq)
}
func (pq priorityqueue) less(i, j int) bool { 
 return pq[i].val > pq[j].val
}
func (pq priorityqueue) swap(i, j int) { 
 pq[i], pq[j] = pq[j], pq[i]
}
func (pq *priorityqueue) push(x interface{}) { 
 item := x.(*item)
 *pq = append(*pq, item)
}
func (pq *priorityqueue) pop() interface{} { 
 old := *pq
 n := len(old)
 item := old[n - 1]
 *pq = old[0 : n - 1]
 return item
}
func combiner() { 
 mp := make(map[string]int) // store the frequence of words
 // read file and do combine
 for {
  val, ok := <- combinechanin
  if !ok {
   break
  }
  logger.debug("combiner called")
  file, err := os.open(val)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in combiner", val)
   logger.error(errmsg)
   continue
  }
  scanner := bufio.newscanner(file)
  for scanner.scan() {
   str := scanner.text()
   arr := strings.split(str, " ")
   if len(arr) != 2 {
    errmsg := fmt.sprintf("read file(%s) error that len of line != 2(%s) in combiner", val, str)
    logger.warn(errmsg)
    continue
   }
   v, err := strconv.atoi(arr[1])
   if err != nil {
    errmsg := fmt.sprintf("read file(%s) error that line(%s) parse error in combiner", val, str)
    logger.warn(errmsg)
    continue
   }
   mp[arr[0]] += v
  }
  file.close()
 }
 // heap sort
 // pq := make(priorityqueue, len(mp))
 pq := make(priorityqueue, 0)
 heap.init(&pq)
 for k, v := range mp {
  node := &item {
   key: k,
   val: v,
  }
  // logger.debug(k, v)
  heap.push(&pq, node)
 }
 res := []item{}
 for i := 0; i < 10 && pq.len() > 0; i++ {
  node := heap.pop(&pq).(*item)
  res = append(res, *node)
 }
 combinechanout <- res
}

3. 总结

  不足以及未实现之处:

  • 各模块间耦合性高
  • master单点故障未扩展
  • 未采用多进程实现,进程间采用rpc通信
  • 未实现单个workder时间过长,另起worker执行任务的代码。

  接下来要是有空,我会实现分布式高可用的代码,模块间采用rpc通讯。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对移动技术网的支持。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网