phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbase源表数据不一致。而对于spark+phoenix的写入方式,官方有文档说明,但是有版本限制,以下是官方原文:
note that for phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. as of phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against spark 2.x. if compability with spark 1.x if needed, you must compile phoenix with the spark16 maven profile.
所以只能考虑用jdbc的方式做。
我使用的版本信息:
jar包引入:
<dependency> <groupid>org.apache.phoenix</groupid> <artifactid>phoenix-core</artifactid> <version>4.13.1-hbase-1.2</version> </dependency> <dependency> <groupid>org.apache.phoenix</groupid> <artifactid>phoenix-spark</artifactid> <version>4.13.1-hbase-1.2</version> </dependency>
phoenixutil类:
public class phoenixutil { private static linkedlist<connection> connectionqueue; static { try { class.forname("org.apache.phoenix.jdbc.phoenixdriver"); } catch (classnotfoundexception e) { e.printstacktrace(); } } public synchronized static connection getconnection() throws sqlexception { try { if (connectionqueue == null){ connectionqueue = new linkedlist<connection>(); for (int i = 0;i < 3;i++){ connection conn = drivermanager.getconnection("jdbc:phoenix:hostname:2181"); connectionqueue.push(conn); } } }catch (exception e1){ e1.printstacktrace(); } return connectionqueue.poll(); } public static void returnconnection(connection conn){ connectionqueue.push(conn); }
在sparkstreaming中引入phoenixutil类(由于业务关系,这里使用的是statement):
savelines.foreachrdd(rdd -> { rdd.foreachpartition(p -> { connection conn = phoenixutil.getconnection(); statement stmt = conn.createstatement(); conn.setautocommit(false); //业务逻辑 //sql } stmt.addbatch(sql); } stmt.executebatch(); conn.commit(); stmt.close(); phoenixutil.returnconnection(conn); zkkafkautil.updateoffset(offsetranges, group_id, topic); }); });
最后,如果大家有更好的方式处理这个问题,欢迎指教。
如对本文有疑问, 点击进行留言回复!!
HBase Filter 过滤器之FamilyFilter详解
去 HBase,Kylin on Parquet 性能表现如何?
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
网友评论