当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Flink入门宝典(详细截图版)

Flink入门宝典(详细截图版)

2019年09月19日  | 移动技术网IT编程  | 我要评论

file
本文基于java构建flink1.9版本入门程序,需要maven 3.0.4 和 java 8 以上版本。需要安装netcat进行简单调试。

这里简述安装过程,并使用idea进行开发一个简单流处理程序,本地调试或者提交到flink上运行,maven与jdk安装这里不做说明。

一、flink简介

flink诞生于欧洲的一个大数据研究项目stratosphere。该项目是柏林工业大学的一个研究性项目。早期,flink是做batch计算的,但是在2014年,stratosphere里面的核心成员孵化出flink,同年将flink捐赠apache,并在后来成为apache的顶级大数据项目,同时flink计算的主流方向被定位为streaming,即用流式计算来做所有大数据的计算,这就是flink技术诞生的背景。

2015开始阿里开始介入flink 负责对资源调度和流式sql的优化,成立了阿里内部版本blink在最近更新的1.9版本中,blink开始合并入flink,

未来flink也将支持java,scala,python等更多语言,并在机器学习领域施展拳脚。

二、flink开发环境搭建

首先要想运行flink,我们需要下载并解压flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html

我们可以选择flink与scala结合版本,这里我们选择最新的1.9版本apache flink 1.9.0 for scala 2.12进行下载。

flink在windows和linux下的安装与部署可以查看 flink快速入门--安装与示例运行,这里演示windows版。

安装成功后,启动cmd命令行窗口,进入flink文件夹,运行bin目录下的start-cluster.bat

$ cd flink
$ cd bin
$ start-cluster.bat
starting a local cluster with one jobmanager process and one taskmanager process.
you can terminate the processes via ctrl-c in the spawned shell windows.
web interface by default on http://localhost:8081/.

显示启动成功后,我们在浏览器访问 http://www.lhsxpumps.com/_localhost:8081/可以看到flink的管理页面。

file

三、flink快速体验

请保证安装好了flink,还需要maven 3.0.4 和 java 8 以上版本。这里简述maven构建过程。

其他详细构建方法欢迎查看:快速构建第一个flink工程

1、搭建maven工程

使用flink maven archetype构建一个工程。

 $ mvn archetype:generate                               \
      -darchetypegroupid=org.apache.flink              \
      -darchetypeartifactid=flink-quickstart-java      \
      -darchetypeversion=1.9.0

你可以编辑自己的artifactid groupid

目录结构如下:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── batchjob.java
        │               └── streamingjob.java
        └── resources
            └── log4j.properties

在pom中核心依赖:

<dependencies>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java_2.11</artifactid>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-clients_2.11</artifactid>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2、编写代码

streamingjob

import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.api.windowing.time.time;
import org.apache.flink.util.collector;
public class streamingjob {

    public static void main(string[] args) throws exception {
        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        datastream<tuple2<string, integer>> datastreaming = env
                .sockettextstream("localhost", 9999)
                .flatmap(new splitter())
                .keyby(0)
                .timewindow(time.seconds(5))
                .sum(1);

        datastreaming.print();

        // execute program
        env.execute("flink streaming java api skeleton");
    }
    public static class splitter implements flatmapfunction<string, tuple2<string, integer>> {

        @override
        public void flatmap(string sentence, collector<tuple2<string, integer>> out) throws exception {
            for(string word : sentence.split(" ")){
                out.collect(new tuple2<string, integer>(word, 1));
            }
        }

    }
}

3、调试程序

安装netcat工具进行简单调试。

启动netcat 输入:

nc -l 9999

启动程序

file

在netcat中输入几个单词 逗号分隔

file

在程序一端查看结果

file

启动flink

windows为 start-cluster.bat    linux为start-cluster.sh

localhost:8081查看管理页面

file

通过maven对代码打包

file

将打好的包提交到flink上

file

查看log

tail -f log/flink-***-jobmanager.out

在netcat中继续输入单词,在running jobs中查看作业状态,在log中查看输出。

file

flink提供不同级别的抽象来开发流/批处理应用程序。

file

最低级抽象只提供有状态流

在实践中,大多数应用程序不需要上述低级抽象,而是针对core api编程, 如datastream api(有界/无界流)和dataset api(有界数据集)。

table api声明了一个表,遵循关系模型。

最高级抽象是sql

我们这里只用到了datastream api。

flink程序的基本构建块是转换

一个程序的基本构成:

l 获取execution environment

l 加载/创建原始数据

l 指定这些数据的转化方法

l 指定计算结果的存放位置

l 触发程序执行

file

五、datastreaming api使用

1、获取execution environment

streamexecutionenvironment是所有flink程序的基础,获取方法有:

getexecutionenvironment()

createlocalenvironment()

createremoteenvironment(string host, int port, string ... jarfiles)

一般情况下使用getexecutionenvironment。如果你在ide或者常规java程序中执行可以通过createlocalenvironment创建基于本地机器的streamexecutionenvironment。如果你已经创建jar程序希望通过invoke方式获取里面的getexecutionenvironment方法可以使用createremoteenvironment方式。

2、加载/创建原始数据

streamexecutionenvironment提供的一些访问数据源的接口

(1)基于文件的数据源

readtextfile(path)
readfile(fileinputformat, path)
readfile(fileinputformat, path, watchtype, interval, pathfilter, typeinfo)

(2)基于socket的数据源(本文使用的)

sockettextstream

 

(3)基于collection的数据源

fromcollection(collection)
fromcollection(iterator, class)
fromelements(t ...)
fromparallelcollection(splittableiterator, class)
generatesequence(from, to)

3、转化方法

(1)map方式:datastream -> datastream

功能:拿到一个element并输出一个element,类似hive中的udf函数

举例:

datastream<integer> datastream = //...
datastream.map(new mapfunction<integer, integer>() {
    @override
    public integer map(integer value) throws exception {
        return 2 * value;
    }
});

(2)flatmap方式:datastream -> datastream

功能:拿到一个element,输出多个值,类似hive中的udtf函数

举例:

datastream.flatmap(new flatmapfunction<string, string>() {
    @override
    public void flatmap(string value, collector<string> out)
        throws exception {
        for(string word: value.split(" ")){
            out.collect(word);
        }
    }
});

(3)filter方式:datastream -> datastream

功能:针对每个element判断函数是否返回true,最后只保留返回true的element

举例:

datastream.filter(new filterfunction<integer>() {
    @override
    public boolean filter(integer value) throws exception {
        return value != 0;
    }
});

(4)keyby方式:datastream -> keyedstream

功能:逻辑上将流分割成不相交的分区,每个分区都是相同key的元素

举例:

datastream.keyby("somekey") // key by field "somekey"
datastream.keyby(0) // key by the first element of a tuple

(5)reduce方式:keyedstream -> datastream

功能:在keyed data stream中进行轮训reduce。

举例:

keyedstream.reduce(new reducefunction<integer>() {
    @override
    public integer reduce(integer value1, integer value2)
    throws exception {
        return value1 + value2;
    }
});

(6)aggregations方式:keyedstream -> datastream

功能:在keyed data stream中进行聚合操作

举例:

keyedstream.sum(0);
keyedstream.sum("key");
keyedstream.min(0);
keyedstream.min("key");
keyedstream.max(0);
keyedstream.max("key");
keyedstream.minby(0);
keyedstream.minby("key");
keyedstream.maxby(0);
keyedstream.maxby("key");

(7)window方式:keyedstream -> windowedstream

功能:在keyedstream中进行使用,根据某个特征针对每个key用windows进行分组。

举例:

datastream.keyby(0).window(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data

(8)windowall方式:datastream -> allwindowedstream

功能:在datastream中根据某个特征进行分组。

举例:

datastream.windowall(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data

(9)union方式:datastream* -> datastream

功能:合并多个数据流成一个新的数据流

举例:

datastream.union(otherstream1, otherstream2, ...);

(10)split方式:datastream -> splitstream

功能:将流分割成多个流

举例:

splitstream<integer> split = somedatastream.split(new outputselector<integer>() {
    @override
    public iterable<string> select(integer value) {
        list<string> output = new arraylist<string>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

(11)select方式:splitstream -> datastream

功能:从split stream中选择一个流

举例:

splitstream<integer> split;
datastream<integer> even = split.select("even");
datastream<integer> odd = split.select("odd");
datastream<integer> all = split.select("even","odd");

4、输出数据

writeastext()
writeascsv(...)
print() / printtoerr() 
writeusingoutputformat() / fileoutputformat
writetosocket
addsink

更多flink相关原理:

穿梭时空的实时计算框架——flink对时间的处理

大数据实时处理的王者-flink

统一批处理流处理——flink批流一体实现原理

flink快速入门--安装与示例运行

快速构建第一个flink工程

更多实时计算,flink,kafka等相关技术博文,欢迎关注实时流式计算:

file

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网