hdfs和hbase的交互,和写mapreduce程序类似,只是需要修改输入输出数据和使用hbase的javaapi对其进行操作处理即可
public class hbasetohdfs extends toolrunner implements tool { private configuration configuration; //配置文件需要配置的属性 private static final string hdfs_name = "fs.defaultfs"; private static final string hdfs_value = "hdfs://mycluster"; private static final string mapreduce_name = "mapreduce.framework.name"; private static final string mapreduce_value = "yarn"; private static final string hbase_name = "hbase.zookeeper.quorum"; private static final string hbase_value = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181"; //获取hbase表的扫描对象 private scan getscan() { return new scan(); } @override public int run(string[] args) throws exception { getconf(); //获取job实例对象 job job = job.getinstance(configuration, "copy_move"); //map/reduce的class链接 job.setmapperclass(hbase_to_hdfs.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(nullwritable.class); //设置输入输出 //由hbase导数据到hdfs故输入端需要使用tablemapreduceutil类 tablemapreduceutil.inittablemapperjob("ns3:t5", getscan(), hbase_to_hdfs.class, text.class, nullwritable.class, job); fileoutputformat.setoutputpath(job, new path(args[0])); //设置jar包 job.setjarbyclass(hbasetohdfs.class); //提交作业 int b = job.waitforcompletion(true) ? 0 : 1; return b; } @override public void setconf(configuration configuration) { configuration.set(hdfs_name, hdfs_value); configuration.set(mapreduce_name, mapreduce_value); configuration.set(hbase_name, hbase_value); this.configuration = configuration; } @override public configuration getconf() { return configuration; } public static void main(string[] args) throws exception { toolrunner.run(hbaseconfiguration.create(),new hbasetohdfs() , args); } // 创建map程序 private static text mkey = new text();
static class hbase_to_hdfs extends tablemapper<text, nullwritable> { @override protected void map(immutablebyteswritable key, result value, context context) throws ioexception, interruptedexception { //定义字符串拼接 stringbuffer stringbuffer = new stringbuffer(); /** * 使用value获取扫描器,获取hbase表的列名/列值等信息 * 使用stringbuffer来对需要的信息进行字符串拼接 */ cellscanner cellscanner = value.cellscanner(); while (cellscanner.advance()) { cell cell = cellscanner.current(); stringbuffer.append(new string(cellutil.clonevalue(cell))).append("\t"); } mkey.set(stringbuffer.tostring()); context.write(mkey, nullwritable.get()); } } }
如对本文有疑问, 点击进行留言回复!!
去 HBase,Kylin on Parquet 性能表现如何?
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
如何在 HBase Shell 命令行正常查看十六进制编码的中文?哈哈~
HBase Filter 过滤器之 Comparator 原理及源码学习
网友评论