flink提供三层api。 每个api在简洁性和表达性之间提供不同的权衡,并针对不同的用例。
而且flink提供不同级别的抽象来开发流/批处理应用程序
apache flink具有两个关系型api
用于统一流和批处理
table api是scala和java语言集成查询api,可以非常直观的方式组合来自关系算子的查询(e.g. 选择,过滤和连接).
flink的sql支持基于实现sql标准的apache calcite。无论输入是批输入(dataset)还是流输入(datastream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
table api和sql接口彼此紧密集成,就如flink的datastream和dataset api。我们可以轻松地在基于api构建的所有api和库之间切换。例如,可以使用cep库从datastream中提取模式,然后使用 table api分析模式,或者可以在预处理上运行gelly图算法之前使用sql查询扫描,过滤和聚合批处理表数据。
table api和sql尚未完成并且正在积极开发中。并非 table api,sql和stream,batch输入的每种组合都支持所有算子操作
所有table api和sql组件都捆绑在flink-table maven工件中。
以下依赖项与大多数项目相关:
必须将以下依赖项添加到项目中才能使用table api和sql来定义管道:
<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-table-planner_2.11</artifactid> <version>1.8.0</version> </dependency>
此外,根据目标编程语言,您需要添加java或scala api。
<!-- either... --> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-table-api-java-bridge_2.11</artifactid> <version>1.8.0</version> </dependency> <!-- or... --> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-table-api-scala-bridge_2.11</artifactid> <version>1.8.0</version> </dependency>
在内部,表生态系统的一部分是在scala中实现的。 因此,请确保为批处理和流应用程序添加以下依赖项:
<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-streaming-scala_2.11</artifactid> <version>1.8.0</version> </dependency>
如果要实现与kafka或一组用户定义函数交互的自定义格式,以下依赖关系就足够了,可用于sql客户端的jar文件:
<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-table-common</artifactid> <version>1.8.0</version> </dependency>
目前,该模块包括以下扩展点:
table api和sql集成在一个联合api中。此api的核心概念是table用作查询的输入和输出。本文档显示了具有 table api和sql查询的程序的常见结构,如何注册table,如何查询table以及如何发出table。
批处理和流式传输的所有 table api和sql程序都遵循相同的模式。以下代码示例显示了 table api和sql程序的常见结构。
// 对于批处理程序,使用executionenvironment而不是streamexecutionenvironment streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); // 创建一个tableenvironment // 对于批处理程序使用batchtableenvironment而不是streamtableenvironment streamtableenvironment tableenv = streamtableenvironment.create(env); // 注册一个 table tableenv.registertable("table1", ...) // 或者 tableenv.registertablesource("table2", ...); // 或者 tableenv.registerexternalcatalog("extcat", ...); // 注册一个输出 table tableenv.registertablesink("outputtable", ...); / 从 table api query 创建一个table table tapiresult = tableenv.scan("table1").select(...); // 从 sql query 创建一个table table sqlresult = tableenv.sqlquery("select ... from table2 ... "); // 将表api结果表发送到tablesink,对于sql结果也是如此 tapiresult.insertinto("outputtable"); // 执行 env.execute();
它也可以直接转换为a 而不是注册a datastream或datasetin 。如果要在 table api查询中使用table,这很方便。tableenvironmenttable
// 获取streamtableenvironment //在batchtableenvironment中注册dataset是等效的 streamtableenvironment tableenv = streamtableenvironment.create(env); datastream<tuple2<long, string>> stream = ... // 将datastream转换为默认字段为“f0”,“f1”的表 table table1 = tableenv.fromdatastream(stream); // 将datastream转换为包含字段“mylong”,“mystring”的表 table table2 = tableenv.fromdatastream(stream, "mylong, mystring");
还不完善,等日后flink该模块开发完毕再深入研究!
如对本文有疑问, 点击进行留言回复!!
HBase Filter 过滤器之FamilyFilter详解
去 HBase,Kylin on Parquet 性能表现如何?
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
网友评论