当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Spark连接HBase

Spark连接HBase

2018年02月28日  | 移动技术网IT编程  | 我要评论
(一)、Spark读取HBase中的数据 hbase中的数据 (二)、Spark写HBase 1.第一种方式: 2.第二种方式: ...

(一)、Spark读取HBase中的数据

hbase中的数据

 

 1 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
 2 import org.apache.hadoop.hbase.client.HBaseAdmin
 3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 4 import org.apache.spark._
 5 import org.apache.hadoop.hbase.util.Bytes
 6 
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 从hbase读取数据转化成RDD
11   */
12 object SparkReadHBase {
13 
14   def main(args: Array[String]): Unit = {
15     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16     val sc = new SparkContext(sparkConf)
17 
18     val tablename = "account"
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24     conf.set(TableInputFormat.INPUT_TABLE, tablename)
25 
26     // 如果表不存在则创建表
27     val admin = new HBaseAdmin(conf)
28     if (!admin.isTableAvailable(tablename)) {
29       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30       admin.createTable(tableDesc)
31     }
32 
33     //读取数据并转化成rdd
34     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36       classOf[org.apache.hadoop.hbase.client.Result])
37 
38     val count = hBaseRDD.count()
39     println(count)
40     hBaseRDD.foreach{case (_,result) =>{
41       //获取行键
42       val key = Bytes.toString(result.getRow)
43       //通过列族和列名获取列
44       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46       println("Row key:"+key+" Name:"+name+" Age:"+age)
47     }}
48 
49     sc.stop()
50     admin.close()
51   }
52 
53 }

(二)、Spark写HBase

  1.第一种方式:

 1 import org.apache.hadoop.hbase.HBaseConfiguration
 2 import org.apache.hadoop.hbase.client.Put
 3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 5 import org.apache.hadoop.hbase.util.Bytes
 6 import org.apache.hadoop.mapred.JobConf
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 9 /**
10   * Created by *** on 2018/2/12.
11   *
12   * 使用saveAsHadoopDataset写入数据
13   */
14 object SparkWriteHBaseOne {
15   def main(args: Array[String]): Unit = {
16     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17     val sc = new SparkContext(sparkConf)
18 
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24 
25     val tablename = "account"
26 
27     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
28     val jobConf = new JobConf(conf)
29     jobConf.setOutputFormat(classOf[TableOutputFormat])
30     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31 
32     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33 
34 
35     val rdd = indataRDD.map(_.split(',')).map{arr=>{
36       /*一个Put对象就是一行记录,在构造方法中指定主键
37        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
38        * Put.add方法接收三个参数:列族,列名,数据
39        */
40       val put = new Put(Bytes.toBytes(arr(0).toInt))
41       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
44       (new ImmutableBytesWritable, put)
45     }}
46 
47     rdd.saveAsHadoopDataset(jobConf)
48 
49     sc.stop()
50   }
51 }

  2.第二种方式:

 1 import org.apache.hadoop.hbase.client.{Put, Result}
 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 4 import org.apache.hadoop.hbase.util.Bytes
 5 import org.apache.hadoop.mapreduce.Job
 6 import org.apache.spark._
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 使用saveAsNewAPIHadoopDataset写入数据
11   */
12 object SparkWriteHBaseTwo {
13   def main(args: Array[String]): Unit = {
14     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15     val sc = new SparkContext(sparkConf)
16 
17     val tablename = "account"
18 
19     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
22 
23     val job = new Job(sc.hadoopConfiguration)
24     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
25     job.setOutputValueClass(classOf[Result])
26     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
27 
28     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29     val rdd = indataRDD.map(_.split(',')).map{arr=>{
30       val put = new Put(Bytes.toBytes(arr(0)))
31       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33       (new ImmutableBytesWritable, put)
34     }}
35 
36     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
37   }
38 }

 

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网