当前位置: 移动技术网 > IT编程>数据库>其他数据库 > KafkaProducer源码分析

KafkaProducer源码分析

2019年09月16日  | 移动技术网IT编程  | 我要评论

kafka常用术语

broker:kafka的服务端即kafka实例,kafka集群由一个或多个broker组成,主要负责接收和处理客户端的请求

topic:主题,kafka承载消息的逻辑容器,每条发布到kafka的消息都有对应的逻辑容器,工作中多用于区分业务

partition:分区,是物理概念,代表有序不变的消息序列,每个topic由一个或多个partion组成

replica:副本,kafka中同一条消息拷贝到多个地方做数据冗余,这些地方就是副本,副本分为leader和follower,角色不同作用不同,副本是对partition而言的,每个分区可配置多个副本来实现高可用

record:消息,kafka处理的对象

offset:消息位移,分区中每条消息的位置信息,是单调递增且不变的值

producer:生产者,向主题发送新消息的应用程序

consumer:消费者,从主题订阅新消息的应用程序

consumer offset:消费者位移,记录消费者的消费进度,每个消费者都有自己的消费者位移

consumer group:消费者组,多个消费者组成一个消费者组,同时消费多个分区来实现高可用(组内消费者的个数不能多于分区个数以免浪费资源

reblance:重平衡,消费组内消费者实例数量变更后,其他消费者实例自动重新分配订阅主题分区的过程

下面用一张图展示上面提到的部分概念(用ppt画的图,太费劲了,画了老半天,有好用的画图工具欢迎推荐)

file

消息生产流程

先来个kafkaproducer的小demo

public static void main(string[] args) throws executionexception, interruptedexception {
        if (args.length != 2) {
            throw new illegalargumentexception("usage: com.ding.kafkaproducerdemo bootstrap-servers topic-name");
        }

        properties props = new properties();
        // kafka服务器ip和端口,多个用逗号分割
        props.put("bootstrap.servers", args[0]);
        // 确认信号配置
        // ack=0 代表producer端不需要等待确认信号,可用性最低
        // ack=1 等待至少一个leader成功把消息写到log中,不保证follower写入成功,如果leader宕机同时follower没有把数据写入成功
        // 消息丢失
        // ack=all leader需要等待所有follower成功备份,可用性最高
        props.put("ack", "all");
        // 重试次数
        props.put("retries", 0);
        // 批处理消息的大小,批处理可以增加吞吐量
        props.put("batch.size", 16384);
        // 延迟发送消息的时间
        props.put("linger.ms", 1);
        // 用来换出数据的内存大小
        props.put("buffer.memory", 33554432);
        // key 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        // value 序列化方式
        props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

        // 创建kafkaproducer对象,创建时会启动sender线程
        producer<string, string> producer = new kafkaproducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 往recordaccumulator中写消息
            future<recordmetadata> result = producer.send(new producerrecord<>(args[1], integer.tostring(i), integer.tostring(i)));
            recordmetadata rm = result.get();
            system.out.println("topic: " + rm.topic() + ", partition: " +  rm.partition() + ", offset: " + rm.offset());
        }
        producer.close();
    }

实例化

kafkaproducer构造方法主要是根据配置文件进行一些实例化操作

1.解析clientid,若没有配置则由是producer-递增的数字

2.解析并实例化分区器partitioner,可以实现自己的partitioner,比如根据key分区,可以保证相同key分到同一个分区,对保证顺序很有用。若没有指定分区规则,采用默认的规则(消息有key,对key做hash,然后对可用分区取模;若没有key,用随机数对可用分区取模【没有key的时候说随机数对可用分区取模不准确,counter值初始值是随机的,但后面都是递增的,所以可以算到roundrobin】)

3.解析key、value的序列化方式并实例化

4.解析并实例化拦截器

5.解析并实例化recordaccumulator,主要用于存放消息(kafkaproducer主线程往recordaccumulator中写消息,sender线程从recordaccumulator中读消息并发送到kafka中)

6.解析broker地址

7.创建一个sender线程并启动

...
this.sender = newsender(logcontext, kafkaclient, this.metadata);
this.iothread = new kafkathread(iothreadname, this.sender, true);
this.iothread.start();
...

消息发送流程

消息的发送入口是kafkaproducer.send方法,主要过程如下

kafkaproducer.send
kafkaproducer.dosend
// 获取集群信息
kafkaproducer.waitonmetadata 
// key/value序列化
key\value serialize
// 分区
kafkaproducer.partion
// 创建topcipartion对象,记录消息的topic和partion信息
topicpartition
// 写入消息
recordaccumulator.applend
// 唤醒sender线程
sender.wakeup

recordaccumulator

recordaccumulator是消息队列用于缓存消息,根据topicpartition对消息分组

重点看下recordaccumulator.applend追加消息的流程

// 记录进行applend的线程数
appendsinprogress.incrementandget();
// 根据topicpartition获取或新建deque双端队列
deque<producerbatch> dq = getorcreatedeque(tp);
...
private deque<producerbatch> getorcreatedeque(topicpartition tp) {
    deque<producerbatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new arraydeque<>();
    deque<producerbatch> previous = this.batches.putifabsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
// 尝试将消息加入到缓冲区中
// 加锁保证同一个topicpartition写入有序
synchronized (dq) {
    if (closed)
        throw new kafkaexception("producer closed while send in progress");
    // 尝试写入
    recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
    if (appendresult != null)
        return appendresult;
}
private recordappendresult tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, deque<producerbatch> deque) {
    // 从双端队列的尾部取出producerbatch
    producerbatch last = deque.peeklast();
    if (last != null) {
        // 取到了,尝试添加消息
        futurerecordmetadata future = last.tryappend(timestamp, key, value, headers, callback, time.milliseconds());
        // 空间不够,返回null
        if (future == null)
            last.closeforrecordappends();
        else
            return new recordappendresult(future, deque.size() > 1 || last.isfull(), false);
    }
    // 取不到返回null
    return null;
}
public futurerecordmetadata tryappend(long timestamp, byte[] key, byte[] value, header[] headers, callback callback, long now) {
    // 空间不够,返回null
    if (!recordsbuilder.hasroomfor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 真正添加消息
        long checksum = this.recordsbuilder.append(timestamp, key, value, headers);
        ...
        futurerecordmetadata future = ...
        // future和回调callback进行关联    
        thunks.add(new thunk(callback, future));
        ...
        return future;
    }
}
// 尝试applend失败(返回null),会走到这里。如果tryapplend成功直接返回了
// 从bufferpool中申请内存空间,用于创建新的producerbatch
buffer = free.allocate(size, maxtimetoblock);
synchronized (dq) {
    // 注意这里,前面已经尝试添加失败了,且已经分配了内存,为何还要尝试添加?
    // 因为可能已经有其他线程创建了producerbatch或者之前的producerbatch已经被sender线程释放了一些空间,所以在尝试添加一次。这里如果添加成功,后面会在finally中释放申请的空间
    recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
    if (appendresult != null) {
        return appendresult;
    }

    // 尝试添加失败了,新建producerbatch
    memoryrecordsbuilder recordsbuilder = recordsbuilder(buffer, maxusablemagic);
    producerbatch batch = new producerbatch(tp, recordsbuilder, time.milliseconds());
    futurerecordmetadata future = utils.notnull(batch.tryappend(timestamp, key, value, headers, callback, time.milliseconds()));

    dq.addlast(batch);
    incomplete.add(batch);
    // 将buffer置为null,避免在finally汇总释放空间
    buffer = null;
    return new recordappendresult(future, dq.size() > 1 || batch.isfull(), true);
}
finally {
    // 最后如果再次尝试添加成功,会释放之前申请的内存(为了新建producerbatch)
    if (buffer != null)
        free.deallocate(buffer);
    appendsinprogress.decrementandget();
}
// 将消息写入缓冲区
recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey,serializedvalue, headers, interceptcallback, remainingwaitms);
if (result.batchisfull || result.newbatchcreated) {
    // 缓冲区满了或者新创建的producerbatch,唤起sender线程
    this.sender.wakeup();
}
return result.future;

sender发送消息线程

主要流程如下

sender.run
sender.runonce
sender.sendproducerdata
// 获取集群信息
metadata.fetch
// 获取可以发送消息的分区且已经获取到了leader分区的节点
recordaccumulator.ready
// 根据准备好的节点信息从缓冲区中获取topicpartion对应的deque队列中取出producerbatch信息
recordaccumulator.drain
// 将消息转移到每个节点的生产请求队列中
sender.sendproducerequests
// 为消息创建生产请求队列
sender.sendproducerrequest
kafkaclient.newclientrequest
// 下面是发送消息
kafkaclient.sent
networkclient.dosent
selector.send
// 其实上面并不是真正执行i/o,只是写入到kafkachannel中
// poll 真正执行i/o
kafkaclient.poll

通过源码分析下sender线程的主要流程

kafkaproducer的构造方法在实例化时启动一个kafkathread线程来执行sender

// kafkaproducer构造方法启动sender
string iothreadname = network_thread_prefix + " | " + clientid;
this.iothread = new kafkathread(iothreadname, this.sender, true);
this.iothread.start();
// sender->run()->runonce()
long currenttimems = time.milliseconds();
// 发送生产的消息
long polltimeout = sendproducerdata(currenttimems);
// 真正执行i/o操作
client.poll(polltimeout, currenttimems);
// 获取集群信息
cluster cluster = metadata.fetch();
// 获取准备好可以发送消息的分区且已经获取到leader分区的节点
recordaccumulator.readycheckresult result = this.accumulator.ready(cluster, now);
// readycheckresult 包含可以发送消息且获取到leader分区的节点集合、未获取到leader分区节点的topic集合
public final set<node> 的节点;
public final long nextreadycheckdelayms;
public final set<string> unknownleadertopics;

ready方法主要是遍历在上面介绍recordaccumulator添加消息的容器,map<topicpartition, deque>,从集群信息中根据topicpartition获取leader分区所在节点,找不到对应leader节点但有要发送的消息的topic添加到unknownleadertopics中。同时把那些根据topicpartition可以获取leader分区且消息满足发送的条件的节点添加到的节点中

// 遍历batches
for (map.entry<topicpartition, deque<producerbatch>> entry : this.batches.entryset()) {
    topicpartition part = entry.getkey();
    deque<producerbatch> deque = entry.getvalue();
    // 根据topicpartition从集群信息获取leader分区所在节点
    node leader = cluster.leaderfor(part);
    synchronized (deque) {
        if (leader == null && !deque.isempty()) {
            // 添加未找到对应leader分区所在节点但有要发送的消息的topic
            unknownleadertopics.add(part.topic());
        } else if (!readynodes.contains(leader) && !ismuted(part, nowms)) {
                ....
                if (sendable && !backingoff) {
                    // 添加准备好的节点
                    readynodes.add(leader);
                } else {
                   ...
}

然后对返回的unknownleadertopics进行遍历,将topic加入到metadata信息中,调用metadata.requestupdate方法请求更新metadata信息

for (string topic : result.unknownleadertopics)
    this.metadata.add(topic);
    result.unknownleadertopics);
    this.metadata.requestupdate();

对已经准备好的节点进行最后的检查,移除那些节点连接没有就绪的节点,主要根据kafkaclient.ready方法进行判断

iterator<node> iter = result.readynodes.iterator();
long notreadytimeout = long.max_value;
while (iter.hasnext()) {
    node node = iter.next();
    // 调用kafkaclient.ready方法验证节点连接是否就绪
    if (!this.client.ready(node, now)) {
        // 移除没有就绪的节点
        iter.remove();
        notreadytimeout = math.min(notreadytimeout, this.client.polldelayms(node, now));
    }
}

下面开始创建生产消息的请求

// 从recordaccumulator中取出topicpartition对应的deque双端队列,然后从双端队列头部取出producerbatch,作为要发送的信息
map<integer, list<producerbatch>> batches = this.accumulator.drain(cluster, result.readynodes, this.maxrequestsize, now);

把消息封装成clientrequest

clientrequest clientrequest = client.newclientrequest(nodeid, requestbuilder, now, acks != 0,requesttimeoutms, callback);

调用kafkaclient发送消息(并非真正执行i/o),涉及到kafkachannel。kafka的通信采用的是nio方式

// networkclient.dosent方法
string destination = clientrequest.destination();
requestheader header = clientrequest.makeheader(request.version());
...
send send = request.tosend(destination, header);
inflightrequest inflightrequest = new inflightrequest(clientrequest,header,isinternalrequest,request,send,now);
this.inflightrequests.add(inflightrequest);
selector.send(send);

...

// selector.send方法    
string connectionid = send.destination();
kafkachannel channel = openorclosingchannelorfail(connectionid);
if (closingchannels.containskey(connectionid)) {
    this.failedsends.add(connectionid);
} else {
    try {
        channel.setsend(send);
    ...

到这里,发送消息的工作准备的差不多了,调用kafkaclient.poll方法,真正执行i/o操作

client.poll(polltimeout, currenttimems);

用一张图总结sender线程的流程

file

通过上面的介绍,我们梳理出了kafka生产消息的主要流程,涉及到主线程往recordaccumulator中写入消息,同时后台的sender线程从recordaccumulator中获取消息,使用nio的方式把消息发送给kafka,用一张图总结

file

后记

这是本公众号第一次尝试写源码相关的文章,说实话真不知道该如何下笔,代码截图、贴整体代码等感觉都被我否定了,最后采用了这种方式,介绍主要流程,把无关代码省略,配合流程图。

上周参加了华为云kafka实战课程,简单看了下kafka的生产和消费代码,想简单梳理下,然后在周日中午即8.17开始阅读源码,梳理流程,一直写到了晚上12点多,还剩一点没有完成,周一早晨早起完成了这篇文章。当然这篇文章忽略了很多更细节的东西,后面会继续深入,勇于尝试,不断精进,加油!

参考资料

华为云实战

极客时间kafka专栏

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网