当前位置: 移动技术网 > IT编程>数据库>其他数据库 > HBase 系列(七)——HBase 过滤器详解

HBase 系列(七)——HBase 过滤器详解

2019年08月26日  | 移动技术网IT编程  | 我要评论

一、hbase过滤器简介

hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,即谓词下推(predicate push down)。这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。

二、过滤器基础

2.1 filter接口和filterbase抽象类

filter 接口中定义了过滤器的基本方法,filterbase 抽象类实现了 filter 接口。所有内置的过滤器则直接或者间接继承自 filterbase 抽象类。用户只需要将定义好的过滤器通过 setfilter 方法传递给 scanput 的实例即可。

setfilter(filter filter)
 // scan 中定义的 setfilter
 @override
  public scan setfilter(filter filter) {
    super.setfilter(filter);
    return this;
  }
  // get 中定义的 setfilter
 @override
  public get setfilter(filter filter) {
    super.setfilter(filter);
    return this;
  }

filterbase 的所有子类过滤器如下:

说明:上图基于当前时间点(2019.4)最新的 hbase-2.1.4 ,下文所有说明均基于此版本。

2.2 过滤器分类

hbase 内置过滤器可以分为三类:分别是比较过滤器,专用过滤器和包装过滤器。分别在下面的三个小节中做详细的介绍。

三、比较过滤器

所有比较过滤器均继承自 comparefilter。创建一个比较过滤器需要两个参数,分别是比较运算符比较器实例

 public comparefilter(final compareop compareop,final bytearraycomparable comparator) {
    this.compareop = compareop;
    this.comparator = comparator;
  }

3.1 比较运算符

  • less (<)
  • less_or_equal (<=)
  • equal (=)
  • not_equal (!=)
  • greater_or_equal (>=)
  • greater (>)
  • no_op (排除所有符合条件的值)

比较运算符均定义在枚举类 compareoperator

@interfaceaudience.public
public enum compareoperator {
  less,
  less_or_equal,
  equal,
  not_equal,
  greater_or_equal,
  greater,
  no_op,
}

注意:在 1.x 版本的 hbase 中,比较运算符定义在 comparefilter.compareop 枚举类中,但在 2.0 之后这个类就被标识为 @deprecated ,并会在 3.0 移除。所以 2.0 之后版本的 hbase 需要使用 compareoperator 这个枚举类。

3.2 比较器

所有比较器均继承自 bytearraycomparable 抽象类,常用的有以下几种:

  • binarycomparator : 使用 bytes.compareto(byte [],byte []) 按字典序比较指定的字节数组。
  • binaryprefixcomparator : 按字典序与指定的字节数组进行比较,但只比较到这个字节数组的长度。
  • regexstringcomparator : 使用给定的正则表达式与指定的字节数组进行比较。仅支持 equalnot_equal 操作。
  • substringcomparator : 测试给定的子字符串是否出现在指定的字节数组中,比较不区分大小写。仅支持 equalnot_equal 操作。
  • nullcomparator :判断给定的值是否为空。
  • bitcomparator :按位进行比较。

binaryprefixcomparatorbinarycomparator 的区别不是很好理解,这里举例说明一下:

在进行 equal 的比较时,如果比较器传入的是 abcd 的字节数组,但是待比较数据是 abcdefgh

  • 如果使用的是 binaryprefixcomparator 比较器,则比较以 abcd 字节数组的长度为准,即 efgh 不会参与比较,这时候认为 abcdabcdefgh 是满足 equal 条件的;
  • 如果使用的是 binarycomparator 比较器,则认为其是不相等的。

3.3 比较过滤器种类

比较过滤器共有五个(hbase 1.x 版本和 2.x 版本相同),见下图:

  • rowfilter :基于行键来过滤数据;
  • familyfilterr :基于列族来过滤数据;
  • qualifierfilterr :基于列限定符(列名)来过滤数据;
  • valuefilterr :基于单元格 (cell) 的值来过滤数据;
  • dependentcolumnfilter :指定一个参考列来过滤其他列的过滤器,过滤的原则是基于参考列的时间戳来进行筛选 。

前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例即可构建,然后通过 setfilter 方法传递给 scan

 filter filter  = new rowfilter(compareoperator.less_or_equal,
                                new binarycomparator(bytes.tobytes("xxx")));
  scan.setfilter(filter);    

dependentcolumnfilter 的使用稍微复杂一点,这里单独做下说明。

3.4 dependentcolumnfilter

可以把 dependentcolumnfilter 理解为一个 valuefilter 和一个时间戳过滤器的组合dependentcolumnfilter 有三个带参构造器,这里选择一个参数最全的进行说明:

dependentcolumnfilter(final byte [] family, final byte[] qualifier,
                               final boolean dropdependentcolumn, final compareoperator op,
                               final bytearraycomparable valuecomparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropdependentcolumn :决定参考列是否被包含在返回结果内,为 true 时表示参考列被返回,为 false 时表示被丢弃
  • op :比较运算符
  • valuecomparator :比较器

这里举例进行说明:

dependentcolumnfilter dependentcolumnfilter = new dependentcolumnfilter( 
    bytes.tobytes("student"),
    bytes.tobytes("name"),
    false,
    compareoperator.equal, 
    new binaryprefixcomparator(bytes.tobytes("xiaolan")));
  • 首先会去查找 student:name 中值以 xiaolan 开头的所有数据获得 参考数据集,这一步等同于 valuefilter 过滤器;

  • 其次再用参考数据集中所有数据的时间戳去检索其他列,获得时间戳相同的其他列的数据作为 结果数据集,这一步等同于时间戳过滤器;

  • 最后如果 dropdependentcolumn 为 true,则返回 参考数据集+结果数据集,若为 false,则抛弃参考数据集,只返回 结果数据集

四、专用过滤器

专用过滤器通常直接继承自 filterbase,适用于范围更小的筛选规则。

4.1 单列列值过滤器 (singlecolumnvaluefilter)

基于某列(参考列)的值决定某行数据是否被过滤。其实例有以下方法:

  • setfilterifmissing(boolean filterifmissing) :默认值为 false,即如果该行数据不包含参考列,其依然被包含在最后的结果中;设置为 true 时,则不包含;
  • setlatestversiononly(boolean latestversiononly) :默认为 true,即只检索参考列的最新版本数据;设置为 false,则检索所有版本数据。
singlecolumnvaluefilter singlecolumnvaluefilter = new singlecolumnvaluefilter(
                "student".getbytes(), 
                "name".getbytes(), 
                compareoperator.equal, 
                new substringcomparator("xiaolan"));
singlecolumnvaluefilter.setfilterifmissing(true);
scan.setfilter(singlecolumnvaluefilter);

4.2 单列列值排除器 (singlecolumnvalueexcludefilter)

singlecolumnvalueexcludefilter 继承自上面的 singlecolumnvaluefilter,过滤行为与其相反。

4.3 行键前缀过滤器 (prefixfilter)

基于 rowkey 值决定某行数据是否被过滤。

prefixfilter prefixfilter = new prefixfilter(bytes.tobytes("xxx"));
scan.setfilter(prefixfilter);

4.4 列名前缀过滤器 (columnprefixfilter)

基于列限定符(列名)决定某行数据是否被过滤。

columnprefixfilter columnprefixfilter = new columnprefixfilter(bytes.tobytes("xxx"));
 scan.setfilter(columnprefixfilter);

4.5 分页过滤器 (pagefilter)

可以使用这个过滤器实现对结果按行进行分页,创建 pagefilter 实例的时候需要传入每页的行数。

public pagefilter(final long pagesize) {
    preconditions.checkargument(pagesize >= 0, "must be positive %s", pagesize);
    this.pagesize = pagesize;
  }

下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:

客户端进行分页查询,需要传递 startrow(起始 rowkey),知道起始 startrow 后,就可以返回对应的 pagesize 行数据。这里唯一的问题就是,对于第一次查询,显然 startrow 就是表格的第一行数据,但是之后第二次、第三次查询我们并不知道 startrow,只能知道上一次查询的最后一条数据的 rowkey(简单称之为 lastrow)。

我们不能将 lastrow 作为新一次查询的 startrow 传入,因为 scan 的查询区间是[startrow,endrow) ,即前开后闭区间,这样 startrow 在新的查询也会被返回,这条数据就重复了。

同时在不使用第三方数据库存储 rowkey 的情况下,我们是无法通过知道 lastrow 的下一个 rowkey 的,因为 rowkey 的设计可能是连续的也有可能是不连续的。

由于 hbase 的 rowkey 是按照字典序进行排序的。这种情况下,就可以在 lastrow 后面加上 0 ,作为 startrow 传入,因为按照字典序的规则,某个值加上 0 后的新值,在字典序上一定是这个值的下一个值,对于 hbase 来说下一个 rowkey 在字典序上一定也是等于或者大于这个新值的。

所以最后传入 lastrow+0,如果等于这个值的 rowkey 存在就从这个值开始 scan,否则从字典序的下一个 rowkey 开始 scan。

25 个字母以及数字字符,字典排序如下:

'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分页查询主要实现逻辑:

byte[] postfix = new byte[] { 0x00 };
filter filter = new pagefilter(15);

int totalrows = 0;
byte[] lastrow = null;
while (true) {
    scan scan = new scan();
    scan.setfilter(filter);
    if (lastrow != null) {
        // 如果不是首行 则 lastrow + 0
        byte[] startrow = bytes.add(lastrow, postfix);
        system.out.println("start row: " +
                           bytes.tostringbinary(startrow));
        scan.withstartrow(startrow);
    }
    resultscanner scanner = table.getscanner(scan);
    int localrows = 0;
    result result;
    while ((result = scanner.next()) != null) {
        system.out.println(localrows++ + ": " + result);
        totalrows++;
        lastrow = result.getrow();
    }
    scanner.close();
    //最后一页,查询结束  
    if (localrows == 0) break;
}
system.out.println("total rows: " + totalrows);

需要注意的是在多台 regin services 上执行分页过滤的时候,由于并行执行的过滤器不能共享它们的状态和边界,所以有可能每个过滤器都会在完成扫描前获取了 pagecount 行的结果,这种情况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。

4.6 时间戳过滤器 (timestampsfilter)

list<long> list = new arraylist<>();
list.add(1554975573000l);
timestampsfilter timestampsfilter = new timestampsfilter(list);
scan.setfilter(timestampsfilter);

4.7 首次行键过滤器 (firstkeyonlyfilter)

firstkeyonlyfilter 只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,通常用于行数统计的场景,因为如果某一行存在,则行中必然至少有一列。

firstkeyonlyfilter firstkeyonlyfilter = new firstkeyonlyfilter();
scan.set(firstkeyonlyfilter);

五、包装过滤器

包装过滤器就是通过包装其他过滤器以实现某些拓展的功能。

5.1 skipfilter过滤器

skipfilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 keyvalue 实例时,则拓展过滤整行数据。下面是一个使用示例:

// 定义 valuefilter 过滤器
filter filter1 = new valuefilter(compareoperator.not_equal,
      new binarycomparator(bytes.tobytes("xxx")));
// 使用 skipfilter 进行包装
filter filter2 = new skipfilter(filter1);

5.2 whilematchfilter过滤器

whilematchfilter 包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 keyvalue 实例时,whilematchfilter 则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:

filter filter1 = new rowfilter(compareoperator.not_equal,
                               new binarycomparator(bytes.tobytes("rowkey4")));

scan scan = new scan();
scan.setfilter(filter1);
resultscanner scanner1 = table.getscanner(scan);
for (result result : scanner1) {
    for (cell cell : result.listcells()) {
        system.out.println(cell);
    }
}
scanner1.close();

system.out.println("--------------------");

// 使用 whilematchfilter 进行包装
filter filter2 = new whilematchfilter(filter1);

scan.setfilter(filter2);
resultscanner scanner2 = table.getscanner(scan);
for (result result : scanner1) {
    for (cell cell : result.listcells()) {
        system.out.println(cell);
    }
}
scanner2.close();
rowkey0/student:name/1555035006994/put/vlen=8/seqid=0
rowkey1/student:name/1555035007019/put/vlen=8/seqid=0
rowkey2/student:name/1555035007025/put/vlen=8/seqid=0
rowkey3/student:name/1555035007037/put/vlen=8/seqid=0
rowkey5/student:name/1555035007051/put/vlen=8/seqid=0
rowkey6/student:name/1555035007057/put/vlen=8/seqid=0
rowkey7/student:name/1555035007062/put/vlen=8/seqid=0
rowkey8/student:name/1555035007068/put/vlen=8/seqid=0
rowkey9/student:name/1555035007073/put/vlen=8/seqid=0
--------------------
rowkey0/student:name/1555035006994/put/vlen=8/seqid=0
rowkey1/student:name/1555035007019/put/vlen=8/seqid=0
rowkey2/student:name/1555035007025/put/vlen=8/seqid=0
rowkey3/student:name/1555035007037/put/vlen=8/seqid=0

可以看到被包装后,只返回了 rowkey4 之前的数据。

六、filterlist

以上都是讲解单个过滤器的作用,当需要多个过滤器共同作用于一次查询的时候,就需要使用 filterlistfilterlist 支持通过构造器或者 addfilter 方法传入多个过滤器。

// 构造器传入
public filterlist(final operator operator, final list<filter> filters)
public filterlist(final list<filter> filters)
public filterlist(final filter... filters)

// 方法传入
 public void addfilter(list<filter> filters)
 public void addfilter(filter filter)

多个过滤器组合的结果由 operator 参数定义 ,其可选参数定义在 operator 枚举类中。只有 must_pass_allmust_pass_one 两个可选的值:

  • must_pass_all :相当于 and,必须所有的过滤器都通过才认为通过;
  • must_pass_one :相当于 or,只有要一个过滤器通过则认为通过。
@interfaceaudience.public
  public enum operator {
    /** !and */
    must_pass_all,
    /** !or */
    must_pass_one
  }

使用示例如下:

list<filter> filters = new arraylist<filter>();

filter filter1 = new rowfilter(compareoperator.greater_or_equal,
                               new binarycomparator(bytes.tobytes("xxx")));
filters.add(filter1);

filter filter2 = new rowfilter(compareoperator.less_or_equal,
                               new binarycomparator(bytes.tobytes("yyy")));
filters.add(filter2);

filter filter3 = new qualifierfilter(compareoperator.equal,
                                     new regexstringcomparator("zzz"));
filters.add(filter3);

filterlist filterlist = new filterlist(filters);

scan scan = new scan();
scan.setfilter(filterlist);

参考资料

hbase: the definitive guide _> chapter 4. client api: advanced features

更多大数据系列文章可以参见 github 开源项目大数据入门指南

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网