当前位置: 移动技术网 > IT编程>数据库>其他数据库 > nifi processer介绍

nifi processer介绍

2020年03月25日  | 移动技术网IT编程  | 我要评论
2.3 NiFi Processor应用介绍 对于NiFi的使用者来说,如果想要创建一个高效的数据流,那么就需要了解什么样的单元处理器才最适合这个数据流。NiFi拥有大量的可以用于各种业务场景的单元处理器可供使用者挑选和使用,这些单元处理器主要提供例如系统之间数据的传输,数据的路由,数据的转换、处理 ...

2.3 nifi processor应用介绍
对于nifi的使用者来说,如果想要创建一个高效的数据流,那么就需要了解什么样的单元处理器才最适合这个数据流。nifi拥有大量的可以用于各种业务场景的单元处理器可供使用者挑选和使用,这些单元处理器主要提供例如系统之间数据的传输,数据的路由,数据的转换、处理、分割和聚合等大类的功能。
在每个nifi的版本发布中都会有大量的新的处理器单元产生,这就导致本书中讲重点介绍1.4.0版本及之前的常用处理器单元的功能,我们讲根据这些常用的处理器单元的不同用处进行分类。

2.3.1 数据转换类处理器单元

compresscontent

compresscontent处理器单元主要用途是对nifi数据流的flowfile的内容进行压缩和解压缩,支持的压缩种类如图

70

convertcharacterset

convertcharacterset处理器单元主要用途将nifi数据流的flowfile的内容从一种字符集转换成另外一种字符集。配置例子如图

70

encryptcontent

encryptcontent处理器单元主要用途将nifi数据流的flowfile的内容进行加密/解密传输。
70

replacetext

replacetext处理器单元主要用途是根据处理器属性配置的正则表达式对flowfile的内容进行匹配,如果匹配成功将会降匹配成功的字段替换为配置属性中的字段。将flowfile的内容全部替换为nifi的配置例子如图

70

2.3.2 数据路由类和调制处理器单元

controlrate

controlrate处理器单元用来控制数据流部分流量的速率。

上面的图中的例子表示1分钟内只允许最多1000个flowfile流过。

detectduplicate

detectduplicate处理器单元用来依据用户定义的特征来监控和发现重复的flowfile。通常这个处理器会搭配hashcontent单元处理器来完成功能。

上面的图中的例子表示processor根据输入的flowfile的hash.value属性值作为去重条件对flowfile进行匹配,将去重后的映射到non-duplicate的relationship中,将重复的flowfile映射到duplicate的relationship中。

monitoractivity

monitoractivity处理器单元可以在用户定义的时段内如果没有数据流量就是发送告警通知,也可以选择附加功能,在数据流量恢复之后发送恢复通知。

上面的图中例子标示processor每1分钟内没有flowfile输入就会不间断的发出inactivity message属性的内容,且检测范围是本node节点。

routeonattribute

routeonattribute处理器单元可以根据flowfile的属性制定路由规则来对flowfile进行路由。

上面的图中例子表示processor根据输入的flowfile的value属性进行路由,将含有hello的flowfile路由到include hello text的relationship中,将含有world的flowfile路由到include world text的relationship中。

scanattribute

scanattribute处理器单元用途是将flowfile属性中被用户定义的属性与用户自定义的字典进行对比,看是否能够匹配。

上面的图中例子表示processor输入的flowfile中的属性值只要有一个包含了sample.txt字典中任意一行的字符,那么processor就会将这个flowfile路由到matched的relationship中。

routeoncontent

routeoncontent处理器单元的功能近似于routeonattribute,区别在于routeoncontent处理器单元进行路由判定的内容是flowfile的内容而不是之前routeonattribute处理器单元所使用的属性。

上面的图中例子表示processor根据输入flowfile的内容进行路由,如果输入的flowfile的内容为hello,那么它将会被路由到hello relationship的relationship中。

scancontent

scancontent处理器单元同样也近似于scanattribute,区别在于前者用户选取的比对对象是内容而后者定义却是属性。

上面的图中例子表示processor根据输入flowfile内容进行扫描路由,如果flowfile的内容为hello,那么它将会被路由到matched的relationship中。

validatexml

validatexml处理器单元将flowfile的xml内容和用户的xml定义进行校验,将符合xml定义的flowfile进行路由。

上面的图中表示processor根据输入xml的schema文件对输入的flowfile内容进行校验匹配,如果校验合格的flowfile会被映射到valid的relationship中。

上面的图中表示processor根据输入的flowfile中的hiveql往hive中写入或者更新数据。

2.3.3 数据接入类处理器单元

convertjsontosql

convertjsontosql处理器单元可以将结构化的json转换成insert或者update这样命令的sql,配合putsql处理器单元可以直接根据这鞋命令将数据插入数据库中。

上面的图中表示processor根据输入的flowfile的json内容,将json转化成update的sql语句。

executesql

executesql处理器单元直接运行运行用户配置的sql查询语句,并将查询结果以avro的格式写入到flowfile的内容中去。

上面的图中表示processor根据用户配置sql select query语句,从数据库中查询出结果,并将结果flowfile映射到success的relationship中。

putsql

putsql处理器单元可以根据传入的flowfile内容中的ddm sql对数据库进行更新操作。

上面的图中表示processor根据输入的flowfile的sql内容,每100个sql作为一个事务提交数据库,并将生成的key返回且在事务提交失败的情况下对事务进行回滚。

selecthiveql

selecthiveql处理器单元执行hive的查询语句hiveql,并且将结果以avro或者csv的格式写入到flowfile中。

上面的图中表示processor根据hiveql语句查询hive,并将结果以csv格式输出,csv拥有header为username和age。

puthiveql

puthiveql处理器根据传入的hiveql ddm语句对hive数据仓库的内容进行更新。

上面的图中表示processor根据输入的flowfile中的hiveql往hive中写入或者更新数据。

2.3.4 属性提取类处理器单元

evaluatejsonpath

evaluatejsonpath处理器单元根据用户定义的jsonpath表达式对flowfile的json内容进行解析,将这些表达式所解析出来的内容替换flowfile的内容或者将其更新到flowfile的属性中,以便于后续的单元处理器的引用。

上面的图中表示processor将输入内容为json格式的flowfile例如{“name”:”zhangsan”,”phone”:”13734564321”},将其中的phone解析出来后输出到flowfile的内容中。

evaluatexpath

evaluatexpath处理器单元功能近似于evaluatejsonpath,根据用户提供的xpath表达式,将flowfile的xml内容用表达式进行解析,将解析的结果替换如flowfile的正文或者更新flowfile的属性。

上面的图中表示processor对输入内容为xml格式的flowfile利用配置xpath表达式进行解析,并将结果输出到flowfile的内容中。

evaluatexquery

evaluatexquery处理器单元根据用户定义的xquery,将flowfile的xml正文与表达式进行进行虬枝,将提取的结果替换flowfile的正文或者更新flowfile的属性。

上面的图中表示processor对输入内容为xml格式的flowfile利用配置的xquery表达式进行解析,并将结果以xml格式输入到flowfile的内容中。

hashattribute

hashattribute处理器单元对用户选择的已有属性列表的值拼接后的字符串进行hash计算。

上面的图中表示processor对输入的flowfile中value属性值进行hash计算,并将结果输出到flowfile的value属性中。

hashcontent

hashcontent处理器单元对flowfile的内容进行hash,并将hash值添加到flowfile的属性中。

上面的图中表示processor对输入的flowfile中的内容进行hash计算,并将结果输出到flowfile的hash.value属性中。

identifymimetype

identifymimetype处理器单元对flowfile的内容格式进行判定。此处理器能够检测许多不同的mime类型,例如它能够判定出flowfile的内容是图片,文本和压缩文件等格式。

updateattribute

updateattribute处理器单元可以对flowfile添加任意的用户定义的属性。这将有利于对flowfile添加静态的属性,也可以根据nifi表达式语言来动态的添加属性。

上面的图中表示processor对输入的flowfile中属性进行修改,添加一个键值为value值为helloworld的属性。

2.3.5 系统交互类处理器单元

executeprocess

executeprocess处理器单元能够运行用户定义的操作系统命令,将处理完的标准输出内容写入flowfile中。该处理器是一个不需要输入的源处理器,它会输出产生一个新的flowfile。如果需要提供输入源请使用下面介绍的executestreamcommand处理器单元。

上面的图中表示processor根据输入的指令和参数执行命令ls –l /user,并将结果输出到flowfile中。

executestreamcommand

executestreamcommand处理器单元运行用户定义的操作系统命令。输入的flowfile的内容作为命令的标准输入。将处理完的标准输出内容写入flowfile内容中。此处理器单元不同于executeprocess,它必须有flowfile的输入才能正常完成功能。

上面的图中表示processor根据输入的flowfile中的path属性值为/usr/cmd.sh命令脚本的运行参数来运行命令,并将结果输出到flowfile中。

2.3.6 数据提取类处理器单元

getfile

getfile处理器单元从本地磁盘获取文件的内容到nifi,并删除原有的磁盘文件。这个处理器应用场景是将一个文件从一个地方搬移到另外一个地方而不是对文件的拷贝。

上面的图中表示processor将/user/sample.txt文件的内容输出到flowfile的内容中。

getftp

getftp处理器单元从ftp服务器文件内容输出到flowfile中,并可以选择删除原有文件。同样它的使用场景是文件的搬移而不是文件的拷贝。

上面的图中表示processor将ftpserver01上/resource路径下的文件内容输出到flowfile中,并将源文件删除。

getsftp

getsftp处理器单元从sftp文件内容输出到flowfile中,并可以选择删除原有文件。同样它的使用场景是文件的搬移而不是文件的拷贝。

上面的图中表示processor将sftpserver01上/resource路径下的文件内容输出到flowfile中,并将源文件删除。

getjmsqueue

getjmsqueue处理器单元从jms队列中下载消息,并通过jms message来创建flowfile的内容,同时也可以指定创建flowfile的属性。

getjmstopic

getjmstopic处理器单元从jms的topic中下载消息,并根据jms消息创建flowfile的内容,通过选择也能生成flowfile的属性。这个处理器单元支持长期和非长期的订阅模式。

gethttp

gethttp处理器单元能够根据url通过http或者https协议下载内容到nifi,从而形成的新的flowfile内容。同时处理器单元在下载的同时也记忆etag和最新修改时间来防止数据的重复下载问题。

上面的图中表示processor根据配置的url进行http访问,将访问结果发送到flowfile的内容中并且filename属性值为配置的filename的值。

listenhttp

listenhttp处理器单元启动一个http或者https监听端口,当监听到有post请求过来的时候,会首先返回200状态,并利用post的请求内容形成新的flowfile。

上面的图中表示processor监听8811端口的http post请求,当有post请求访问http://localhost:8811/contentlistener的时候,processor就会首先返回200状态,让后将post请求的参数输出到新的flowfile的内容中。

listenudp

listenudp处理器单元监听udp数据包,并根据配置获取一定量的包来创建一个flowfile并将flowfile发射到success的relationships关系中。

gethdfs

gethdfs处理器单元监控用户定义的hdfs指定路径的文件变化,当有新的文件写入hdfs中的该路径下,那么文件的内容被用来创建新的flowfile的内容,同时删除原有的文件。这个处理器同前面一样适用于文件的搬移场景而非文件的复制场景。

上面的图中表示processor将hdfs上/target路径下的文件内容输出到flowfile中,并将源文件删除。

2.3.7数据发送类处理器单元

putemail

putemail处理器单元主要功能是将flowfile的内容以邮件的形式发送给配置的用户邮箱,也可以通过配置选择将flowfile的内容以附件的方式发送出去。

putfile

putfile处理器主要功能是将flowfile的内容以文件的形式写入本地磁盘。

上面的图中表示processor将接收到的flowfile的内容写入到本地的磁盘文件中。(注意:1.5.0之前此processor不支持追加写入)

putftp

putftp处理器单元将flowfile的内容拷贝到远程的ftp服务器上。

上面的图中表示processor将输入的flowfile的内容通过ftp协议写入到ftpserver01的/upload路径下且上传路径不存在的情况下自动创建路径。

putsftp

putsftp处理器单元主要功能将flowfile的内容拷贝到远程的sftp服务器上。

上面的图中表示processor将输入的flowfile的内容通过sftp协议写入到sftpserver01的/upload路径下且上传路径不存在的情况下自动创建路径。

putjms

putjms处理器单元主要功能将flowfile的内容座位jms消息发送到jms代理上,也可以通过配置根据flowfile的属性来添加jms配置属性。

putsql

putsql处理器单元的主要功能是将flowfile的正文当作sql ddl声明。flowfile必须是正确的符合sql规范的sql声明。flowfile的属性被用作ddl sql的参数,这样可以有效的防止sql注入攻击。

上面的图中表示processor将输入的flowfile的内容按照100个进行batch操作写入数据库。

putkafka

putkafka处理器单元专门是针对0.8.x版本的kafka,它将flowfile的内容以消息的形式发送到kafka消息队列中。flowfile的内容既可以作为一条完整的消息发送到kafka,同时也可以通过分隔符将它切分为多个消息来发送到kafka,例如换行符。

上面的图中表示processor从localhost安装的kafka的sample_topic_a消费数据,并将数据输出到flowfile的内容中。

putmongo

putmongo处理器单元将flowfile的内容插入或者更新到mongodb中。

上面的图中表示processor根据输入的flowfile内容中的doc来写入mongodb。

2.3.8切分和聚合类处理器单元

splittext

splittext处理器单元可以将一个文本内容的flowfile切分成你想要数量的flowfile。

上面的图中表示processor将输入的flowfile的内容切分成多个flowfile,每个flowfile的内容都来自于flowfile中的一行内容。

splitjson

splitjson处理器单元可以将一个json对象根据它的结构拆解成json内部的字对象。

上面的图中表示processor将输入的flowfile内容中的json按照jsonpath表达式$.*进行第一级切分生成新的flowfile。

splitxml

splitxml处理器单元可以将xml消息分解为多个flowfile,且新的flowfile中包含原有的分段信息。这种处理器单元经常适用于多个xml元素被封装在一个元素中,而此处理器单元允许这些元素分离成各自单独的xml元素。

上面的图中表示processor对于输入的flowfile内容中的xml按照第一层级进行切分,切分出来的子xml输出到flowfile中。

unpackcontent

unpackcontent处理器单元可以对压缩格式的文件如zip和tar进行解压,且解压后的文件作为一个flowfile的内容输出。

上图中unpackcontent和identifymimetype一起使用,后者输出的flowfile由前者来进行处理,unpackcontent根据输入的flowfile的mime.type属性对flowfile的内容进行解压。

mergecontent

mergecontent处理器单元的主要功能是将多个flowfile的内容合并成一个flowfile。这些flowfile的内容合并的同时,也可以通过配置对合并后的内容增加标题,页脚和分隔符,也可以对合并后的内容置顶归档格式,比如zip和tar。在flowfile合并的过程中可以依据相同的属性进行合并,也可以根据之前分片处理器分片后的序号来进行合并。用户可以定义合并后flowfile内容的最大值和最小值,当达到这个值的时候flowfile就合并完毕。为了防止在flowfile没有达到配置的大小值的过程中时间太久,用户也可以通过配置超时参数来有效的解决这个问题。

上图中表示processor将输入的flowfile的内容按照从queue中任意消费的flowfile的内容进行merge输出到新的flowfile中,flowfile的内容格式为tar,选择各个输入flowfile中一致的属性写入到新输出的flowfile中,对于不同的metadata不进行merge,输出的新的flowfile内容中同事也增加了页头和页脚。

segmentcontent

segmentcontent处理器单元可以根据配置切分后的flowfile大小将一个大的flowfile切分成许多小的flowfile。分片是基于字节的偏移量而不是分隔符。这种将大的flowfile以分片的形式进行传输可以有效的减少大文件传输过程中的延时问题。当这些分片传输到达目标端的时候,可以通过其它的处理器单元重新进行组装,例如上面所说的mergecontent处理器单元。

上面的途中表示processor把输入的flowfile的内容按照1mb的大小进行切分,切分成新的flowfile且新的flowfile中写入了分片的序号segment.index和数量segment.count属性。

splitcontent

splitcontent处理器单元的功能近似于segmentcontent将一个flowfile分解成多个flowfile。但区别在于splitcontent在进行分解的过程中不是按照设定的字节大小,而是根据分隔符进行分裂。

上面的图中表示processor对输入的flowfile的内容按照竖线 | 符号进行切分,切分成多个flowfile。

2.3.9 http协议类处理器单元

gethttp

gethttp处理器单元对配置的http或者https协议的url发起请求并将返回结果输出到新的flowfile中。而且gethttp会记录etag和最新数据修改时间避免不停的访问给服务端产生不必要的开销。如下图

listenhttp

listenhttp处理器单元监听http或者https请求,如果有请求先返回200然后将post的请求参数输出到新的flowfile中。

上面的图中表示processor监听locahost的http请求,请求url为http://localhost:9080/contentlistener

invokehttp

invokehttp处理器单元能够根据用户的配置发送http协议请求。invokehttp处理器单元通过更多的配置可以完成比gethttp和posthttp更多的功能。如下图

posthttp

posthttp处理器单元将flowfile的内容作为http post请求的body消息。它通常与listenhttp处理器单元组合使用,应用于当多个nifi实例之间不能通过site-to-site的方式进行数据交换的场景。如下图

handlehttprequest / handlehttpresponse

handlehttprequest处理器单元可以作为一个源处理器单元来启动一个http监听服务功能,类似于listenhttp。但是这个处理器不响应客户端,它将请求的参数以flowfile的内容和属性的方式,响数据流的下游进行传递。handlehttpresponse处理器单元能够响应并将处理后的flowfile结果返回请求的客户端。这两个处理器通常都是在一起被使用的。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网