当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Flink1.9整合Kafka

Flink1.9整合Kafka

2019年09月20日  | 移动技术网IT编程  | 我要评论
本文基于Flink1.9版本简述如何连接Kafka。 流式连接器 我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义的source支持从文件、目录、socket,以及 collections 和 iterators ...

file

本文基于flink1.9版本简述如何连接kafka。

流式连接器

file

我们知道可以自己来开发source 和 sink ,但是一些比较基本的 source 和 sink 已经内置在 flink 里。

预定义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。

预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:

  • apache kafka
  • apache cassandra(sink)
  • amazon kinesis streams(source/sink)
  • elasticsearch(sink)
  • hadoop filesystem (sink)
  • rabbitmq(source/sink)
  • apache nifi(source/sink)
  • twitter streaming api(source)

请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。

apache bahir 中定义了其他一些连接器

  • apache activemq(source/sink)
  • apache flume(sink)
  • redis(sink)
  • akka (sink)
  • netty (source)

使用connector并不是唯一可以使数据进入或者流出flink的方式。一种常见的模式是从外部数据库或者 web 服务查询数据得到初始数据流,然后通过 map 或者 flatmap 对初始数据流进行丰富和增强,这里要使用flink的异步io。

而向外部存储推送大量数据时会导致 i/o 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用从 flink 拉取所需的数据,需要用到flink的可查询状态接口。

本文重点介绍apache kafka connector

kafka连接器

此连接器提供对apache kafka提供的事件流的访问。

flink提供特殊的kafka连接器,用于从/向kafka主题读取和写入数据。flink kafka consumer集成了flink的检查点机制,可提供一次性处理语义。为实现这一目标,flink并不完全依赖kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。

下表为不同版本的kafka与flink kafka consumer的对应关系。

maven dependency supported since consumer and producer class name kafka version
flink-connector-kafka-0.8_2.11 1.0.0 flinkkafkaconsumer08 flinkkafkaproducer08 0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 flinkkafkaconsumer09 flinkkafkaproducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 flinkkafkaconsumer010 flinkkafkaproducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 flinkkafkaconsumer011 flinkkafkaproducer011 0.11.x
flink-connector-kafka_2.11 1.7.0 flinkkafkaconsumer flinkkafkaproducer >= 1.0.0

而从最新的flink1.9.0版本开始,使用kafka 2.2.0客户端。

下面简述使用步骤。

导入maven依赖:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-connector-kafka_2.11</artifactid>
  <version>1.9.0</version>
</dependency>

安装kafka:

可以参照 kafka入门宝典(详细截图版)

兼容性:

从flink 1.7开始,它不跟踪特定的kafka主要版本。相反,它在flink发布时跟踪最新版本的kafka。如果您的kafka代理版本是1.0.0或更高版本,则应使用此kafka连接器。如果使用旧版本的kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。

升级connect要注意flink升级作业,同时

  • 在整个过程中使用flink 1.9或更新版本。
  • 不要同时升级flink和运营商。

  • 确保您作业中使用的kafka consumer和/或kafka producer分配了唯一标识符(uid)。

  • 使用stop with savepoint功能获取保存点(例如,使用stop --withsavepoint)。

用法:

引入依赖后,实例化新的source(flinkkafkaconsumer)和sink(flinkkafkaproducer)。

kafka consumer

先分步骤介绍构建过程,文末附flink1.9连接kafka完整代码。

kafka consumer 根据版本分别叫做flinkkafkaconsumer08 flinkkafkaconsumer09等等
kafka >= 1.0.0 的版本就叫flinkkafkaconsumer。

构建flinkkafkaconsumer

java示例代码如下:

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181");
properties.setproperty("group.id", "test");
datastream<string> stream = env
    .addsource(new flinkkafkaconsumer<>("topic", new simplestringschema(), properties));

scala:

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181")
properties.setproperty("group.id", "test")
stream = env
    .addsource(new flinkkafkaconsumer[string]("topic", new simplestringschema(), properties))
    .print()

必须有的:

1.topic名称

2.用于反序列化kafka数据的deserializationschema / kafkadeserializationschema

3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)

配置消费起始位置

java:

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>(...);
myconsumer.setstartfromearliest();     // start from the earliest record possible
myconsumer.setstartfromlatest();       // start from the latest record
myconsumer.setstartfromtimestamp(...); // start from specified epoch timestamp (milliseconds)
myconsumer.setstartfromgroupoffsets(); // the default behaviour

//指定位置
//map<kafkatopicpartition, long> specificstartoffsets = new hashmap<>();
//specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l);
//myconsumer.setstartfromspecificoffsets(specificstartoffsets);

datastream<string> stream = env.addsource(myconsumer);

scala:

val env = streamexecutionenvironment.getexecutionenvironment()

val myconsumer = new flinkkafkaconsumer[string](...)
myconsumer.setstartfromearliest()      // start from the earliest record possible
myconsumer.setstartfromlatest()        // start from the latest record
myconsumer.setstartfromtimestamp(...)  // start from specified epoch timestamp (milliseconds)
myconsumer.setstartfromgroupoffsets()  // the default behaviour

//指定位置
//val specificstartoffsets = new java.util.hashmap[kafkatopicpartition, java.lang.long]()
//specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l)
//myconsumer.setstartfromspecificoffsets(specificstartoffsets)

val stream = env.addsource(myconsumer)
检查点

启用flink的检查点后,flink kafka consumer将使用主题中的记录,并以一致的方式定期检查其所有kafka偏移以及其他操作的状态。如果作业失败,flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用kafka的记录。

如果禁用了检查点,则flink kafka consumer依赖于内部使用的kafka客户端的自动定期偏移提交功能。

如果启用了检查点,则flink kafka consumer将在检查点完成时提交存储在检查点状态中的偏移量。

java

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
env.enablecheckpointing(5000); // checkpoint every 5000 msecs

scala

val env = streamexecutionenvironment.getexecutionenvironment()
env.enablecheckpointing(5000) // checkpoint every 5000 msecs
分区发现

flink kafka consumer支持发现动态创建的kafka分区,并使用一次性保证消费它们。

还可以使用正则:

java

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
properties.setproperty("group.id", "test");

flinkkafkaconsumer011<string> myconsumer = new flinkkafkaconsumer011<>(
    java.util.regex.pattern.compile("test-topic-[0-9]"),
    new simplestringschema(),
    properties);

datastream<string> stream = env.addsource(myconsumer);
...

scala

val env = streamexecutionenvironment.getexecutionenvironment()

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
properties.setproperty("group.id", "test")

val myconsumer = new flinkkafkaconsumer08[string](
  java.util.regex.pattern.compile("test-topic-[0-9]"),
  new simplestringschema,
  properties)

val stream = env.addsource(myconsumer)
...
时间戳和水印

在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。

我们可以定义好timestamp extractors / watermark emitters,通过以下方式将其传递给您的消费者:

java

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181");
properties.setproperty("group.id", "test");

flinkkafkaconsumer08<string> myconsumer =
    new flinkkafkaconsumer08<>("topic", new simplestringschema(), properties);
myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter());

datastream<string> stream = env
    .addsource(myconsumer)
    .print();

scala

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181")
properties.setproperty("group.id", "test")

val myconsumer = new flinkkafkaconsumer08[string]("topic", new simplestringschema(), properties)
myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter())
stream = env
    .addsource(myconsumer)
    .print()

kafka producer

kafka producer 根据版本分别叫做flinkproducer011 flinkkafkaproducer010等等
kafka >= 1.0.0 的版本就叫flinkkafkaproducer 。

构建flinkkafkaconsumer

java

datastream<string> stream = ...;

flinkkafkaproducer011<string> myproducer = new flinkkafkaproducer011<string>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new simplestringschema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to kafka;
// this method is not available for earlier kafka versions
myproducer.setwritetimestamptokafka(true);

stream.addsink(myproducer);

scala

val stream: datastream[string] = ...

val myproducer = new flinkkafkaproducer011[string](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new simplestringschema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to kafka;
// this method is not available for earlier kafka versions
myproducer.setwritetimestamptokafka(true)

stream.addsink(myproducer)

需要指定broker list , topic,序列化类。

自定义分区:默认情况下,将使用flinkfixedpartitioner将每个flink kafka producer并行子任务映射到单个kafka分区。

可以实现flinkkafkapartitioner类自定义分区。

flink1.9消费kafka完整代码:

import org.apache.flink.api.common.serialization.simplestringschema;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaconsumer;
import java.util.properties;

public class kafkaconsumer {

    public static void main(string[] args) throws exception {
        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        properties properties = new properties();
        properties.setproperty("bootstrap.servers", "localhost:9092");
        properties.setproperty("group.id", "test");
        //构建flinkkafkaconsumer
        flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>("topic", new simplestringschema(), properties);
        //指定偏移量
        myconsumer.setstartfromearliest();


        datastream<string> stream = env
                .addsource(myconsumer);

        env.enablecheckpointing(5000);
        stream.print();

        env.execute("flink streaming java api skeleton");
    }

项目地址:

更多flink知识,欢迎关注实时流式计算

file

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网