当前位置: 移动技术网 > IT编程>数据库>其他数据库 > hbase与hdfs的交互

hbase与hdfs的交互

2019年05月21日  | 移动技术网IT编程  | 我要评论
hdfs和hbase的交互,和写mapreduce程序类似,只是需要修改输入输出数据和使用hbase的javaapi对其进行操作处理即可

public class hbasetohdfs extends toolrunner implements tool {

    private configuration configuration;
    //配置文件需要配置的属性
    private static final string hdfs_name = "fs.defaultfs";
    private static final string hdfs_value = "hdfs://mycluster";
    private static final string mapreduce_name = "mapreduce.framework.name";
    private static final string mapreduce_value = "yarn";
    private static final string hbase_name = "hbase.zookeeper.quorum";
    private static final string hbase_value = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181";

    //获取hbase表的扫描对象
    private scan getscan() {
        return new scan();
    }

    @override
    public int run(string[] args) throws exception {

        getconf();

        //获取job实例对象
        job job = job.getinstance(configuration, "copy_move");

        //map/reduce的class链接
        job.setmapperclass(hbase_to_hdfs.class);
        job.setmapoutputkeyclass(text.class);
        job.setmapoutputvalueclass(nullwritable.class);

        //设置输入输出
        //由hbase导数据到hdfs故输入端需要使用tablemapreduceutil类
        tablemapreduceutil.inittablemapperjob("ns3:t5", getscan(), hbase_to_hdfs.class, text.class, nullwritable.class, job);
        fileoutputformat.setoutputpath(job, new path(args[0]));

        //设置jar包
        job.setjarbyclass(hbasetohdfs.class);

        //提交作业
        int b = job.waitforcompletion(true) ? 0 : 1;

        return b;
    }

    @override
    public void setconf(configuration configuration) {
        configuration.set(hdfs_name, hdfs_value);
        configuration.set(mapreduce_name, mapreduce_value);
        configuration.set(hbase_name, hbase_value);
        this.configuration = configuration;
    }

    @override
    public configuration getconf() {
        return configuration;
    }

    public static void main(string[] args) throws exception {
        toolrunner.run(hbaseconfiguration.create(),new hbasetohdfs() , args);
    }

    // 创建map程序
    private static text mkey = new text();
static class hbase_to_hdfs extends tablemapper<text, nullwritable> { @override protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception { //定义字符串拼接 stringbuffer stringbuffer = new stringbuffer(); /** * 使用value获取扫描器,获取hbase表的列名/列值等信息 * 使用stringbuffer来对需要的信息进行字符串拼接 */ cellscanner cellscanner = value.cellscanner(); while (cellscanner.advance()) { cell cell = cellscanner.current(); stringbuffer.append(new string(cellutil.clonevalue(cell))).append("\t"); } mkey.set(stringbuffer.tostring()); context.write(mkey, nullwritable.get()); } } }

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网