当前位置: 移动技术网 > IT编程>开发语言>Java > Netty源码分析 (六)----- 客户端接入accept过程

Netty源码分析 (六)----- 客户端接入accept过程

2019年09月11日  | 移动技术网IT编程  | 我要评论

通读本文,你会了解到
1.netty如何接受新的请求
2.netty如何给新请求分配reactor线程
3.netty如何给每个新连接增加channelhandler

netty中的reactor线程

netty中最核心的东西莫过于两种类型的reactor线程,可以看作netty中两种类型的发动机,驱动着netty整个框架的运转

一种类型的reactor线程是boos线程组,专门用来接受新的连接,然后封装成channel对象扔给worker线程组;还有一种类型的reactor线程是worker线程组,专门用来处理连接的读写

不管是boos线程还是worker线程,所做的事情均分为以下三个步骤

  1. 轮询注册在selector上的io事件
  2. 处理io事件
  3. 执行异步task

对于boos线程来说,第一步轮询出来的基本都是 accept 事件,表示有新的连接,而worker线程轮询出来的基本都是read/write事件,表示网络的读写事件

新连接的建立

简单来说,新连接的建立可以分为三个步骤
1.检测到有新的连接
2.将新的连接注册到worker线程组
3.注册新连接的读事件

检测到有新连接进入

我们已经知道,当服务端绑启动之后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生

nioeventloop.java

private static void processselectedkey(selectionkey k, abstractniochannel ch) {
    final niounsafe unsafe = ch.unsafe();
    //检查该selectionkey是否有效,如果无效,则关闭channel
    if (!k.isvalid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidpromise());
        return;
    }

    try {
        int readyops = k.readyops();
        // also check for readops of 0 to workaround possible jdk bug which may otherwise lead
        // to a spin loop
        // 如果准备好read或accept则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决jdk可能会产生死循环的一个bug。
        if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
            unsafe.read();
            if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
                // connection already closed - no need to handle write.
                return;
            }
        }
        // 如果准备好了write则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的op_write标记
        if ((readyops & selectionkey.op_write) != 0) {
            // call forceflush which will also take care of clear the op_write once there is nothing left to write
            ch.unsafe().forceflush();
        }
        // 如果是op_connect,则需要移除op_connect否则selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
        if ((readyops & selectionkey.op_connect) != 0) {
            // remove op_connect as otherwise selector.select(..) will always return without blocking
            // see https://github.com/netty/netty/issues/924
            int ops = k.interestops();
            ops &= ~selectionkey.op_connect;
            k.interestops(ops);

            unsafe.finishconnect();
        }
    } catch (cancelledkeyexception ignored) {
        unsafe.close(unsafe.voidpromise());
    }
}

该方法主要是对selectionkey k进行了检查,有如下几种不同的情况

1)op_accept,接受客户端连接

2)op_read, 可读事件, 即 channel 中收到了新数据可供上层读取。

3)op_write, 可写事件, 即上层可以向 channel 写入数据。

4)op_connect, 连接建立事件, 即 tcp 连接已经建立, channel 处于 active 状态。

本篇博文主要来看下当boss线程 selector检测到op_accept事件时,内部干了些什么。

if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
    unsafe.read();
    if (!ch.isopen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
        // connection already closed - no need to handle write.
        return;
    }
}

boos reactor线程已经轮询到 selectionkey.op_accept 事件,说明有新的连接进入,此时将调用channel的 unsafe来进行实际的操作,此时的channel为 nioserversocketchannel,则unsafe为nioserversocketchannel的属性niomessageunsafe

那么,我们进入到它的read方法,进入新连接处理的第二步

注册到reactor线程

niomessageunsafe.java

private final list<object> readbuf = new arraylist<object>();

public void read() {
    assert eventloop().ineventloop();
    final channelpipeline pipeline = pipeline();
    final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle();
    do {
        int localread = doreadmessages(readbuf);
        if (localread == 0) {
            break;
        }
        if (localread < 0) {
            closed = true;
            break;
        }
    } while (allochandle.continuereading());
    int size = readbuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.firechannelread(readbuf.get(i));
    }
    readbuf.clear();
    pipeline.firechannelreadcomplete();
}

调用 doreadmessages 方法不断地读取消息,用 readbuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.firechannelread(),将每条新连接经过一层服务端channel的洗礼,之后清理容器,触发 pipeline.firechannelreadcomplete()

下面我们具体看下这两个方法

1.doreadmessages(list)
2.pipeline.firechannelread(niosocketchannel)

doreadmessages()

protected int doreadmessages(list<object> buf) throws exception {
    socketchannel ch = javachannel().accept();

    try {
        if (ch != null) {
            buf.add(new niosocketchannel(this, ch));
            return 1;
        }
    } catch (throwable t) {
        logger.warn("failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (throwable t2) {
            logger.warn("failed to close a socket.", t2);
        }
    }

    return 0;
}

我们终于窥探到netty调用jdk底层nio的边界 javachannel().accept();,由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept方法是立即返回的,返回jdk底层nio创建的一条channel

serversocketchannel有阻塞和非阻塞两种模式:

a、阻塞模式:serversocketchannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 socketchannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。

b、非阻塞模式:,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的socketchannel是否是null.

在nioserversocketchannel的构造函数分析中,我们知道,其通过ch.configureblocking(false);语句设置当前的serversocketchannel为非阻塞的

netty将jdk的 socketchannel 封装成自定义的 niosocketchannel,加入到list里面,这样外层就可以遍历该list,做后续处理

从上一篇文章中,我们已经知道服务端的创建过程中会创建netty中一系列的核心组件,包括pipeline,unsafe等等,那么,接受一条新连接的时候是否也会创建这一系列的组件呢?

带着这个疑问,我们跟进去

niosocketchannel.java

public niosocketchannel(channel parent, socketchannel socket) {
    super(parent, socket);
    config = new niosocketchannelconfig(this, socket.socket());
}

我们重点分析 super(parent, socket),niosocketchannel的父类为 abstractniobytechannel

abstractniobytechannel.java

protected abstractniobytechannel(channel parent, selectablechannel ch) {
    super(parent, ch, selectionkey.op_read);
}

这里,我们看到jdk nio里面熟悉的影子—— selectionkey.op_read,一般在原生的jdk nio编程中,也会注册这样一个事件,表示对channel的读感兴趣

我们继续往上,追踪到abstractniobytechannel的父类 abstractniochannel, 这里,我相信读了上一篇文章你对于这部分代码肯定是有印象的

protected abstractniochannel(channel parent, selectablechannel ch, int readinterestop) {
    super(parent);
    this.ch = ch;
    this.readinterestop = readinterestop;
    try {
        ch.configureblocking(false);
    } catch (ioexception e) {
        try {
            ch.close();
        } catch (ioexception e2) {
            if (logger.iswarnenabled()) {
                logger.warn(
                        "failed to close a partially initialized socket.", e2);
            }
        }
        throw new channelexception("failed to enter non-blocking mode.", e);
    }
}

在创建服务端channel的时候,最终也会进入到这个方法,super(parent), 便是在abstractchannel中创建一系列和该channel绑定的组件,如下

protected abstractchannel(channel parent) {
    this.parent = parent;
    id = newid();
    unsafe = newunsafe();
    pipeline = newchannelpipeline();
}

而这里的 readinterestop 表示该channel关心的事件是 selectionkey.op_read,后续会将该事件注册到selector,之后设置该通道为非阻塞模式,在channel中创建 unsafe 和一条 pipeline 

pipeline.firechannelread(niosocketchannel)

对于 pipeline我们前面已经了解过,在netty的各种类型的channel中,都会包含一个pipeline,字面意思是管道,我们可以理解为一条流水线工艺,流水线工艺有起点,有结束,中间还有各种各样的流水线关卡,一件物品,在流水线起点开始处理,经过各个流水线关卡的加工,最终到流水线结束

对应到netty里面,流水线的开始就是headcontxt,流水线的结束就是tailconextheadcontxt中调用unsafe做具体的操作,tailconext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告

通过前面的文章中,我们已经知道在服务端的channel初始化时,在pipeline中,已经自动添加了一个pipeline处理器 serverbootstrapacceptor, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,我们就来看下serverbootstrapacceptor

serverbootstrapacceptor.java

private static class serverbootstrapacceptor extends channelinboundhandleradapter {
    private final eventloopgroup childgroup;
    private final channelhandler childhandler;
    private final entry<channeloption<?>, object>[] childoptions;
    private final entry<attributekey<?>, object>[] childattrs;

    serverbootstrapacceptor(
            eventloopgroup childgroup, channelhandler childhandler,
            entry<channeloption<?>, object>[] childoptions, entry<attributekey<?>, object>[] childattrs) {
        this.childgroup = childgroup;
        this.childhandler = childhandler;
        this.childoptions = childoptions;
        this.childattrs = childattrs;
    }

    public void channelread(channelhandlercontext ctx, object msg) {
        final channel child = (channel) msg;

        child.pipeline().addlast(childhandler);

        for (entry<channeloption<?>, object> e: childoptions) {
            try {
                if (!child.config().setoption((channeloption<object>) e.getkey(), e.getvalue())) {
                    logger.warn("unknown channel option: " + e);
                }
            } catch (throwable t) {
                logger.warn("failed to set a channel option: " + child, t);
            }
        }

        for (entry<attributekey<?>, object> e: childattrs) {
            child.attr((attributekey<object>) e.getkey()).set(e.getvalue());
        }

        try {
            childgroup.register(child).addlistener(new channelfuturelistener() {
                @override
                public void operationcomplete(channelfuture future) throws exception {
                    if (!future.issuccess()) {
                        forceclose(child, future.cause());
                    }
                }
            });
        } catch (throwable t) {
            forceclose(child, t);
        }
    }
}

前面的 pipeline.firechannelread(niosocketchannel); 最终通过head->unsafe->serverbootstrapacceptor的调用链,调用到这里的 serverbootstrapacceptor 的channelread方法,而 channelread 一上来就把这里的msg强制转换为 channel

然后,拿到该channel,也就是我们之前new出来的 niosocketchannel中对应的pipeline,将用户代码中的 childhandler,添加到pipeline,这里的 childhandler 在用户代码中的体现为

serverbootstrap b = new serverbootstrap();
b.group(bossgroup, workergroup)
 .channel(nioserversocketchannel.class)
 .childhandler(new channelinitializer<socketchannel>() {
     @override
     public void initchannel(socketchannel ch) throws exception {
         channelpipeline p = ch.pipeline();
         p.addlast(new echoserverhandler());
     }
 });

其实对应的是 channelinitializer,到了这里,niosocketchannel中pipeline对应的处理器为 head->channelinitializer->tail,牢记,后面会再次提到!

接着,设置 niosocketchannel 对应的 attr和option,然后进入到 childgroup.register(child),这里的childgroup就是我们在启动代码中new出来的nioeventloopgroup

我们进入到nioeventloopgroupregister方法,代理到其父类multithreadeventloopgroup

multithreadeventloopgroup.java

public channelfuture register(channel channel) {
    return next().register(channel);
}

这里又扯出来一个 next()方法,我们跟进去

multithreadeventloopgroup.java

@override
public eventloop next() {
    return (eventloop) super.next();
}

回到其父类

multithreadeventexecutorgroup.java

@override
public eventexecutor next() {
    return chooser.next();
}

这里的chooser对应的类为 eventexecutorchooser,字面意思为事件执行器选择器,放到我们这里的上下文中的作用就是从worker reactor线程组中选择一个reactor线程

public interface eventexecutorchooserfactory {

    /**
     * returns a new {@link eventexecutorchooser}.
     */
    eventexecutorchooser newchooser(eventexecutor[] executors);

    /**
     * chooses the next {@link eventexecutor} to use.
     */
    @unstableapi
    interface eventexecutorchooser {

        /**
         * returns the new {@link eventexecutor} to use.
         */
        eventexecutor next();
    }
}

chooser的实现有两种

public final class defaulteventexecutorchooserfactory implements eventexecutorchooserfactory {

    public static final defaulteventexecutorchooserfactory instance = new defaulteventexecutorchooserfactory();

    private defaulteventexecutorchooserfactory() { }

    @suppresswarnings("unchecked")
    @override
    public eventexecutorchooser newchooser(eventexecutor[] executors) {
        if (ispoweroftwo(executors.length)) {
            return new poweroftoweventexecutorchooser(executors);
        } else {
            return new genericeventexecutorchooser(executors);
        }
    }

    private static boolean ispoweroftwo(int val) {
        return (val & -val) == val;
    }

    private static final class poweroftoweventexecutorchooser implements eventexecutorchooser {
        private final atomicinteger idx = new atomicinteger();
        private final eventexecutor[] executors;

        poweroftoweventexecutorchooser(eventexecutor[] executors) {
            this.executors = executors;
        }

        @override
        public eventexecutor next() {
            return executors[idx.getandincrement() & executors.length - 1];
        }
    }

    private static final class genericeventexecutorchooser implements eventexecutorchooser {
        private final atomicinteger idx = new atomicinteger();
        private final eventexecutor[] executors;

        genericeventexecutorchooser(eventexecutor[] executors) {
            this.executors = executors;
        }

        @override
        public eventexecutor next() {
            return executors[math.abs(idx.getandincrement() % executors.length)];
        }
    }
}

默认情况下,chooser通过 defaulteventexecutorchooserfactory被创建,在创建reactor线程选择器的时候,会判断reactor线程的个数,如果是2的幂,就创建poweroftoweventexecutorchooser,否则,创建genericeventexecutorchooser

两种类型的选择器在选择reactor线程的时候,都是通过round-robin的方式选择reactor线程,唯一不同的是,poweroftoweventexecutorchooser是通过与运算,而genericeventexecutorchooser是通过取余运算,与运算的效率要高于求余运算

选择完一个reactor线程,即 nioeventloop 之后,我们回到注册的地方

public channelfuture register(channel channel) {
    return next().register(channel);
}

singlethreadeventloop.java

@override
public channelfuture register(channel channel) {
    return register(new defaultchannelpromise(channel, this));
}

其实,这里已经和服务端启动的过程一样了,可以参考我前面的文章

abstractniochannel.java

private void register0(channelpromise promise) {
    boolean firstregistration = neverregistered;
    doregister();
    neverregistered = false;
    registered = true;

    pipeline.invokehandleraddedifneeded();

    safesetsuccess(promise);
    pipeline.firechannelregistered();
    if (isactive()) {
        if (firstregistration) {
            pipeline.firechannelactive();
        } else if (config().isautoread()) {
            beginread();
        }
    }
}

和服务端启动过程一样,先是调用 doregister();做真正的注册过程,如下

protected void doregister() throws exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionkey = javachannel().register(eventloop().selector, 0, this);
            return;
        } catch (cancelledkeyexception e) {
            if (!selected) {
                eventloop().selectnow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

将该条channel绑定到一个selector上去,一个selector被一个reactor线程使用,后续该channel的事件轮询,以及事件处理,异步task执行都是由此reactor线程来负责

绑定完reactor线程之后,调用 pipeline.invokehandleraddedifneeded()

前面我们说到,到目前为止niosocketchannel 的pipeline中有三个处理器,head->channelinitializer->tail,最终会调用到 channelinitializer 的 handleradded 方法

public void handleradded(channelhandlercontext ctx) throws exception {
    if (ctx.channel().isregistered()) {
        initchannel(ctx);
    }
}

handleradded方法调用 initchannel 方法之后,调用remove(ctx);将自身删除,如下

abstractniochannel.java

private boolean initchannel(channelhandlercontext ctx) throws exception {
    if (initmap.putifabsent(ctx, boolean.true) == null) { 
        try {
            initchannel((c) ctx.channel());
        } catch (throwable cause) {
            exceptioncaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

而这里的 initchannel 方法又是神马玩意?让我们回到用户方法,比如下面这段用户代码

用户代码

serverbootstrap b = new serverbootstrap();
b.group(bossgroup, workergroup)
 .channel(nioserversocketchannel.class)
 .option(channeloption.so_backlog, 100)
 .handler(new logginghandler(loglevel.info))
 .childhandler(new channelinitializer<socketchannel>() {
     @override
     public void initchannel(socketchannel ch) throws exception {
         channelpipeline p = ch.pipeline();
         p.addlast(new logginghandler(loglevel.info));
         p.addlast(new echoserverhandler());
     }
 });

原来最终跑到我们自己的代码里去了啊!完了之后,niosocketchannel绑定的pipeline的处理器就包括 head->logginghandler->echoserverhandler->tail

注册读事件

接下来,我们还剩下这些代码没有分析完

abstractniochannel.java

private void register0(channelpromise promise) {
    // ..
    pipeline.firechannelregistered();
    if (isactive()) {
        if (firstregistration) {
            pipeline.firechannelactive();
        } else if (config().isautoread()) {
            beginread();
        }
    }
}

pipeline.firechannelregistered();,其实没有干啥有意义的事情,最终无非是再调用一下业务pipeline中每个处理器的 channelhandleradded方法处理下回调

isactive()在连接已经建立的情况下返回true,所以进入方法块,进入到 pipeline.firechannelactive();在这里我详细步骤先省略,直接进入到关键环节

abstractniochannel.java

@override
protected void dobeginread() throws exception {
    // channel.read() or channelhandlercontext.read() was called
    final selectionkey selectionkey = this.selectionkey;
    if (!selectionkey.isvalid()) {
        return;
    }

    readpending = true;

    final int interestops = selectionkey.interestops();
    if ((interestops & readinterestop) == 0) {
        selectionkey.interestops(interestops | readinterestop);
    }
}

这里其实就是将 selectionkey.op_read事件注册到selector中去,表示这条通道已经可以开始处理read事件了

总结

至此,netty中关于新连接的处理已经向你展示完了,我们做下总结

1.boos reactor线程轮询到有新的连接进入
2.通过封装jdk底层的channel创建 niosocketchannel以及一系列的netty核心组件
3.将该条连接通过chooser,选择一条worker reactor线程绑定上去
4.注册读事件,开始新连接的读写

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网