当前位置: 移动技术网 > IT编程>开发语言>Java > Kafka Network层解析,还是有人把它说清楚了

Kafka Network层解析,还是有人把它说清楚了

2019年08月16日  | 移动技术网IT编程  | 我要评论

用什么洗脸最好,泫然泪下歌词,ankonian

我们知道kafka是基于tcp连接的。其并没有像很多中间件使用netty作为tcp服务器。而是自己基于java nio写了一套。

几个重要类

先看下kafka client的网络层架构。

 

 

本文主要分析的是network层。

network层有两个重要的类:selectorkafkachannel

这两个类和java nio层的java.nio.channels.selectorchannel有点类似。

selector几个关键字段如下

// jdk nio中的selector
java.nio.channels.selector nioselector;
// 记录当前selector的所有连接信息
map<string, kafkachannel> channels;
// 已发送完成的请求
list<send> completedsends;
// 已收到的请求
list<networkreceive> completedreceives;
// 还没有完全收到的请求,对上层不可见
map<kafkachannel, deque<networkreceive>> stagedreceives;
// 作为client端,调用connect连接远端时返回true的连接
set<selectionkey> immediatelyconnectedkeys;
// 已经完成的连接
list<string> connected;
// 一次读取的最大大小
int maxreceivesize;

 

从网络层来看kafka是分为client端(producer和consumer,broker作为从时也是client)和server端(broker)的。本文将分析client端是如何建立连接,以及收发数据的。server也是依靠selectorkafkachannel进行网络传输。在network层两端的区别并不大。

建立连接

kafka的client端启动时会调用selector#connect(下文中如无特殊注明,均指org.apache.kafka.common.network.selector)方法建立连接。

public void connect(string id, inetsocketaddress address, int sendbuffersize, int receivebuffersize) throws ioexception {
    if (this.channels.containskey(id))
        throw new illegalstateexception("there is already a connection for id " + id);
    // 创建一个socketchannel
    socketchannel socketchannel = socketchannel.open();
    // 设置为非阻塞模式
    socketchannel.configureblocking(false);
    // 创建socket并设置相关属性
    socket socket = socketchannel.socket();
    socket.setkeepalive(true);
    if (sendbuffersize != selectable.use_default_buffer_size)
        socket.setsendbuffersize(sendbuffersize);
    if (receivebuffersize != selectable.use_default_buffer_size)
        socket.setreceivebuffersize(receivebuffersize);
    socket.settcpnodelay(true);
    boolean connected;
    try {
        // 调用socketchannel的connect方法,该方法会向远端发起tcp建连请求
        // 因为是非阻塞的,所以该方法返回时,连接不一定已经建立好(即完成3次握手)。连接如果已经建立好则返回true,否则返回false。一般来说server和client在一台机器上,该方法可能返回true。
        connected = socketchannel.connect(address);
    } catch (unresolvedaddressexception e) {
        socketchannel.close();
        throw new ioexception("can't resolve address: " + address, e);
    } catch (ioexception e) {
        socketchannel.close();
        throw e;
    }
    // 对connect事件进行注册
    selectionkey key = socketchannel.register(nioselector, selectionkey.op_connect);
    kafkachannel channel;
    try {
        // 构造一个kafkachannel
        channel = channelbuilder.buildchannel(id, key, maxreceivesize);
    } catch (exception e) {
      ...
    }
    // 将kafkachannel绑定到selectionkey上
    key.attach(channel);
    // 放入到map中,id是远端服务器的名称
    this.channels.put(id, channel);
    // connectct为true代表该连接不会再触发connect事件,所以这里要单独处理
    if (connected) {
        // op_connect won't trigger for immediately connected channels
        log.debug("immediately connected to node {}", channel.id());
        // 加入到一个单独的集合中
        immediatelyconnectedkeys.add(key);
        // 取消对该连接的connect事件的监听
        key.interestops(0);
    }
}

 

这里的流程和标准的nio流程差不多,需要单独说下的是socketchannel#connect方法返回true的场景,该方法的注释中有提到

* <p> if this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation.  if the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>.  otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishconnect finishconnect} method.

 

也就是说在非阻塞模式下,对于local connection,连接可能在马上就建立好了,那该方法会返回true,对于这种情况,不会再触发之后的connect事件。因此kafka用一个单独的集合immediatelyconnectedkeys将这些特殊的连接记录下来。在接下来的步骤会进行特殊处理。

之后会调用poll方法对网络事件监听:

public void poll(long timeout) throws ioexception {
...
// select方法是对java.nio.channels.selector#select的一个简单封装
int readykeys = select(timeout);
...
// 如果有就绪的事件或者immediatelyconnectedkeys非空
if (readykeys > 0 || !immediatelyconnectedkeys.isempty()) {
    // 对已就绪的事件进行处理,第2个参数为false
    pollselectionkeys(this.nioselector.selectedkeys(), false, endselect);
    // 对immediatelyconnectedkeys进行处理。第2个参数为true
    pollselectionkeys(immediatelyconnectedkeys, true, endselect);
}

addtocompletedreceives();

...
}

private void pollselectionkeys(iterable<selectionkey> selectionkeys,
                           boolean isimmediatelyconnected,
                           long currenttimenanos) {
iterator<selectionkey> iterator = selectionkeys.iterator();
// 遍历集合
while (iterator.hasnext()) {
    selectionkey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的kafkachannel
    kafkachannel channel = channel(key);
   ...

    try {
        // 如果当前处理的是immediatelyconnectedkeys集合的元素或处理的是connect事件
        if (isimmediatelyconnected || key.isconnectable()) {
            // finishconnect中会增加read事件的监听
            if (channel.finishconnect()) {
                this.connected.add(channel.id());
                this.sensors.connectioncreated.record();
                ...
            } else
                continue;
        }

        // 对于ssl的连接还有些额外的步骤
        if (channel.isconnected() && !channel.ready())
            channel.prepare();

        // 如果是read事件
        if (channel.ready() && key.isreadable() && !hasstagedreceive(channel)) {
            networkreceive networkreceive;
            while ((networkreceive = channel.read()) != null)
                addtostagedreceives(channel, networkreceive);
        }

        // 如果是write事件
        if (channel.ready() && key.iswritable()) {
            send send = channel.write();
            if (send != null) {
                this.completedsends.add(send);
                this.sensors.recordbytessent(channel.id(), send.size());
            }
        }

        // 如果连接失效
        if (!key.isvalid())
            close(channel, true);

    } catch (exception e) {
        string desc = channel.socketdescription();
        if (e instanceof ioexception)
            log.debug("connection with {} disconnected", desc, e);
        else
            log.warn("unexpected error from {}; closing connection", desc, e);
        close(channel, true);
    } finally {
        mayberecordtimeperconnection(channel, channelstarttimenanos);
    }
}
}

 

因为immediatelyconnectedkeys中的连接不会触发connnect事件,所以在poll时会单独对immediatelyconnectedkeys的channel调用finishconnect方法。在明文传输模式下该方法会调用到plaintexttransportlayer#finishconnect,其实现如下:

public boolean finishconnect() throws ioexception {
    // 返回true代表已经连接好了
    boolean connected = socketchannel.finishconnect();
    if (connected)
        // 取消监听connect事件,增加read事件的监听
        key.interestops(key.interestops() & ~selectionkey.op_connect | selectionkey.op_read);
    return connected;
}

 

关于immediatelyconnectedkeys更详细的内容可以看看这里

发送数据

kafka发送数据分为两个步骤:

1.调用selector#send将要发送的数据保存在对应的kafkachannel中,该方法并没有进行真正的网络io。

// selector#send
public void send(send send) {
    string connectionid = send.destination();
    // 如果所在的连接正在关闭中,则加入到失败集合failedsends中
    if (closingchannels.containskey(connectionid))
        this.failedsends.add(connectionid);
    else {
        kafkachannel channel = channelorfail(connectionid, false);
        try {
            channel.setsend(send);
        } catch (cancelledkeyexception e) {
            this.failedsends.add(connectionid);
            close(channel, false);
        }
    }
}

//kafkachannel#setsend
public void setsend(send send) {
    // 如果还有数据没有发送出去则报错
    if (this.send != null)
        throw new illegalstateexception("attempt to begin a send operation with prior send operation still in progress.");
    // 保存下来
    this.send = send;
    // 添加对write事件的监听
    this.transportlayer.addinterestops(selectionkey.op_write);
}
调用selector#poll,在第一步中已经对该channel注册了write事件的监听,所以在当channel可写时,会调用到pollselectionkeys将数据真正的发送出去。
private void pollselectionkeys(iterable<selectionkey> selectionkeys,
                           boolean isimmediatelyconnected,
                           long currenttimenanos) {
iterator<selectionkey> iterator = selectionkeys.iterator();
// 遍历集合
while (iterator.hasnext()) {
    selectionkey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的kafkachannel
    kafkachannel channel = channel(key);
   ...

    try {
        ...
 

        // 如果是write事件
        if (channel.ready() && key.iswritable()) {
            // 真正的网络写
            send send = channel.write();
            // 一个send对象可能会被拆成几次发送,write非空代表一个send发送完成
            if (send != null) {
                // completedsends代表已发送完成的集合
                this.completedsends.add(send);
                this.sensors.recordbytessent(channel.id(), send.size());
            }
        }
        ...
    } catch (exception e) {
     ...
    } finally {
        mayberecordtimeperconnection(channel, channelstarttimenanos);
    }
}
}

 

当可写时,会调用kafkachannel#write方法,该方法中会进行真正的网络io:

public send write() throws ioexception {
    send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
private boolean send(send send) throws ioexception {
    // 最终调用socketchannel#write进行真正的写
    send.writeto(transportlayer);
    if (send.completed())
        // 如果写完了,则移除对write事件的监听
        transportlayer.removeinterestops(selectionkey.op_write);

    return send.completed();
}

 

接收数据

如果远端有发送数据过来,那调用poll方法时,会对接收到的数据进行处理。

public void poll(long timeout) throws ioexception {
...
// select方法是对java.nio.channels.selector#select的一个简单封装
int readykeys = select(timeout);
...
// 如果有就绪的事件或者immediatelyconnectedkeys非空
if (readykeys > 0 || !immediatelyconnectedkeys.isempty()) {
    // 对已就绪的事件进行处理,第2个参数为false
    pollselectionkeys(this.nioselector.selectedkeys(), false, endselect);
    // 对immediatelyconnectedkeys进行处理。第2个参数为true
    pollselectionkeys(immediatelyconnectedkeys, true, endselect);
}

addtocompletedreceives();

...
}

private void pollselectionkeys(iterable<selectionkey> selectionkeys,
                           boolean isimmediatelyconnected,
                           long currenttimenanos) {
iterator<selectionkey> iterator = selectionkeys.iterator();
// 遍历集合
while (iterator.hasnext()) {
    selectionkey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的kafkachannel
    kafkachannel channel = channel(key);
   ...

    try {
        ...
 

        // 如果是read事件
        if (channel.ready() && key.isreadable() && !hasstagedreceive(channel)) {
            networkreceive networkreceive;
            // read方法会从网络中读取数据,但可能一次只能读取一个req的部分数据。只有读到一个完整的req的情况下,该方法才返回非null
            while ((networkreceive = channel.read()) != null)
                // 将读到的请求存在stagedreceives中
                addtostagedreceives(channel, networkreceive);
        }
        ...
    } catch (exception e) {
     ...
    } finally {
        mayberecordtimeperconnection(channel, channelstarttimenanos);
    }
}
}

private void addtostagedreceives(kafkachannel channel, networkreceive receive) {
    if (!stagedreceives.containskey(channel))
        stagedreceives.put(channel, new arraydeque<networkreceive>());

    deque<networkreceive> deque = stagedreceives.get(channel);
    deque.add(receive);
}

 

在之后的addtocompletedreceives方法中会对该集合进行处理。

private void addtocompletedreceives() {
    if (!this.stagedreceives.isempty()) {
        iterator<map.entry<kafkachannel, deque<networkreceive>>> iter = this.stagedreceives.entryset().iterator();
        while (iter.hasnext()) {
            map.entry<kafkachannel, deque<networkreceive>> entry = iter.next();
            kafkachannel channel = entry.getkey();
            // 对于client端来说该ismute返回为false,server端则依靠该方法保证消息的顺序
            if (!channel.ismute()) {
                deque<networkreceive> deque = entry.getvalue();
                addtocompletedreceives(channel, deque);
                if (deque.isempty())
                    iter.remove();
            }
        }
    }
}
private void addtocompletedreceives(kafkachannel channel, deque<networkreceive> stageddeque) {
    // 将每个channel的第一个networkreceive加入到completedreceives
    networkreceive networkreceive = stageddeque.poll();
    this.completedreceives.add(networkreceive);
    this.sensors.recordbytesreceived(channel.id(), networkreceive.payload().limit());
}

 

读出数据后,会先放到stagedreceives集合中,然后在addtocompletedreceives方法中对于每个channel都会从stagedreceives取出一个networkreceive(如果有的话),放入到completedreceives中。

这样做的原因有两点:

  1. 对于ssl的连接来说,其数据内容是加密的,所以不能精准的确定本次需要读取的数据大小,只能尽可能的多读,这样会导致可能会比请求的数据读的要多。那如果该channel之后没有数据可以读,会导致多读的数据将不会被处理。
  2. kafka需要确保一个channel上request被处理的顺序是其发送的顺序。因此对于每个channel而言,每次poll上层最多只能看见一个请求,当该请求处理完成之后,再处理其他的请求。在sever端,每次poll后都会将该channel给mute掉,即不再从该channel上读取数据。当处理完成之后,才将该channelunmute,即之后可以从该socket上读取数据。而client端则是通过inflightrequests#cansendmore控制。

代码中关于这段逻辑的注释如下:

/* in the "plaintext" setting, we are using socketchannel to read & write to the network. but for the "ssl" setting,
* we encrypt the data before we use socketchannel to write data to the network, and decrypt before we return the responses.
* this requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
* we won't be able to read exact no.of bytes as kafka protocol requires. we read as many bytes as we can, up to sslengine's
* application buffer size. this means we might be reading additional bytes than the requested size.
* if there is no further data to read from socketchannel selector won't invoke that channel and we've have additional bytes
* in the buffer. to overcome this issue we added "stagedreceives" map which contains per-channel deque. when we are
* reading a channel we read as many responses as we can and store them into "stagedreceives" and pop one response during
* the poll to add the completedreceives. if there are any active channels in the "stagedreceives" we set "timeout" to 0
* and pop response and add to the completedreceives.

* atmost one entry is added to "completedreceives" for a channel in each poll. this is necessary to guarantee that
     * requests from a channel are processed on the broker in the order they are sent. since outstanding requests added
     * by socketserver to the request queue may be processed by different request handler threads, requests on each
     * channel must be processed one-at-a-time to guarantee ordering.
*/

 

end

本文分析了kafka network层的实现,在阅读kafka源码时,如果不把network层搞清楚会比较迷,比如req/resp的顺序保障机制、真正进行网络io的不是send方法等等。

 

本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网