当前位置: 移动技术网 > IT编程>开发语言>Java > Netty架构 - EventLoop、EventLoopGroup

Netty架构 - EventLoop、EventLoopGroup

2020年07月14日  | 移动技术网IT编程  | 我要评论

前叙

Reactor单线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);

Reactor多线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup(128);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);

主从Reactor多线程模型的应用:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup);

NioEventLoopGroup的实例化

NioEventLoopGroup的父类 - MultithreadEventExecutorGroup有一个EventExecutor[] children属性。而这个属性继承ScheduledExecutorService,分析出它是一个线程池组。每个线程池的实例化,其实交给了EventLoopGroup#newChild方法实现,也就是返回了一个NioEventLoop实例。

NioEventLoopGroup的构造器实际上是调用了它的父类 - MultithreadEventExecutorGroup的构造器。如果构造器没有指定nThreads,会使用DEFAULT_EVENT_LOOP_THREADS,如下:

在这里插入图片描述

也就是说,默认的线程数是cpu核心 * 2。

NioEventLoop的实例化

通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是执行任务队列中的任务,比如用户调用 EventLoop#schedule方法提交的定时任务。

Netty中,每个Channel有且仅有一个EventLoop与之关联。

NioEventLoop的构造器

首先看下它的构造器:
在这里插入图片描述

provider: NioEventLoopGroup构造器中,调用SelectorProvider#provider()方法来获取SelectorProvider对象。

在这里插入图片描述

SelectorTuple:封装了Selector unwrappedSelectorSelector selector。它的实例是由openSelector()方法赋予。

openSelector()方法如下:(有省略)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

此外,NioEventLoop的构造器也有调用父类 - SingleThreadEventLoop的一个构造器,如下:
在这里插入图片描述

在这里插入图片描述

它的executor属性是由ThreadExecutorMap.apply(...)方法赋予。实际上就是在EventExecutor的帮助下,实例化一个Executor对象。

然后一直调用父类的构造器,直到AbstractEventExecutor的构造器,如下:
在这里插入图片描述

实际上,就是对它的EventExecutorGroup parent属性赋予一个NioEventLoopGroup

NioEventLoop的run()方法

在分析NioEventLoop#run方法之前,需要考虑下run在哪里有被调用。

NioEventLoop的父类 - SingleThreadEventExecutor,有一个private volatile Thread thread属性,使用如下方法进行设置:

在这里插入图片描述

thread设置为当前线程。然后会调用SingleThreadEventExecutor.this.run()方法,实际上是调用NioEventLoop#run()方法。

当然了,这个doStartThread方法,是由ServerBootstrap#bind(...),即绑定客户端开始,经过一层层方法,最后才调用doStartThread方法。

接下来的重头戏是==run()==方法:

(我这里将它分为了两部分)

第一部分:
在这里插入图片描述

首先看下第一部分的处理逻辑:

在这里插入图片描述
在这里插入图片描述

有任务,则调用selectNow()并返回;否则返回SelectStrategy.SELECT,也就是-1。

先来看hasTasks()方法,如下:

在这里插入图片描述

对父类的hasTasks()方法的判断,或者对Queue<Runnable> tailTasks这个队列不为空的判断。

接下来看下它的父类的hasTasks()方法,如下:
在这里插入图片描述

同样是对Queue<Runnable> taskQueue不为空的判断。

根据calculateStrategy(…)方法的判断,有如下几种结果:
在这里插入图片描述

也就是说没有任务的时候(任务队列为空),就会执行SelectStrategy.SELECT的处理逻辑。

wakeup:AtomicBoolean类型,用来控制是否在select阻塞的过程中可以被唤醒。

下面来看第二部分的处理逻辑:

在这里插入图片描述

执行 processSelectedKeys() 方法,根据io比率是否是100决定采用 runAllTasks() 还是 runAllTasks(…) 方法。

select(…)

主要用来设置下次唤醒时间、根据条件调用nio原生的selectNow()或者select(timeout)

private void select(boolean oldWakenUp) throws IOException {
    /* nio原生Selector */
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        /* 当前时间 + 延迟时间 */
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        /* 当前时间 + 延迟时间 - 开始时间 */
        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            /* 重置下次唤醒时间 */
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                	/* 非阻塞的select方式 */
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            /* 任务队列里有任务 并且 wakenUp成功设置为true */
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            	/* 非阻塞的select方式 */
                selector.selectNow();
                selectCnt = 1;
                break;
            }

			/* 阻塞、超时等待的select方式 */
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() 
                || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }

			/* 如果线程中断,结束select过程 */
            if (Thread.interrupted()) {           
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            /* 从"io.netty.selectorAutoRebuildThreshold"属性中获取重建Selector的阈值,默认是512 */  
             /* 为了避免无谓的空轮询,轮询的次数达到阈值之后,需要重建Selector */      
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                /* 创建一个新的Selector,将旧的Selector上注册的Channel迁移到新的Selector上 */
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }

    } catch (CancelledKeyException e) {
        。。。。。。
    }
}
rebuildSelector0()

从"io.netty.selectorAutoRebuildThreshold"属性中获取重建Selector的阈值,默认是512。

为了避免无谓的空轮询,轮询的次数达到阈值之后,需要重建Selector。

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    if (oldSelector == null) {
        return;
    }
    try {
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            key.cancel();
            /* 在新的Selector上,重新注册原有Selector上已注册的所有Channel */
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

processSelectedKeys()

在这里插入图片描述

selectedKeys: SelectedSelectionKeySet类型。其封装了如下内容:
在这里插入图片描述

processSelectedKeysOptimized()

在这里插入图片描述

selectedKeys.key[i] = null:数组元素的帮助gc操作。

processSelectedKey(...):实际的处理操作,下面有分析。

reset(...): 对seletedKeys数组从指定索引开始,重置其元素的值为null。

processSelectedKey

尝试channelReady(…)操作,并且根据状态值的不同,而执行不同的后续操作。
在这里插入图片描述

channelReady(SelectableChannel ch, SelectionKey key): 当SelectableChannel被Selector选出的时候,调用该方法。

invokeChannelUnregistered(...): 实际上就是channelUnregistered(…)方法,当指定的SelectableChannel的SeletionKey被取消时,调用该方法。

processSelectedKeysPlain(…)

在这里插入图片描述

processSelectedKey(...):上面有讲到,尝试channelReady(…)操作,并且根据状态值的不同,而执行不同的后续操作。

selectAgain():对needsToSelectAgain属性值设置为false,然后调用selector.selectNow()方法。

runAllTasks()

从任务队列中拉取所有的任务,然后运行它们。
在这里插入图片描述

assert inEventLoop()是判断SingleThreadEventExecutor#Thread thread属性是否是当前线程。

fetchFromScheduledTaskQueue()方法如下:
在这里插入图片描述

单纯的判断true或者false。返回true的情况是调度任务队列为空、从调度任务队列拉取的调度任务为空。返回false的情况是调度任务添加任务队列失败,将其重新添加到调度任务队列中。

这里的nanoTime实际上是System.nanoTime() - START_TIME

通过pollScheduledTask(...)方法拉取一个调度任务。
在这里插入图片描述

从调度任务队列中获取一个调度任务,如果调度任务不为空并且执行时间有效,就返回它。

runAllTasksFrom(...):运行所有从任务队列中拉取的任务。
在这里插入图片描述

除了返回true或者false之外,对于返回true的情况,会运行所有的拉取的任务。

pollTaskFrom(...):从任务队列拉取任务,要求不是WAKEUP_TASK(run方法体为空),返回这个任务。

safeExecute(...):调度任务的run()方法,执行任务。

对于runAllTasksFrom(...)方法返回true的情况下,会将ranAtLeastOne标记为true,然后执行lastExecutionTime = ScheduledFutureTask.nanoTime()

afterRunningAllTasks():在运行完所有的任务之后调度。在SingleThreadEventExecutor里是一个未实现的protected方法,留给子类实现。

isShuttingDown()

判断状态是否是ST_SHUTTING_DOWN、ST_SHUTDOWN、ST_TERMINATED之一。
在这里插入图片描述

可选状态如下:
在这里插入图片描述

closeAll()

关闭所有的通道,取消SelectionKey,调用channelUnregistered(…)回调方法。
在这里插入图片描述

selectAgain():对needsToSelectAgain设置为false。然后调用Selector#selectNow()方法。

invokeChannelUnregistered(...):实际上是调用channelUnregistered(…)方法。也就是当SelectionKey被取消的时候,调用该方法。

confirmShutdown()

是否确认关闭。(任务队列会添加WAKEUP_TASK)
在这里插入图片描述

判断true或者false。如果是true,有可能是runAllTasks() 或者 runShutdownHooks()返回true的情况下,gracefulShutdownQuietPeriod等于0。或者是状态是ST_SHUTDOWN、ST_TERMINATED之一,或者优雅关机等待的时间超时。在run()方法,如果是true,则直接返回并跳出循环;否则一直进行循环。

isShuttingdown():判断状态是否是ST_SHUTTING_DOWN、ST_SHUTDOWN、ST_TERMINATED之一。

runAllTasks():前面有分析过,运行从任务队列拉取的任务。

isShutdown():判断状态是否是ST_SHUTDOWN、ST_TERMINATED之一。

WAKEUP_TASK:前面有说过,run方法体什么都没有实现。

cancelScheduledTasks()

取消所有的调度任务。
在这里插入图片描述

从调度任务队列中取出调度任务,进行取消。
cancelWithoutRemove(…)

在这里插入图片描述

mayInterruptIfRunning:这里传入的是false,也就是运行时不会被中断。

clearTaskAfterCompletion:如果第一个参数是true,会将PromiseTask#Object task属性设置为CANCELLED。

CANCELLED:它是一个Runnable CANCELLED = new SentinelRunnable("CANCELLED")。toString()方法返回cancelled,而run()方法什么都没有实现。

cancel(…)

在这里插入图片描述

RESULT_UPDATER:AtomicReferenceFieldUpdater类型,这里负责对result属性原子更新。

CANCELLATION_CAUSE_HOLDER:CauseHolder类型,持有Throwable参数。

checkNotifyWaiters():对于waiters大于0,调用notifyAll(),进行唤醒。再根据listeners是否不为空进行返回true或者false。

notifyListeners

在这里插入图片描述

MAX_LISTENER_STACK_DEPTH:"io.netty.defaultPromise.maxListenerStackDepth"属性与8取最小值。

notifyListenersNow():调用GenericFutureListener接口的operationComplete()方法。

runShutdownHooks()

运行所有的shutdownHooks。更新最近执行的时间。
在这里插入图片描述

shutdownHooks:LinkedHashSet类型,里面是Runnable。

本文地址:https://blog.csdn.net/qq_34561892/article/details/107214924

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网