当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Flink中TaskManager端执行用户逻辑过程(源码分析)

Flink中TaskManager端执行用户逻辑过程(源码分析)

2019年09月11日  | 移动技术网IT编程  | 我要评论

taskmanager接收到来自jobmanager的jobgraph转换得到的tdd对象,启动了任务,在streaminputprocessor类的processinput()方法中

通过一个while(true)中不停的拉取上游的数据,然后调用streamoperator.processelement(record)调用用户实现的方法去处理数据拉取的数据

首先先来看下这个operator对象

然后看看oneinputstreamoperator类的uml

这里所有的实现类没有全部列出,只列了一些代表

看到这里,写过flink的streamapi的同学,肯定感觉到很熟悉!!!!!!

这里!不就是我们常写flink代码的那些算子嘛

对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个oneinputstreamoperator,这里具体看一个最熟悉的fliter

来看一下streamfilter的processelement方法

!!!这里传入一个数据后,这个userfunction调用了filter方法并且把数据放进去了

当返回true通过这个output.collect发送出去了

这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userfunction包含了用户实现filter算子的逻辑

(!!!!!就是说这个processelement方法会调用用户的逻辑)

(所以这个userfunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientapi添加新功能方法,运行时可以通过这里拿到)

继续

来看看这个output.collect()方法

然后

 

看到这个,等等等等

我不是从这个processelement()方法进来的吗,怎么又开始调processelement()方法了

难道递归了? 不对不对

这里operator不是上一个operator了,而是这个output对象的(这里是chainoutput)

看下这个output对象

看下uml类图,也是只列举了重要的

先看chainingoutput的属性

 

发现了又出现了oneinputstreamoperator对象

看到这个实现类的名字!chain联想起了什么

flink会将可以chain在一起的算子在streamgraph转换成jobgraph的时候根据条件chain在一起

一惊!

来分别看一下chainingoutput和recordwriteroutput的collect()方法有什么区别

在chain中

 在recordwriter中

这里chain的ouput,又继续调用了下一个operator的processelement方法,然后又在processelement方法中又调用output.collect( ),collect中又调用了下一个operator的processelement方法

整个过程就是个无限的循环,直到,某一个operator的ouput不为chainingoutput,当变为recordwriteroutput时

上面看到recordwriteroutput的processelement直接emit发送出去了这个数据,再也没有继续调用processelement方法了

这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了

来看一下uml类图帮助理解

 

 里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)

那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢

回到taskmanager接收到jobmanager的tdd以后初始化整个任务的时候

streamtask.java中invoke方法中

 先是初始化了一个operatorchain,里面其实就是一个数组streamoperator

在他初始化的时候,其实就是为我们所有的streamoutputs设置了他的output以及会根据jobmanager发送过来的tdd(包含信息)

设置成对应的chainingoutput还是recordwriteroutput,chainoutput会设置他的的operator

然后获取了getheadoperator()其实就是获取了他调用连中的第一个

然后在

 

将这个第一个operator关联到了inputprocessor对象里面

后面就简单了在inputprocessor.processinput中就进入了while(true)循环拉取上游数据的逻辑

然后

在这里调用的第一个processelement方法就是我们的那个headoperator

这样整个调用责任链就开始从第一个operator运行起来了

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网