当前位置: 移动技术网 > IT编程>数据库>MongoDB > 通用MapReduce程序复制HBase表数据

通用MapReduce程序复制HBase表数据

2019年05月28日  | 移动技术网IT编程  | 我要评论

编写mr程序,让其可以适合大部分的hbase表数据导入到hbase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。

原始表test1数据如下:

每个row key都有两个版本的数据,这里只显示了row key为1的数据

 在hbase shell 中创建数据表:

create 'test2',{name => 'cf1',versions => 10}  // 保存无版本、无列导入设置、无列导出设置的数据
create 'test3',{name => 'cf1',versions => 10}  // 保存无版本、无列导入设置、有列导出设置的数据
create 'test4',{name => 'cf1',versions => 10}  // 保存无版本、有列导入设置、无列导出设置的数据
create 'test5',{name => 'cf1',versions => 10}  // 保存有版本、无列导入设置、无列导出设置的数据
create 'test6',{name => 'cf1',versions => 10}  // 保存有版本、无列导入设置、有列导出设置的数据
create 'test7',{name => 'cf1',versions => 10}  // 保存有版本、有列导入设置、无列导出设置的数据
create 'test8',{name => 'cf1',versions => 10}  // 保存有版本、有列导入设置、有列导出设置的数据

main函数入口:

package generalhbasetohbase;
import org.apache.hadoop.util.toolrunner;
public class drivertest {
 public static void main(string[] args) throws exception {
 // 无版本设置、无列导入设置,无列导出设置
 string[] myargs1= new string[]{
 "test1", // 输入表
 "test2", // 输出表
 "0",  // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表
 "-1", // 列导入设置,如果为-1 ,则没有设置列导入
 "-1" // 列导出设置,如果为-1,则没有设置列导出
 }; 
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs1);
 // 无版本设置、有列导入设置,无列导出设置
 string[] myargs2= new string[]{
 "test1",
 "test3",
 "0",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs2);
 // 无版本设置,无列导入设置,有列导出设置
 string[] myargs3= new string[]{
 "test1",
 "test4",
 "0",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs3);
 // 有版本设置,无列导入设置,无列导出设置
 string[] myargs4= new string[]{
 "test1",
 "test5",
 "2",
 "-1",
 "-1"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs4);
 // 有版本设置、有列导入设置,无列导出设置
 string[] myargs5= new string[]{
 "test1",
 "test6",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs5);
 
 // 有版本设置、无列导入设置,有列导出设置
 string[] myargs6= new string[]{
 "test1",
 "test7",
 "2",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs6);
 // 有版本设置、有列导入设置,有列导出设置
 string[] myargs7= new string[]{
 "test1",
 "test8",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 toolrunner.run(hbasedriver.getconfiguration(), 
 new hbasedriver(),
 myargs7);
 }
 
}

driver:

package generalhbasetohbase;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.conf.configured;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.scan;
import org.apache.hadoop.hbase.io.immutablebyteswritable;
import org.apache.hadoop.hbase.mapreduce.tablemapreduceutil;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.util.tool;
import util.jarutil;
 
 
public class hbasedriver extends configured implements tool{
 public static string fromtable=""; //导入表
 public static string totable=""; //导出表
 public static string setversion=""; //是否设置版本
 // args => {fromtable,totable,setversion,columnfromtable,columntotable}
 @override
 public int run(string[] args) throws exception {
 if(args.length!=5){
 system.err.println("usage:\n demo.job.hbasedriver <input> <inputtable> "
  + "<output> <outputtable>"
  +"< versions >"
  + " <set columns from inputtable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> "
  + "<set columns from outputtable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>");
 return -1;
 }
 configuration conf = getconf();
 fromtable = args[0];
 totable = args[1];
 setversion = args[2];
 conf.set("setversion", setversion);
 if(!args[3].equals("-1")){
 conf.set("columnfromtable", args[3]);
 }
 if(!args[4].equals("-1")){
 conf.set("columntotable", args[4]);
 }
 string jobname ="from table "+fromtable+ " ,import to "+ totable;
 job job = job.getinstance(conf, jobname);
 job.setjarbyclass(hbasedriver.class);
 scan scan = new scan();
 // 判断是否需要设置版本
 if(setversion != "0" || setversion != "1"){
 scan.setmaxversions(integer.parseint(setversion));
 }
 // 设置hbase表输入:表名、scan、mapper类、mapper输出键类型、mapper输出值类型
 tablemapreduceutil.inittablemapperjob(
 fromtable, 
 scan, 
 hbasetohbasemapper.class, 
 immutablebyteswritable.class, 
 put.class, 
 job);
 // 设置hbase表输出:表名,reducer类
 tablemapreduceutil.inittablereducerjob(totable, null, job);
 // 没有 reducers, 直接写入到 输出文件
  job.setnumreducetasks(0);
 
  return job.waitforcompletion(true) ? 0 : 1;
  
 }
 private static configuration configuration;
 public static configuration getconfiguration(){
 if(configuration==null){
 /**
 * todo 了解如何直接从windows提交代码到hadoop集群
 *  并修改其中的配置为实际配置
 */
 configuration = new configuration();
 configuration.setboolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务
 configuration.set("fs.defaultfs", "hdfs://master:8020");// 指定namenode
 configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager
 configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定资源分配器
 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver
 configuration.set("hbase.master", "master:16000");
 configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");
 configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
 configuration.set("hbase.zookeeper.property.clientport", "2181");
 //todo 需export->jar file ; 设置正确的jar包所在位置
 configuration.set("mapreduce.job.jar",jarutil.jar(hbasedriver.class));// 设置jar包路径
 }
 
 return configuration;
 }
 
 
}

mapper:

package generalhbasetohbase;
import java.io.ioexception;
import java.util.arraylist;
import java.util.hashmap;
import java.util.hashset;
import java.util.map.entry;
import java.util.navigablemap;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.hbase.cell;
import org.apache.hadoop.hbase.keyvalue;
import org.apache.hadoop.hbase.client.put;
import org.apache.hadoop.hbase.client.result;
import org.apache.hadoop.hbase.io.immutablebyteswritable;
import org.apache.hadoop.hbase.mapreduce.tablemapper;
import org.apache.hadoop.hbase.util.bytes;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
public class hbasetohbasemapper extends tablemapper<immutablebyteswritable, put> {
 logger log = loggerfactory.getlogger(hbasetohbasemapper.class);
 private static int versionnum = 0;
 private static string[] columnfromtable = null;
 private static string[] columntotable = null;
 private static string column1 = null;
 private static string column2 = null;
 @override
 protected void setup(context context)
 throws ioexception, interruptedexception {
 configuration conf = context.getconfiguration();
 versionnum = integer.parseint(conf.get("setversion", "0"));
 column1 = conf.get("columnfromtable",null);
 if(!(column1 == null)){
 columnfromtable = column1.split(",");
 }
 column2 = conf.get("columntotable",null); 
 if(!(column2 == null)){
 columntotable = column2.split(",");
 }
 }
 @override
 protected void map(immutablebyteswritable key, result value,
 context context)
 throws ioexception, interruptedexception {
 context.write(key, resulttoput(key,value));
 } 
 /***
 * 把key,value转换为put
 * @param key
 * @param value
 * @return
 * @throws ioexception
 */
 private put resulttoput(immutablebyteswritable key, result value) throws ioexception {
 hashmap<string, string> ftablemap = new hashmap<>();
 hashmap<string, string> ttablemap = new hashmap<>();
 put put = new put(key.get());
 if(! (columnfromtable == null || columnfromtable.length == 0)){
 ftablemap = getfamilyandcolumn(columnfromtable);
 }
 if(! (columntotable == null || columntotable.length == 0)){
 ttablemap = getfamilyandcolumn(columntotable);
 }
 if(versionnum==0){      
 if(ftablemap.size() == 0){   
 if(ttablemap.size() == 0){ 
  for (cell kv : value.rawcells()) {
  put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出
  }
  return put;
 } else{
  return getput(put, value, ttablemap); // 无版本、无列导入、有列导出
 }
 } else {
 if(ttablemap.size() == 0){
  return getput(put, value, ftablemap);// 无版本、有列导入、无列导出
 } else {
  return getput(put, value, ttablemap);// 无版本、有列导入、有列导出
 }
 }
 } else{
 if(ftablemap.size() == 0){
 if(ttablemap.size() == 0){
  return getput1(put, value); // 有版本,无列导入,无列导出
 }else{
  return getput2(put, value, ttablemap); //有版本,无列导入,有列导出
 }
 }else{
 if(ttablemap.size() == 0){
  return getput2(put,value,ftablemap);// 有版本,有列导入,无列导出
 }else{
  return getput2(put,value,ttablemap); // 有版本,有列导入,有列导出
 }
 }
 }
 }
 /***
 * 无版本设置的情况下,对于有列导入或者列导出
 * @param put
 * @param value
 * @param tablemap
 * @return
 * @throws ioexception
 */
 
 private put getput(put put,result value,hashmap<string, string> tablemap) throws ioexception{
 for(cell kv : value.rawcells()){
 byte[] family = kv.getfamily();
 if(tablemap.containskey(new string(family))){
 string columnstr = tablemap.get(new string(family));
 arraylist<string> columnby = tobyte(columnstr);
 if(columnby.contains(new string(kv.getqualifier()))){
  put.add(kv); //没有设置版本,没有设置列导入,有设置列导出
 }
 }
 }
 return put;
 }
 /***
 * (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出)
 * @param put
 * @param value
 * @param ttablemap
 * @return
 */
 private put getput2(put put,result value,hashmap<string, string> tablemap){
 navigablemap<byte[], navigablemap<byte[], navigablemap<long, byte[]>>> map=value.getmap();
  for(byte[] family:map.keyset()){
   if(tablemap.containskey(new string(family))){
   string columnstr = tablemap.get(new string(family));
   log.info("@@@@@@@@@@@"+new string(family)+" "+columnstr);
 arraylist<string> columnby = tobyte(columnstr);
   navigablemap<byte[], navigablemap<long, byte[]>> familymap = map.get(family);//列簇作为key获取其中的列相关数据
    for(byte[] column:familymap.keyset()){        //根据列名循坏
     log.info("!!!!!!!!!!!"+new string(column));
     if(columnby.contains(new string(column))){
     navigablemap<long, byte[]> valuesmap = familymap.get(column);
      for(entry<long, byte[]> s:valuesmap.entryset()){//获取列对应的不同版本数据,默认最新的一个
      system.out.println("***:"+new string(family)+" "+new string(column)+" "+s.getkey()+" "+new string(s.getvalue()));
      put.addcolumn(family, column, s.getkey(),s.getvalue());
      }
     }
    }
   }
   
  }
 return put; 
 }
 /***
 * 有版本、无列导入、无列导出
 * @param put
 * @param value
 * @return
 */
 private put getput1(put put,result value){
 navigablemap<byte[], navigablemap<byte[], navigablemap<long, byte[]>>> map=value.getmap();
  for(byte[] family:map.keyset()){ 
   navigablemap<byte[], navigablemap<long, byte[]>> familymap = map.get(family);//列簇作为key获取其中的列相关数据
   for(byte[] column:familymap.keyset()){        //根据列名循坏
    navigablemap<long, byte[]> valuesmap = familymap.get(column);
    for(entry<long, byte[]> s:valuesmap.entryset()){    //获取列对应的不同版本数据,默认最新的一个
     put.addcolumn(family, column, s.getkey(),s.getvalue());
    }
   }
  }
  return put;
 }
 // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 /***
 * 得到列簇名与列名的k,v形式的map
 * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 * @return map => {"cf1" => "c1,c2,c10,c11,c14"}
 */
 private static hashmap<string, string> getfamilyandcolumn(string[] str){
 hashmap<string, string> map = new hashmap<>();
 hashset<string> set = new hashset<>();
 for(string s : str){
 set.add(s.split(":")[0]);
 }
 object[] ob = set.toarray();
 for(int i=0; i<ob.length;i++){
 string family = string.valueof(ob[i]);
 string columns = "";
 for(int j=0;j < str.length;j++){
 if(family.equals(str[j].split(":")[0])){
  columns += str[j].split(":")[1]+",";
 }
 }
 map.put(family, columns.substring(0, columns.length()-1));
 }
 return map; 
 }
 
 private static arraylist<string> tobyte(string s){
 arraylist<string> b = new arraylist<>();
 string[] sarr = s.split(",");
 for(int i=0;i<sarr.length;i++){
 b.add(sarr[i]);
 }
 return b;
 }
}

程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:

test2:(无版本、无列导入设置、无列导出设置)

test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test5(有版本、无列导入设置、无列导出设置)

test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网