hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,即谓词下推(predicate push down)。这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。
filter 接口中定义了过滤器的基本方法,filterbase 抽象类实现了 filter 接口。所有内置的过滤器则直接或者间接继承自 filterbase 抽象类。用户只需要将定义好的过滤器通过 setfilter
方法传递给 scan
或 put
的实例即可。
setfilter(filter filter)
// scan 中定义的 setfilter @override public scan setfilter(filter filter) { super.setfilter(filter); return this; }
// get 中定义的 setfilter @override public get setfilter(filter filter) { super.setfilter(filter); return this; }
filterbase 的所有子类过滤器如下:
说明:上图基于当前时间点(2019.4)最新的 hbase-2.1.4 ,下文所有说明均基于此版本。
hbase 内置过滤器可以分为三类:分别是比较过滤器,专用过滤器和包装过滤器。分别在下面的三个小节中做详细的介绍。
所有比较过滤器均继承自 comparefilter
。创建一个比较过滤器需要两个参数,分别是比较运算符和比较器实例。
public comparefilter(final compareop compareop,final bytearraycomparable comparator) { this.compareop = compareop; this.comparator = comparator; }
比较运算符均定义在枚举类 compareoperator
中
@interfaceaudience.public public enum compareoperator { less, less_or_equal, equal, not_equal, greater_or_equal, greater, no_op, }
注意:在 1.x 版本的 hbase 中,比较运算符定义在
comparefilter.compareop
枚举类中,但在 2.0 之后这个类就被标识为 @deprecated ,并会在 3.0 移除。所以 2.0 之后版本的 hbase 需要使用compareoperator
这个枚举类。
所有比较器均继承自 bytearraycomparable
抽象类,常用的有以下几种:
bytes.compareto(byte [],byte [])
按字典序比较指定的字节数组。equal
和 not_equal
操作。equal
和 not_equal
操作。binaryprefixcomparator
和 binarycomparator
的区别不是很好理解,这里举例说明一下:
在进行 equal
的比较时,如果比较器传入的是 abcd
的字节数组,但是待比较数据是 abcdefgh
:
binaryprefixcomparator
比较器,则比较以 abcd
字节数组的长度为准,即 efgh
不会参与比较,这时候认为 abcd
与 abcdefgh
是满足 equal
条件的;binarycomparator
比较器,则认为其是不相等的。比较过滤器共有五个(hbase 1.x 版本和 2.x 版本相同),见下图:
前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例即可构建,然后通过 setfilter
方法传递给 scan
:
filter filter = new rowfilter(compareoperator.less_or_equal, new binarycomparator(bytes.tobytes("xxx"))); scan.setfilter(filter);
dependentcolumnfilter
的使用稍微复杂一点,这里单独做下说明。
可以把 dependentcolumnfilter
理解为一个 valuefilter 和一个时间戳过滤器的组合。dependentcolumnfilter
有三个带参构造器,这里选择一个参数最全的进行说明:
dependentcolumnfilter(final byte [] family, final byte[] qualifier, final boolean dropdependentcolumn, final compareoperator op, final bytearraycomparable valuecomparator)
这里举例进行说明:
dependentcolumnfilter dependentcolumnfilter = new dependentcolumnfilter( bytes.tobytes("student"), bytes.tobytes("name"), false, compareoperator.equal, new binaryprefixcomparator(bytes.tobytes("xiaolan")));
首先会去查找 student:name
中值以 xiaolan
开头的所有数据获得 参考数据集
,这一步等同于 valuefilter 过滤器;
其次再用参考数据集中所有数据的时间戳去检索其他列,获得时间戳相同的其他列的数据作为 结果数据集
,这一步等同于时间戳过滤器;
最后如果 dropdependentcolumn
为 true,则返回 参考数据集
+结果数据集
,若为 false,则抛弃参考数据集,只返回 结果数据集
。
专用过滤器通常直接继承自 filterbase
,适用于范围更小的筛选规则。
基于某列(参考列)的值决定某行数据是否被过滤。其实例有以下方法:
singlecolumnvaluefilter singlecolumnvaluefilter = new singlecolumnvaluefilter( "student".getbytes(), "name".getbytes(), compareoperator.equal, new substringcomparator("xiaolan")); singlecolumnvaluefilter.setfilterifmissing(true); scan.setfilter(singlecolumnvaluefilter);
singlecolumnvalueexcludefilter
继承自上面的 singlecolumnvaluefilter
,过滤行为与其相反。
基于 rowkey 值决定某行数据是否被过滤。
prefixfilter prefixfilter = new prefixfilter(bytes.tobytes("xxx")); scan.setfilter(prefixfilter);
基于列限定符(列名)决定某行数据是否被过滤。
columnprefixfilter columnprefixfilter = new columnprefixfilter(bytes.tobytes("xxx")); scan.setfilter(columnprefixfilter);
可以使用这个过滤器实现对结果按行进行分页,创建 pagefilter 实例的时候需要传入每页的行数。
public pagefilter(final long pagesize) { preconditions.checkargument(pagesize >= 0, "must be positive %s", pagesize); this.pagesize = pagesize; }
下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:
客户端进行分页查询,需要传递 startrow
(起始 rowkey),知道起始 startrow
后,就可以返回对应的 pagesize 行数据。这里唯一的问题就是,对于第一次查询,显然 startrow
就是表格的第一行数据,但是之后第二次、第三次查询我们并不知道 startrow
,只能知道上一次查询的最后一条数据的 rowkey(简单称之为 lastrow
)。
我们不能将 lastrow
作为新一次查询的 startrow
传入,因为 scan 的查询区间是[startrow,endrow) ,即前开后闭区间,这样 startrow
在新的查询也会被返回,这条数据就重复了。
同时在不使用第三方数据库存储 rowkey 的情况下,我们是无法通过知道 lastrow
的下一个 rowkey 的,因为 rowkey 的设计可能是连续的也有可能是不连续的。
由于 hbase 的 rowkey 是按照字典序进行排序的。这种情况下,就可以在 lastrow
后面加上 0
,作为 startrow
传入,因为按照字典序的规则,某个值加上 0
后的新值,在字典序上一定是这个值的下一个值,对于 hbase 来说下一个 rowkey 在字典序上一定也是等于或者大于这个新值的。
所以最后传入 lastrow
+0
,如果等于这个值的 rowkey 存在就从这个值开始 scan,否则从字典序的下一个 rowkey 开始 scan。
25 个字母以及数字字符,字典排序如下:
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'
分页查询主要实现逻辑:
byte[] postfix = new byte[] { 0x00 }; filter filter = new pagefilter(15); int totalrows = 0; byte[] lastrow = null; while (true) { scan scan = new scan(); scan.setfilter(filter); if (lastrow != null) { // 如果不是首行 则 lastrow + 0 byte[] startrow = bytes.add(lastrow, postfix); system.out.println("start row: " + bytes.tostringbinary(startrow)); scan.withstartrow(startrow); } resultscanner scanner = table.getscanner(scan); int localrows = 0; result result; while ((result = scanner.next()) != null) { system.out.println(localrows++ + ": " + result); totalrows++; lastrow = result.getrow(); } scanner.close(); //最后一页,查询结束 if (localrows == 0) break; } system.out.println("total rows: " + totalrows);
需要注意的是在多台 regin services 上执行分页过滤的时候,由于并行执行的过滤器不能共享它们的状态和边界,所以有可能每个过滤器都会在完成扫描前获取了 pagecount 行的结果,这种情况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。
list<long> list = new arraylist<>(); list.add(1554975573000l); timestampsfilter timestampsfilter = new timestampsfilter(list); scan.setfilter(timestampsfilter);
firstkeyonlyfilter
只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,通常用于行数统计的场景,因为如果某一行存在,则行中必然至少有一列。
firstkeyonlyfilter firstkeyonlyfilter = new firstkeyonlyfilter(); scan.set(firstkeyonlyfilter);
包装过滤器就是通过包装其他过滤器以实现某些拓展的功能。
skipfilter
包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 keyvalue 实例时,则拓展过滤整行数据。下面是一个使用示例:
// 定义 valuefilter 过滤器 filter filter1 = new valuefilter(compareoperator.not_equal, new binarycomparator(bytes.tobytes("xxx"))); // 使用 skipfilter 进行包装 filter filter2 = new skipfilter(filter1);
whilematchfilter
包装一个过滤器,当被包装的过滤器遇到一个需要过滤的 keyvalue 实例时,whilematchfilter
则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:
filter filter1 = new rowfilter(compareoperator.not_equal, new binarycomparator(bytes.tobytes("rowkey4"))); scan scan = new scan(); scan.setfilter(filter1); resultscanner scanner1 = table.getscanner(scan); for (result result : scanner1) { for (cell cell : result.listcells()) { system.out.println(cell); } } scanner1.close(); system.out.println("--------------------"); // 使用 whilematchfilter 进行包装 filter filter2 = new whilematchfilter(filter1); scan.setfilter(filter2); resultscanner scanner2 = table.getscanner(scan); for (result result : scanner1) { for (cell cell : result.listcells()) { system.out.println(cell); } } scanner2.close();
rowkey0/student:name/1555035006994/put/vlen=8/seqid=0 rowkey1/student:name/1555035007019/put/vlen=8/seqid=0 rowkey2/student:name/1555035007025/put/vlen=8/seqid=0 rowkey3/student:name/1555035007037/put/vlen=8/seqid=0 rowkey5/student:name/1555035007051/put/vlen=8/seqid=0 rowkey6/student:name/1555035007057/put/vlen=8/seqid=0 rowkey7/student:name/1555035007062/put/vlen=8/seqid=0 rowkey8/student:name/1555035007068/put/vlen=8/seqid=0 rowkey9/student:name/1555035007073/put/vlen=8/seqid=0 -------------------- rowkey0/student:name/1555035006994/put/vlen=8/seqid=0 rowkey1/student:name/1555035007019/put/vlen=8/seqid=0 rowkey2/student:name/1555035007025/put/vlen=8/seqid=0 rowkey3/student:name/1555035007037/put/vlen=8/seqid=0
可以看到被包装后,只返回了 rowkey4
之前的数据。
以上都是讲解单个过滤器的作用,当需要多个过滤器共同作用于一次查询的时候,就需要使用 filterlist
。filterlist
支持通过构造器或者 addfilter
方法传入多个过滤器。
// 构造器传入 public filterlist(final operator operator, final list<filter> filters) public filterlist(final list<filter> filters) public filterlist(final filter... filters) // 方法传入 public void addfilter(list<filter> filters) public void addfilter(filter filter)
多个过滤器组合的结果由 operator
参数定义 ,其可选参数定义在 operator
枚举类中。只有 must_pass_all
和 must_pass_one
两个可选的值:
@interfaceaudience.public public enum operator { /** !and */ must_pass_all, /** !or */ must_pass_one }
使用示例如下:
list<filter> filters = new arraylist<filter>(); filter filter1 = new rowfilter(compareoperator.greater_or_equal, new binarycomparator(bytes.tobytes("xxx"))); filters.add(filter1); filter filter2 = new rowfilter(compareoperator.less_or_equal, new binarycomparator(bytes.tobytes("yyy"))); filters.add(filter2); filter filter3 = new qualifierfilter(compareoperator.equal, new regexstringcomparator("zzz")); filters.add(filter3); filterlist filterlist = new filterlist(filters); scan scan = new scan(); scan.setfilter(filterlist);
hbase: the definitive guide _> chapter 4. client api: advanced features
更多大数据系列文章可以参见 github 开源项目: 大数据入门指南
如对本文有疑问, 点击进行留言回复!!
HBase Filter 过滤器之FamilyFilter详解
去 HBase,Kylin on Parquet 性能表现如何?
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
网友评论