当前位置: 移动技术网 > IT编程>脚本编程>Python > 使用python编写hadoop的mapper和reducer的实例详解

使用python编写hadoop的mapper和reducer的实例详解

2018年12月12日  | 移动技术网IT编程  | 我要评论

技校门 ed2k,无线密码破解工具,校草爱上花

hadoop streaming 原理

hadoop 本身是用 java 开发的,程序也需要用 java 编写,但是通过 hadoop streaming,我们可以使用任意语言来编写程序,让 hadoop 运行。

hadoop streaming 就是通过将其他语言编写的 mapper 和 reducer 通过参数传给一个事先写好的 java 程序(hadoop 自带的 *-streaming.jar),这个 java 程序会负责创建 mr 作业,另开一个进程来运行 mapper,将得到的输入通过 stdin 传给它,再将 mapper 处理后输出到 stdout 的数据交给 hadoop,经过 partition 和 sort 之后,再另开进程运行 reducer,同样通过 stdin/stdout 得到最终结果。

python的mapreduce代码

因此,使用python编写mapreduce代码的技巧就在于我们使用了 hadoopstreaming 来帮助我们在map 和 reduce间传递数据通过stdin (标准输入)和stdout (标准输出).我们仅仅使用python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为hadoopstreaming会帮我们办好其他事。


创建文件,上传文件

当前路径下,创建一本,包含英文单词(后面mapper 和reduce 统计单词频次需要使用)

hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -mkdir -p /input
hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -put ./book.txt /input
编写mapper.py 文件

将下列的代码保存在/home/hadoop/example/mapper.py中,他将从stdin读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:

注意:要确保这个脚本有足够权限(chmod +x mapper.py)

#!/usr/bin/env python
import sys
# input comes from stdin (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to stdout (standard output);
        # what we output here will be the input for the
        # reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
编写reducer 文件

将代码存储在/home/hadoop/example/reducer.py 中,这个脚本的作用是从mapper.py 的stdout中读取结果,然后计算每个单词出现次数的总和,并输出结果到stdout。

同样,要注意脚本权限:chmod +x reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = none
current_count = 0
word = none

# input comes from stdin
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except valueerror:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this if-switch only works because hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to stdout
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
自测

在运行mapreduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果。这里有一些建议,关于如何测试你的map和reduce的功能:

hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo      1
foo      1
quux     1
labs     1
foo      1
bar      1
quux     1
hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py
bar     1
foo     3
labs    1
quux    2
hadoop 运行

一切准备就绪,我们将在运行python mapreduce job 在hadoop集群上。像我上面所说的,我们使用的是hadoopstreaming 帮助我们传递数据在map和reduce间并通过stdin和stdout,进行标准化输入输出。

hadoop@derekubun:/usr/local/hadoop$ hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.6.jar  \
-mapper 'python mapper.py' \
-file /home/hadoop/example/mapper.py \
-reducer 'python reducer.py' \
-file /home/hadoop/example/reducer.py 
-input hdfs:/input/book.txt \
-output output

第一行是告诉 hadoop 运行 streaming 的 java 程序,接下来的是参数:

这里的mapper 后面跟的其实是一个命令。也就是说,-mapper 和 -reducer 后面跟的文件名不需要带上路径。而 -file 后的参数需要带上路径,为了让 hadoop 将程序分发给其他机器,需要-file 参数指明要分发的程序放在哪里。

注意:如果你在 mapper 后的命令用了引号,加上路径名反而会报错说找不到这个程序。(因为 -file 选项会将对应的本地参数文件上传至 hadoop streaming 的工作路径下,所以再执行 -mapper 对应的参数命令能直接找到对应的文件。

-input 和 -output 后面跟的是 hdfs 上的路径名,这里的 input/book.txt 指的是input 文件夹下刚才上传的文本文件,注意 -output 后面跟着的需要是一个不存在于 hdfs 上的路径,在产生输出的时候 hadoop 会帮你创建这个文件夹,如果已经存在的话就会产生冲突。因此每次执行 hadoop streaming 前可以通过脚本命令 hadoop fs -rmr 清除输出路径。

由于 mapper 和 reducer 参数跟的实际上是命令,所以如果每台机器上 python 的环境配置不一样的话,会用每台机器自己的配置去执行 python 程序。

结果获取

如果运行中遇到问题,注意看报错,然后进行调整。

运行结束之后,结果存储在hdfs上 output目录下。

查看结果:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -ls output

从hdfs拷贝至本地:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -get output/* ./

注:如果结果中包含_success 则说明本次运行成功。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网