前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类bootstrap的参数设置以及启动过程。显然,我们还有另一个重要的部分--服务端的初始化和启动过程没有探究,所以这一节,我们就来从源码层面详细分析一下netty的服务端引导类serverbootstrap的启动过程。
我们仍然以spark中对netty的使用为例,以此为源码分析的切入点,首先我们看一下spark的nettyrpc模块中创建netty服务端引导类的代码:
transportserver的构造方法中会调用init方法,serverbootstrap类就是在init方法中被创建并初始化以及启动的。
这个方法主要分为三块:
很显然,serverbootstrap的启动入口就是bind方法。
// 初始化netty服务端 private void init(string hosttobind, int porttobind) { // io模式,有两种选项nio, epoll iomode iomode = iomode.valueof(conf.iomode()); // 创建bossgroup和workergroup,即主线程组合子线程组 eventloopgroup bossgroup = nettyutils.createeventloop(iomode, conf.serverthreads(), conf.getmodulename() + "-server"); eventloopgroup workergroup = bossgroup; // 缓冲分配器,分为堆内存和直接内存 pooledbytebufallocator allocator = nettyutils.createpooledbytebufallocator( conf.preferdirectbufs(), true /* allowcache */, conf.serverthreads()); // 创建一个netty服务端引导对象,并设置相关参数 bootstrap = new serverbootstrap() .group(bossgroup, workergroup) .channel(nettyutils.getserverchannelclass(iomode)) .option(channeloption.allocator, allocator) .childoption(channeloption.allocator, allocator); // 内存使用的度量对象 this.metrics = new nettymemorymetrics( allocator, conf.getmodulename() + "-server", conf); // 排队的连接数 if (conf.backlog() > 0) { bootstrap.option(channeloption.so_backlog, conf.backlog()); } // socket接收缓冲区大小 if (conf.receivebuf() > 0) { bootstrap.childoption(channeloption.so_rcvbuf, conf.receivebuf()); } // socket发送缓冲区大小 if (conf.sendbuf() > 0) { bootstrap.childoption(channeloption.so_sndbuf, conf.sendbuf()); } // 子channel处理器 bootstrap.childhandler(new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel ch) { rpchandler rpchandler = apprpchandler; for (transportserverbootstrap bootstrap : bootstraps) { rpchandler = bootstrap.dobootstrap(ch, rpchandler); } context.initializepipeline(ch, rpchandler); } }); inetsocketaddress address = hosttobind == null ? new inetsocketaddress(porttobind): new inetsocketaddress(hosttobind, porttobind); // 绑定到ip地址和端口 channelfuture = bootstrap.bind(address); // 同步等待绑定成功 channelfuture.syncuninterruptibly(); port = ((inetsocketaddress) channelfuture.channel().localaddress()).getport(); logger.debug("shuffle server started on port: {}", port); }
这里的校验主要是对group和channelfactory的非空校验
public channelfuture bind(socketaddress localaddress) {
validate();
return dobind(objectutil.checknotnull(localaddress, "localaddress"));
}
这个方法,我们之前在分析bootstrap的启动过程时提到过,它的主要作用如下:
之前,我们分析了niosocketchannel的构造过程,以及bootstarp中对channel的初始化过程,
本节我们要分析nioserversocketchannel的构造过程,以及serverbootstrap的init方法的实现。
private channelfuture dobind(final socketaddress localaddress) { // 创建一个channel,并对这个channel做一些初始化工作 final channelfuture regfuture = initandregister(); final channel channel = regfuture.channel(); if (regfuture.cause() != null) { return regfuture; } if (regfuture.isdone()) { // at this point we know that the registration was complete and successful. channelpromise promise = channel.newpromise(); // 将这个channel绑定到指定的地址 dobind0(regfuture, channel, localaddress, promise); return promise; } else {// 对于尚未注册成功的情况,采用异步的方式,即添加一个回调 // registration future is almost always fulfilled already, but just in case it's not. final pendingregistrationpromise promise = new pendingregistrationpromise(channel); regfuture.addlistener(new channelfuturelistener() { @override public void operationcomplete(channelfuture future) throws exception { throwable cause = future.cause(); if (cause != null) { // registration on the eventloop failed so fail the channelpromise directly to not cause an // illegalstateexception once we try to access the eventloop of the channel. promise.setfailure(cause); } else { // registration was successful, so set the correct executor to use. // see https://github.com/netty/netty/issues/2586 promise.registered(); dobind0(regfuture, channel, localaddress, promise); } } }); return promise; } }
这里通过调用jdk的api创建了一个serversocketchannel。
public nioserversocketchannel() {
this(newsocket(default_selector_provider));
}
与niosocketchannelconfig类似,nioserversocketchannelconfig也是一种门面模式,是对nioserversocketchannel中的参数接口的封装。
此外,我们注意到,这里规定了nioserversocketchannel的初始的感兴趣的事件是accept事件,即默认会监听请求建立连接的事件。
而在niosocketchannel中的初始感兴趣的事件是read事件。
所以,这里与niosocketchannel构造过程最主要的不同就是初始的感兴趣事件不同。
public nioserversocketchannel(serversocketchannel channel) { super(null, channel, selectionkey.op_accept); config = new nioserversocketchannelconfig(this, javachannel().socket()); }
这里首先调用了父类的构造方法,最终调用了abstractniochannel类的构造方法,这个过程我们在之前分析niosocketchannel初始化的时候已经详细说过,主要就是创建了内部的unsafe对象和channelpipeline对象。
分析完了channel的构造过程,我们再来看一下serverbootstrap是怎么对channel对象进行初始化的。
所以,很显然,我们接下来就要看一下这个特殊的handler,serverbootstrapacceptor的read方法。
void init(channel channel) throws exception { final map<channeloption<?>, object> options = options0(); // 设置参数 synchronized (options) { setchanneloptions(channel, options, logger); } // 设置属性 final map<attributekey<?>, object> attrs = attrs0(); synchronized (attrs) { for (entry<attributekey<?>, object> e: attrs.entryset()) { @suppresswarnings("unchecked") attributekey<object> key = (attributekey<object>) e.getkey(); channel.attr(key).set(e.getvalue()); } } channelpipeline p = channel.pipeline(); // 子channel的group和handler参数 final eventloopgroup currentchildgroup = childgroup; final channelhandler currentchildhandler = childhandler; final entry<channeloption<?>, object>[] currentchildoptions; final entry<attributekey<?>, object>[] currentchildattrs; synchronized (childoptions) { currentchildoptions = childoptions.entryset().toarray(newoptionarray(0)); } synchronized (childattrs) { currentchildattrs = childattrs.entryset().toarray(newattrarray(0)); } // 添加处理器 p.addlast(new channelinitializer<channel>() { @override public void initchannel(final channel ch) throws exception { final channelpipeline pipeline = ch.pipeline(); // 一般情况下,对于serverbootstrap用户无需设置handler channelhandler handler = config.handler(); if (handler != null) { pipeline.addlast(handler); } // 这里添加了一个关键的handler,并且顺手启动了对应的eventloop的线程 ch.eventloop().execute(new runnable() { @override public void run() { pipeline.addlast(new serverbootstrapacceptor( ch, currentchildgroup, currentchildhandler, currentchildoptions, currentchildattrs)); } }); } }); }
在分析serverbootstrapacceptor之前,我们首先来回顾一下nioeventloop的循环中,对于accept事件的处理逻辑,这里截取其中的一小段代码:
// 处理read和accept事件 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); }
可见,对于accept事件和read事件一样,调用niounsafe的read方法
因为nioserversocketchannel继承了abstractniomessagechannel,并且read方法的实现也是在abstractniomessagechannel中,
根据前面对channelpipeline的分析,我们知道,读事件对从头结点开始,向尾节点传播。上面我们也提到了,对于初始的那个nioserversocketchannel,会在serverbootstarp的init方法中向这个channel的处理链中加入一个serverbootstrapacceptor处理器,所以,很显然,接下来我们应该分析serverbootstrapacceptor中对读事件的处理。
public void read() { // 确认当前代码的执行是在eventloop的线程中 assert eventloop().ineventloop(); final channelconfig config = config(); final channelpipeline pipeline = pipeline(); final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle(); allochandle.reset(config); boolean closed = false; throwable exception = null; try { try { do { // 这里读取到的是建立的连接对应的channel, // jdk的socketchannel被包装成了netty的niosocketchannel int localread = doreadmessages(readbuf); if (localread == 0) { break; } if (localread < 0) { closed = true; break; } allochandle.incmessagesread(localread); } while (allochandle.continuereading()); } catch (throwable t) { exception = t; } int size = readbuf.size(); for (int i = 0; i < size; i ++) { readpending = false; // 把接收到的每一个channel作为消息,在channelpipeline中触发一个读事件 pipeline.firechannelread(readbuf.get(i)); } readbuf.clear(); allochandle.readcomplete(); // 最后触发一个读完成的事件 pipeline.firechannelreadcomplete(); if (exception != null) { closed = closeonreaderror(exception); pipeline.fireexceptioncaught(exception); } if (closed) { inputshutdown = true; if (isopen()) { close(voidpromise()); } } } finally { // check if there is a readpending which was not processed yet. // this could be for two reasons: // * the user called channel.read() or channelhandlercontext.read() in channelread(...) method // * the user called channel.read() or channelhandlercontext.read() in channelreadcomplete(...) method // // see https://github.com/netty/netty/issues/2254 if (!readpending && !config.isautoread()) { removereadop(); } } } }
代码逻辑还是比较简单的,因为有了前面的铺垫,即在serverbootstrap的init方法对创始的那个serverchannel进行初始化时,将用户设置的子channel的参数,属性,子channel的handler和子group等参数作为构造参数全部传给了serverbootstrapacceptor,所以在这里直接用就行了。
其实这里的子channel的初始化和注册过程和bootstrap中对一个新创建的channel的初始化过程基本一样,区别在于bootstrap中channel是用户代码通过调用connect方法最终在initandregistry中通过反射构造的一个对象;而在服务端,通过监听serversocketchannel的accept事件,当有新的连接建立请求时,会自动创建一个socketchannel(jdk的代码实现),然后nioserversocketchannel将其包装成一个niosocketchannel,并作为消息在传递给处理器,所以在serversocketchannel中的子channel的创建是由底层的jdk的库实现的。
public void channelread(channelhandlercontext ctx, object msg) { // 类型转换,这里的强制转换是安全的的, // 是由各种具体的abstractniomessagechannel子类型的实现保证的 // 各种具体的abstractniomessagechannel子类型的读方法确保它们读取并最终返回的是一个channel类型 final channel child = (channel) msg; // 给子channel添加handler child.pipeline().addlast(childhandler); // 给子channel设置参数 setchanneloptions(child, childoptions, logger); // 给子channel设置属性 for (entry<attributekey<?>, object> e: childattrs) { child.attr((attributekey<object>) e.getkey()).set(e.getvalue()); } try { // 将子channel注册到子group中 childgroup.register(child).addlistener(new channelfuturelistener() { @override public void operationcomplete(channelfuture future) throws exception { if (!future.issuccess()) { forceclose(child, future.cause()); } } }); } catch (throwable t) { forceclose(child, t); } }
回到dobind方法中,在完成了channel的构造,初始化和注册逻辑后,接下来就要把这个server类型的channel绑定到一个地址上,这样才能接受客户端建立连接的请求。
从代码中可以看出,调用了channel的bind方法实现绑定的逻辑。
private static void dobind0( final channelfuture regfuture, final channel channel, final socketaddress localaddress, final channelpromise promise) { // this method is invoked before channelregistered() is triggered. give user handlers a chance to set up // the pipeline in its channelregistered() implementation. channel.eventloop().execute(new runnable() { @override public void run() { if (regfuture.issuccess()) { // 调用了channel.bind方法完成绑定的逻辑 channel.bind(localaddress, promise).addlistener(channelfuturelistener.close_on_failure); } else { promise.setfailure(regfuture.cause()); } } }); }
bind操作的传递是从尾节点开始向前传递,所以我们直接看headcontext对于bind方法的实现
public channelfuture bind(socketaddress localaddress, channelpromise promise) {
return pipeline.bind(localaddress, promise);
}
public final channelfuture bind(socketaddress localaddress, channelpromise promise) { return tail.bind(localaddress, promise); }
调用了unsafe的bind方法。
public void bind( channelhandlercontext ctx, socketaddress localaddress, channelpromise promise) { unsafe.bind(localaddress, promise); }
因为后面右有几个事件的触发,每个触发事件都是通过channel的相关方法来触发,然后又是通过channelpipeline的传递事件,这些事件最后基本都是由headcontext处理了,所以这里我只简单地叙述一下后面的 大概逻辑,代码比较繁琐,而且很多都是相同的调用过程,所以就不贴代码了。
从代码中可以看出来,最终调用了jdk的api,将感兴趣的事件添加到selectionkey中。通过前面的 分析,我们知道对于niosocketchannel,它的感兴趣的读事件类型是selectionkey.op_read,也就是读事件;
而对于nioserversocketchannel,根据前面对其构造方法的分析,它的感兴趣的事件是selectionkey.op_accept,也就是建立连接的事件。
protected void dobeginread() throws exception { // channel.read() or channelhandlercontext.read() was called final selectionkey selectionkey = this.selectionkey; if (!selectionkey.isvalid()) { return; } readpending = true; // 将读事件类型加入到selectionkey的感兴趣的事件中 // 这样jdk底层的selector就会监听相应类型的事件 final int interestops = selectionkey.interestops(); if ((interestops & readinterestop) == 0) { selectionkey.interestops(interestops | readinterestop); } }
到这里,我们就把serverbootstrap的主要功能代码分析完了,这里面主要包括三个方面:
如对本文有疑问, 点击进行留言回复!!
网友评论