
Flink Source Code Analysis: Dissecting a Simple Flink Program


Earlier posts covered how to set up a local Flink environment, how to create a Flink application, and how to build the Flink source code. This article uses the official SocketWindowWordCount example to walk through each basic step of a typical Flink program.

The Example Program

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

The above is the SocketWindowWordCount example from the Flink website. It first reads the host and port for the socket connection from the command line, then obtains an execution environment, reads data from the socket connection, parses and transforms the data, and finally outputs the results.
Every Flink program consists of the same basic parts (a minimal skeleton follows the list):

  1. obtain an execution environment,
  2. load/create the initial data,
  3. specify transformations on this data,
  4. specify where to put the results of the computation,
  5. trigger the program execution.
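
A minimal sketch that makes the five parts explicit. The host, port, and the length-counting logic are placeholders for illustration only, not part of the original example:

// needs org.apache.flink.api.common.functions.MapFunction in addition to the
// imports shown above
// 1. obtain an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. load/create the initial data (placeholder host and port)
DataStream<String> lines = env.socketTextStream("localhost", 9000);
// 3. specify a transformation on the data
DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return value.length();
    }
});
// 4. specify where the results go
lengths.print();
// 5. trigger execution
env.execute("Five-Part Skeleton");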

The Flink Execution Environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Every Flink program starts with this line. It returns an execution environment, which represents the context in which the current program executes. If the program is invoked standalone, the method returns a local execution environment, a LocalStreamEnvironment created by createLocalEnvironment(). This can be seen in its source code:

// Source: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
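
So getExecutionEnvironment() picks the right environment for the calling context. When that choice should be made explicitly instead, the factory methods can be called directly; a small sketch (the parallelism value here is only an example):

// Explicitly create a local environment with parallelism 2, instead of
// letting getExecutionEnvironment() decide based on the calling context.
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);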

Obtaining the Input Data

DataStream<String> text = env.socketTextStream(hostname, port, "\n");

In this example the source data comes from a socket. A socket connection is created with the given configuration, and a new data stream is created containing the strings received (indefinitely) from that socket, decoded with the system's default character set. When the socket connection is closed, reading terminates immediately. Looking at the source code, this call constructs a SocketTextStreamFunction instance from the given socket configuration and then continuously reads input from the connection to build the data stream.

// Source: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
            "Socket Stream");
}

The class hierarchy of SocketTextStreamFunction is shown below:

[Figure: SocketTextStreamFunction class hierarchy]

As the diagram shows, SocketTextStreamFunction is a subclass of SourceFunction, which is the basic interface for all streaming data sources in Flink. SourceFunction is defined as follows:

// Source: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();

    @Public
    interface SourceContext<T> {
        void collect(T element);
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);
        @PublicEvolving
        void emitWatermark(Watermark mark);
        @PublicEvolving
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}

SourceFunction defines two methods, run and cancel, plus the inner interface SourceContext (a minimal custom source written against this contract is sketched after the list).

  • run(SourceContext): implements the data-fetching logic and can forward data to downstream nodes through the ctx parameter.
  • cancel(): cancels the source. The run method usually contains a loop that keeps producing data; cancel should cause that loop to terminate.
  • SourceContext: the interface a source function uses to emit elements and possibly watermarks; it is typed with the kind of element the source produces.
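
A minimal custom source written against the contract above. The class name, field names, and emission logic are purely illustrative, not from the Flink code base:

// Emits an increasing counter every 100 ms until cancelled.
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // emit under the checkpoint lock so element emission and state
            // updates appear atomic to checkpoints
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false; // makes the loop in run() exit
    }
}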

With the SourceFunction interface understood, the concrete implementation of SocketTextStreamFunction (mainly its run method) is straightforward: it keeps reading data from the given hostname and port, splits the input into individual strings on the newline delimiter, and forwards them downstream. Now back to the socketTextStream method of StreamExecutionEnvironment: it returns a DataStreamSource instance via addSource. But the text variable in the example is declared as DataStream, while the method's return type is DataStreamSource. Why does that work? Because DataStream is a parent class of DataStreamSource, as the class diagram below shows; this is plain Java polymorphism.

[Figure: DataStreamSource class hierarchy]
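
In code, this is just an ordinary upcast; a trivial sketch (host and port are placeholders):

// DataStreamSource<T> extends SingleOutputStreamOperator<T>, which in turn
// extends DataStream<T>, so the second assignment is a normal widening.
DataStreamSource<String> source = env.socketTextStream("localhost", 9000);
DataStream<String> text = source;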

Transforming the Data Stream

The DataStreamSource obtained above is then transformed with flatMap, keyBy, timeWindow, and reduce:

DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

This code applies four transformations, flatMap, keyBy, timeWindow, and reduce, to the DataStreamSource obtained above. The flatMap transformation is discussed below; the reader is encouraged to trace the other three through the source code. As an aside, the same pipeline can also be written with Java 8 lambdas, as sketched next.
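
A lambda-based variant of the pipeline, assuming the same WordWithCount type from the example. Because type information is erased from lambdas, returns(...) supplies the output type explicitly:

DataStream<WordWithCount> windowCounts = text
        .flatMap((String value, Collector<WordWithCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        })
        .returns(WordWithCount.class) // needed: the flatMap output type cannot be inferred
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count));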

First, the source code of the flatMap method:

// Source: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

This method does two things: it uses reflection to obtain the output type of the flatMap operator, and it creates an operator. The core idea of Flink's streaming model is that data is passed from the input stream through a chain of operators, one after another, and finally to the output stream; each processing step applied to the data is logically called an operator. The transform call on the last line returns a SingleOutputStreamOperator, which extends DataStream and defines some helper methods that make working with the stream more convenient. Before returning, transform also registers the operator with the execution environment. The figure below shows how a Flink program maps to a streaming dataflow:

[Figure: Flink's basic programming model, mapping a program to a streaming dataflow]
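
For comparison, the map method in the same DataStream class follows the very same pattern. As of the Flink 1.x code base it looks essentially like this (quoted from memory, slightly abridged): extract the output type, wrap the user function in a stream operator, and hand both to transform:

// Source (abridged): org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}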

Emitting the Results

windowCounts.print().setParallelism(1);

Every Flink program begins with a source and ends with a sink. The print method here sinks the computed results to the standard output stream. In real projects, results are usually sunk to an external system such as Kafka, HBase, a filesystem, or Elasticsearch, either through the connectors provided on the official website or through custom connectors. setParallelism sets the parallelism of this sink; the value must be greater than zero.
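
As a sketch of what a custom sink can look like, here is a toy SinkFunction that writes each element to standard error. The logic is purely illustrative; real connectors additionally handle batching, retries, and checkpoint integration:

// A toy sink; overrides the simple single-argument invoke variant.
windowCounts.addSink(new SinkFunction<WordWithCount>() {
    @Override
    public void invoke(WordWithCount value) {
        System.err.println("sink received: " + value);
    }
});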

Executing the Program

env.execute("Socket Window WordCount");

Flink has two execution modes, remote and local, which differ slightly; the walkthrough here follows the local mode (a brief sketch of the remote mode follows the listing below). First, the source of the execute method:

// Source: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
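
For contrast, in the remote mode the program is submitted to a standing cluster instead of an embedded MiniCluster. A sketch, where host, port, and jar path are placeholders rather than values from this article:

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host",        // JobManager address (placeholder)
        6123,                     // JobManager port (placeholder)
        "/path/to/program.jar");  // jar(s) containing the user code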

This method has three parts: it transforms the streaming program into a JobGraph, adds (and overrides) settings with what the user defined, and starts a MiniCluster to execute the job. Leaving the JobGraph aside for now, follow the execution by stepping into the line return miniCluster.executeJobBlocking(jobGraph);:

// Source: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;
    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}
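
The future composition used above (submit, then request the result, then block on get()) is ordinary CompletableFuture chaining. A tiny, self-contained illustration of the same pattern, with made-up values:

import java.util.concurrent.CompletableFuture;

public class FutureChainDemo {
    public static void main(String[] args) throws Exception {
        // "submission" completes with an id, analogous to submitJob(job)
        CompletableFuture<Integer> submission = CompletableFuture.supplyAsync(() -> 42);
        // when it completes, start a second async step, analogous to requestJobResult(...)
        CompletableFuture<String> result = submission.thenCompose(
                id -> CompletableFuture.supplyAsync(() -> "result of job " + id));
        // block for the final value, analogous to jobResultFuture.get()
        System.out.println(result.get());
    }
}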

The core logic of executeJobBlocking is the line final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);, which calls the submitJob method of the MiniCluster class. That method comes next:

// Source: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);

    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}

The Dispatcher component here is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs, and recovering them in case of a master failure. Dispatcher has two implementations: in a local environment a MiniDispatcher is started, while on a cluster it is a StandaloneDispatcher. The class structure is shown below:

[Figure: MiniDispatcher class hierarchy]

The Dispatcher starts a JobManagerRunner and delegates to it the task of starting the JobMaster for the job. The corresponding code:

// Source: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}

After a series of nested method calls, the JobMaster eventually reaches the following logic:

// Source: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);

    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}

Here executionGraph.scheduleForExecution(); invokes the start method of the ExecutionGraph. In Flink's graph structures, the ExecutionGraph is where execution actually happens, so at this point the journey of a job from submission to real execution is complete. To recap the flow in the local environment:

  1. The client calls the execute method;
  2. After doing most of the setup work, MiniCluster delegates the job to the MiniDispatcher;
  3. Upon receiving the job, the Dispatcher instantiates a JobManagerRunner and uses that instance to start the job;
  4. The JobManagerRunner then hands the job over to the JobMaster;
  5. The JobMaster starts the whole execution graph via the ExecutionGraph methods, and the job is up and running.
