当前位置: 移动技术网 > IT编程>软件设计>架构 > netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析

netty中的发动机--EventLoop及其实现类NioEventLoop的源码分析

2019年06月27日  | 移动技术网IT编程  | 我要评论

eventloop

在之前介绍bootstrap的初始化以及启动过程时,我们多次接触了nioeventloopgroup这个类,关于这个类的理解,还需要了解netty的线程模型。nioeventloopgroup可以理解为一组线程,这些线程每一个都可以独立地处理多个channel产生的io事件。

nioeventloopgroup初始化

我们看其中一个参数比较多的构造方法,其他一些参数较少的构造方法使用了一些默认值,使用的默认参数如下:

  • selectorprovider类型,用于创建socket通道,udp通道,创建selector对象等,默认值是selectorprovider.provider(),大部分情况下使用默认值就行,这个方法最终创建的是一个windowsselectorprovider对象
  • selectstrategyfactory,select策略类的工厂类,它的默认值是defaultselectstrategyfactory.instance,就是一个selectstrategyfactory对象本身,而selectstrategyfactory工厂产生的是defaultselectstrategy策略类。
  • rejectedexecutionhandler,拒绝任务的策略类,决定在任务队列已满时采取什么样的策略,类似于jdk线程池的rejectedexecutionhandler的作用

接下来,我们看一下其中的一个常用的构造方法,

public nioeventloopgroup(int nthreads, threadfactory threadfactory,
    final selectorprovider selectorprovider, final selectstrategyfactory selectstrategyfactory) {
    super(nthreads, threadfactory, selectorprovider, selectstrategyfactory, rejectedexecutionhandlers.reject());
}

可见,当前类中并没有什么初始化逻辑,直接调用了父类的构造方法,所以我们接着看父类multithreadeventloopgroup的构造方法:

protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) {
    super(nthreads == 0 ? default_event_loop_threads : nthreads, threadfactory, args);
}

同样,并未做任务处理,直接调用父类构造方法,所以我们接着看multithreadeventexecutorgroup构造方法,初始化逻辑的实现在这个类中,

multithreadeventexecutorgroup构造方法

通过上一小结的分析,我们知道nioeventloopgroup的构造方法的主要逻辑的实现是在multithreadeventexecutorgroup类中,并且在调用构造方法的过程中加上了一个参数的默认值,即eventexecutorchooserfactory类型参数的默认值defaulteventexecutorchooserfactory.instance,这个类以轮询(roundrobin)的方式从多个线程中依次选出线程用于注册channel。
总结一下这段代码的主要步骤:

  • 首先是一些变量的非空检查和合法性检查
  • 然后根据传入的线程数量,创建若干个子执行器,每个执行器对应一个线程
  • 最后以子执行器数组为参数,使用选择器工厂类创建一个选择器
  • 最后给每个子执行器添加一个监听器,以监听子执行器的终止,做一些簿记工作,使得在所有子执行器全部终止后将当前的执行器组终止

    protected multithreadeventexecutorgroup(int nthreads, executor executor,
    eventexecutorchooserfactory chooserfactory, object... args) {
    // 首先是变量的非空检查以及合法性判断,
    // nthreads在multithreadeventloopgroup的构造方法中已经经过一些默认值处理,
    if (nthreads <= 0) {
    throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
    }

      // 这里一般都会使用默认值,
      // threadpertaskexecutor的作用即字面意思,一个任务一个线程
      if (executor == null) {
          executor = new threadpertaskexecutor(newdefaultthreadfactory());
      }
    
      // 子执行器的数组,一个子执行器对应一个线程
      children = new eventexecutor[nthreads];
    
      // 根据传入的线程数量创建多个自执行器
      // 注意,这里子执行器创建好后并不会立即运行起来
      for (int i = 0; i < nthreads; i ++) {
          boolean success = false;
          try {
              children[i] = newchild(executor, args);
              success = true;
          } catch (exception e) {
              // todo: think about if this is a good exception type
              throw new illegalstateexception("failed to create a child event loop", e);
          } finally {
              // 如果创建子执行器不成功,那么需要将已经创建好的子执行器也全部销毁
              if (!success) {
                  for (int j = 0; j < i; j ++) {
                      children[j].shutdowngracefully();
                  }
    
                  // 等待所以子执行器停止后在退出
                  for (int j = 0; j < i; j ++) {
                      eventexecutor e = children[j];
                      try {
                          while (!e.isterminated()) {
                              e.awaittermination(integer.max_value, timeunit.seconds);
                          }
                      } catch (interruptedexception interrupted) {
                          // let the caller handle the interruption.
                          thread.currentthread().interrupt();
                          break;
                      }
                  }
              }
          }
      }
    
      // 创建一个子执行器的选择器,选择器的作用是从子执行器中选出一个
      // 默认使用roundrobin的方式
      chooser = chooserfactory.newchooser(children);
    
      final futurelistener<object> terminationlistener = new futurelistener<object>() {
          @override
          public void operationcomplete(future<object> future) throws exception {
              if (terminatedchildren.incrementandget() == children.length) {
                  terminationfuture.setsuccess(null);
              }
          }
      };
    
      // 给每个子执行器添加监听器,在子执行器终止的时候做一些工作
      // 每有一个子执行器终止时就将terminatedchildren变量加一
      // 当所有子执行器全部终止时,当前这个执行器组就终止了
      for (eventexecutor e: children) {
          e.terminationfuture().addlistener(terminationlistener);
      }
    
      // 包装一个不可变的集合
      set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length);
      collections.addall(childrenset, children);
      readonlychildren = collections.unmodifiableset(childrenset);

    }

nioeventloopgroup.newchild

上面的方法中调用了newchild方法来创建一个子执行器,而这个方法是一个抽象方法,我们看nioeventloopgroup类的实现:

protected eventloop newchild(executor executor, object... args) throws exception {
    return new nioeventloop(this, executor, (selectorprovider) args[0],
        ((selectstrategyfactory) args[1]).newselectstrategy(), (rejectedexecutionhandler) args[2]);
}

可见仅仅是简单地创建了一个nioeventloop对象。

小结

到这里,我们就把nioeventloopgroup的初始化过程分析完了。我们不禁思考,既然nioeventloopgroup是一个执行器组,说白了就是一组线程,那这些线程是什么时候跑起来的呢?如果读者还有印象,应该能记得我们在分析bootstrap建立连接过程时,channel初始化之后需要注册到eventloopgroup中,其实是注册到其中的一个eventloop上,注册逻辑最终是在abstractchannel.abstractunsafe.register方法中实现的,其中有一段代码:

      if (eventloop.ineventloop()) {
            register0(promise);
        } else {
            try {
                eventloop.execute(new runnable() {
                    @override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (throwable t) {
                logger.warn(
                        "force-closing a channel whose registration task was not accepted by an event loop: {}",
                        abstractchannel.this, t);
                closeforcibly();
                closefuture.setclosed();
                safesetfailure(promise, t);
            }
        }

首先调用eventloop.ineventloop()判断执行器的线程与当前线程是否是同一个,如果是则直接执行注册的代码,如果不是就调用eventloop.execute将注册逻辑封装成一个任务放到执行器的任务队列中,接下里我们就以这个方法为切入点,探究一下子执行器线程的启动过程。

abstracteventexecutor.ineventloop

首先,让我们来看一下这个方法,这个方法的作用是判断当前线程与执行器的线程是否同一个线程。

public boolean ineventloop() {
    return ineventloop(thread.currentthread());
}

singlethreadeventexecutor.ineventloop

代码很简单,就不多说了。
public boolean ineventloop(thread thread) {
return thread == this.thread;
}

singlethreadeventexecutor.execute

方法很简单,核心逻辑在startthread方法中,

public void execute(runnable task) {
    // 非空检查
    if (task == null) {
        throw new nullpointerexception("task");
    }

    // 执行到这里一般都是外部调用者,
    boolean ineventloop = ineventloop();
    // 向任务队列中添加一个任务
    addtask(task);
    // 如果当前线程不是执行器的线程,那么需要检查执行器线程是否已经运行,
    // 如果还没在运行,就需要启动线程
    if (!ineventloop) {
        startthread();
        // 检查线程是否被关闭
        if (isshutdown()) {
            boolean reject = false;
            try {
                // 将刚刚添加的任务移除
                if (removetask(task)) {
                    reject = true;
                }
            } catch (unsupportedoperationexception e) {
                // the task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // in worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // addtaskwakesup不知道这个变量意义是什么,nioeventloop传进来的是false
    // 向任务队列中添加一个空任务,这样就能够唤醒阻塞的执行器线程
    // 有些情况下执行器线程会阻塞在taskqueue上,
    // 所以向阻塞队列中添加一个元素能够唤醒哪些因为队列空而被阻塞的线程
    if (!addtaskwakesup && wakesupfortask(task)) {
        wakeup(ineventloop);
    }
}

singlethreadeventexecutor.startthread

这个方法的主要作用是维护内部的状态量state,使用cas指令并发情况下对状态量的修改是线程安全的,并且对于状态量的判断保证启动逻辑只被执行一次

private void startthread() {
    // 状态量的维护
    if (state == st_not_started) {
        // 这里使用了jdk中的原子更新器atomicintegerfieldupdater类,
        // 使用cpu的cas指令保证并发情况下能够安全地维护状态量
        // 保证只有一个线程能够执行启动逻辑,保证启动逻辑只被执行一次
        if (state_updater.compareandset(this, st_not_started, st_started)) {
            boolean success = false;
            try {
                // 实际启动线程的逻辑
                dostartthread();
                success = true;
            } finally {
                if (!success) {
                    state_updater.compareandset(this, st_started, st_not_started);
                }
            }
        }
    }
}

singlethreadeventexecutor.dostartthread

这个方法我就不贴代码了,说一下它的主要作用:

  • 使用内部的executor对象(一般是一个threadpertaskexecutor)启动一个线程,并执行任务
  • 维护执行器的运行状态,主要是通过内部的状态量和cas指令来保证线程安全;此外维护内部的一些簿记量,例如线程本身的引用,线程启动时间等
  • 线程结束时做一些收尾和清理工作,例如将剩余的任务跑完,运行关闭钩子,关闭底层的selector(这个是具体的子类的清理逻辑),同时更新状态量

具体的业务逻辑仍然是在子类中实现的,也就是singlethreadeventexecutor.run()方法的具体实现。

nioeventloop.run

我们仍然以nioeventloop为例,看一下它实现的run方法。还大概讲一下它的主要逻辑:

  • 首选这个方法是一个循环,不断地通过调用jdk底层的selector接收io事件,并对不同的io事件做处理,同时也会处理任务队列中的任务,以及定时调度或延迟调度的任务
  • 调用jdk的api, selector接收io事件
  • 处理各种类型的io事件
  • 处理任务

这里,我就不贴代码了,其中比较重要的是对一些并发情况的考虑和处理,如selector的唤醒时机。接下来,主要看一下对于各种io事件的处理,至于任务队列以及调度队列中任务的处理比较简单,就不展开了。

nioeventloop.processselectedkeysoptimized

这个方法会遍历所有接受到的io事件对应的selectionkey,然后依次处理。

private void processselectedkeysoptimized() {
    // 遍历所有的io事件的selectionkey
    for (int i = 0; i < selectedkeys.size; ++i) {
        final selectionkey k = selectedkeys.keys[i];
        // null out entry in the array to allow to have it gc'ed once the channel close
        // see https://github.com/netty/netty/issues/2363
        selectedkeys.keys[i] = null;

        final object a = k.attachment();

        if (a instanceof abstractniochannel) {
            // 处理事件
            processselectedkey(k, (abstractniochannel) a);
        } else {
            @suppresswarnings("unchecked")
            niotask<selectablechannel> task = (niotask<selectablechannel>) a;
            processselectedkey(k, task);
        }

        // 如果需要重新select,那么把后面的selectionkey全部置0,然后再次调用selectnow方法
        if (needstoselectagain) {
            // null out entries in the array to allow to have it gc'ed once the channel close
            // see https://github.com/netty/netty/issues/2363
            selectedkeys.reset(i + 1);

            selectagain();
            i = -1;
        }
    }
}

nioeventloop.processselectedkey

这个方法首先对selectionkey无效的情况做了处理,分为两种情况:channel本身无效了;channel仍然是正常的,只不过是被从当前的selector上注销了,可能在其他的selector中仍然是正常运行的

  • 对于第一种情况,需要关闭channel,即关闭底层的连接
  • 对于第二种情况则不需要做任何处理。

接下来,我们着重分析一下对于四种事件的处理逻辑。

private void processselectedkey(selectionkey k, abstractniochannel ch) {
    final abstractniochannel.niounsafe unsafe = ch.unsafe();
    // 如果selectionkey是无效的,那么说明相应的channel是无效的,此时需要关闭这个channel
    if (!k.isvalid()) {
        final eventloop eventloop;
        try {
            eventloop = ch.eventloop();
        } catch (throwable ignored) {
            // if the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // only close ch if ch is still registered to this eventloop. ch could have deregistered from the event loop
        // and thus the selectionkey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // see https://github.com/netty/netty/issues/5125
        // 只关闭注册在当前eventloop上的channel,
        // 理论上来说,一个channel是可以注册到多个eventloop上的,
        // selectionkey无效可能是因为channel从当前eventloop上注销了,
        // 但是channel本身依然是正常的,并且注册在其他的eventloop中
        if (eventloop != this || eventloop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        // 到这里说明channel已经无效了,关闭它
        unsafe.close(unsafe.voidpromise());
        return;
    }

    // 下面处理正常情况
    try {
        // 准备好的io事件
        int readyops = k.readyops();
        // we first need to call finishconnect() before try to trigger a read(...) or write(...) as otherwise
        // the nio jdk channel implementation may throw a notyetconnectedexception.
        // 处理connect事件
        if ((readyops & selectionkey.op_connect) != 0) {
            // remove op_connect as otherwise selector.select(..) will always return without blocking
            // see https://github.com/netty/netty/issues/924
            int ops = k.interestops();
            ops &= ~selectionkey.op_connect;
            k.interestops(ops);

            unsafe.finishconnect();
        }

        // process op_write first as we may be able to write some queued buffers and so free memory.
        // 处理write事件
        if ((readyops & selectionkey.op_write) != 0) {
            // call forceflush which will also take care of clear the op_write once there is nothing left to write
            ch.unsafe().forceflush();
        }

        // also check for readops of 0 to workaround possible jdk bug which may otherwise lead
        // to a spin loop
        // 处理read和accept事件
        if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
            unsafe.read();
        }
    } catch (cancelledkeyexception ignored) {
        unsafe.close(unsafe.voidpromise());
    }
}

connect事件处理

从代码中可以看出,connect事件的处理时通过调用niounsafe.finishconnect完成的,我们看一下abstractniounsafe.finishconnect的实现:

public final void finishconnect() {
        // note this method is invoked by the event loop only if the connection attempt was
        // neither cancelled nor timed out.

        assert eventloop().ineventloop();

        try {
            // 是否已经处于连接成功的状态
            boolean wasactive = isactive();
            // 抽象方法,有子类实现
            dofinishconnect();
            // 处理future对象,将其标记为成功
            fulfillconnectpromise(connectpromise, wasactive);
        } catch (throwable t) {
            fulfillconnectpromise(connectpromise, annotateconnectexception(t, requestedremoteaddress));
        } finally {
            // check for null as the connecttimeoutfuture is only created if a connecttimeoutmillis > 0 is used
            // see https://github.com/netty/netty/issues/1770
            if (connecttimeoutfuture != null) {
                connecttimeoutfuture.cancel(false);
            }
            connectpromise = null;
        }
    }

可以看出,主要是通过调用dofinishconnect实现完成连接的逻辑,具体到子类中,niosocketchannel.dofinishconnect的实现是:

protected void dofinishconnect() throws exception {
    if (!javachannel().finishconnect()) {
        throw new error();
    }
}

write事件处理

对于的write事件的处理时通过调用niounsafe.forceflush方法完成,最终的实现在abstractchannel.abstractunsafe.flush0中:
大体上看,这个方法的逻辑比较简单,但是实际上最复杂也是最核心的写入逻辑在子类实现的dowrite方法中。由于本篇的重点在于把nioeventloop的主干逻辑梳理一下,所以这里不再继续展开,后面会单独来分析这一块的源码,这里涉及到netty中对缓冲区的封装,其中涉及到一些比较复杂的逻辑。

  protected void flush0() {
        // 如果正在写数据,直接返回
        if (inflush0) {
            // avoid re-entrance
            return;
        }

        // 输出的缓冲区
        final channeloutboundbuffer outboundbuffer = this.outboundbuffer;
        if (outboundbuffer == null || outboundbuffer.isempty()) {
            return;
        }

        inflush0 = true;

        // mark all pending write requests as failure if the channel is inactive.
        if (!isactive()) {
            try {
                if (isopen()) {
                    outboundbuffer.failflushed(new notyetconnectedexception(), true);
                } else {
                    // do not trigger channelwritabilitychanged because the channel is closed already.
                    outboundbuffer.failflushed(newclosedchannelexception(initialclosecause), false);
                }
            } finally {
                inflush0 = false;
            }
            return;
        }

        try {
            // 将缓冲区的数据写入到channel中
            dowrite(outboundbuffer);
        } catch (throwable t) {
            if (t instanceof ioexception && config().isautoclose()) {
                /**
                 * just call {@link #close(channelpromise, throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * this is needed as otherwise {@link #isactive()} , {@link #isopen()} and {@link #iswritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                initialclosecause = t;
                close(voidpromise(), t, newclosedchannelexception(t), false);
            } else {
                try {
                    shutdownoutput(voidpromise(), t);
                } catch (throwable t2) {
                    initialclosecause = t;
                    close(voidpromise(), t2, newclosedchannelexception(t), false);
                }
            }
        } finally {
            inflush0 = false;
        }
    }

read事件和accept事件处理

乍看会比较奇怪,为什么这两个事件要放到一起处理呢,他们明明是不同的事件。这里主要还是考虑到编码的统一,因为read事件只有niosocketchannel才会有,而accept事件只有nioserversocketchannel才会有,所以这里通过抽象方法,让不同的子类去实现各自的逻辑,是的代码结构上更统一。我们这里看一下nioscketchannel的实现,而对于nioserversocketchannel的实现我会在后续分析netty服务端的启动过程时在具体讲到,即serverbootstrap的启动过程。

niobyteunsafe.read

总结一下这个方法的主要逻辑:

  • 首先会获取缓冲分配器和相应的处理器recvbytebufallocator.handle对象
  • 循环读取数据,每次分配一个一定大小(大小可配置)的缓冲,将channel中待读取的数据读取到缓冲中
  • 以装载有数据的缓冲为消息体,向channel的处理流水线(即pipeline)中触发一个读取的事件,让读取到的数据在流水线中传播,被各个处理器处理
  • 重复此过程,知道channel中没有可供读取的数据
  • 最后向pipeline中触发一个读取完成的事件
  • 最后还要根据最后一次读取到的数据量决定是否关闭通道,如果最后一次读取到的数据量小于0,说明对端已经关闭了输出,所以这里需要将输入关闭,即通道处于半关闭状态。

      public final void read() {
              final channelconfig config = config();
              // 如果通道已经关闭,那么就不需要再读取数据,直接返回
              if (shouldbreakreadready(config)) {
                  clearreadpending();
                  return;
              }
              final channelpipeline pipeline = pipeline();
              // 缓冲分配器
              final bytebufallocator allocator = config.getallocator();
              // 缓冲分配的处理器,处理缓冲分配,读取计数等
              final recvbytebufallocator.handle allochandle = recvbufallochandle();
              allochandle.reset(config);
    
              bytebuf bytebuf = null;
              boolean close = false;
              try {
                  do {
                      // 分配一个缓冲
                      bytebuf = allochandle.allocate(allocator);
                      // 将通道的数据读取到缓冲中
                      allochandle.lastbytesread(doreadbytes(bytebuf));
                      // 如果没有读取到数据,说明通道中没有待读取的数据了,
                      if (allochandle.lastbytesread() <= 0) {
                          // nothing was read. release the buffer.
                          // 因为没读取到数据,所以应该释放缓冲
                          bytebuf.release();
                          bytebuf = null;
                          // 如果读取到的数据量是负数,说明通道已经关闭了
                          close = allochandle.lastbytesread() < 0;
                          if (close) {
                              // there is nothing left to read as we received an eof.
                              readpending = false;
                          }
                          break;
                      }
    
                      // 更新handle内部的簿记量
                      allochandle.incmessagesread(1);
                      readpending = false;
                      // 向channel的处理器流水线中触发一个事件,
                      // 让取到的数据能够被流水线上的各个channelhandler处理
                      pipeline.firechannelread(bytebuf);
                      bytebuf = null;
                      // 这里根据如下条件判断是否继续读:
                      // 上一次读取到的数据量大于0,并且读取到的数据量等于分配的缓冲的最大容量,
                      // 此时说明通道中还有待读取的数据
                  } while (allochandle.continuereading());
    
                  // 读取完成
                  allochandle.readcomplete();
                  // 触发一个读取完成的事件
                  pipeline.firechannelreadcomplete();
    
                  if (close) {
                      closeonread(pipeline);
                  }
              } catch (throwable t) {
                  handlereadexception(pipeline, bytebuf, t, close, allochandle);
              } finally {
                  // check if there is a readpending which was not processed yet.
                  // this could be for two reasons:
                  // * the user called channel.read() or channelhandlercontext.read() in channelread(...) method
                  // * the user called channel.read() or channelhandlercontext.read() in channelreadcomplete(...) method
                  //
                  // see https://github.com/netty/netty/issues/2254
                  // 这里isautoread默认是true, 所以正常情况下会继续监听read事件
                  if (!readpending && !config.isautoread()) {
                      removereadop();
                  }
              }
          }
      }

总结

本篇主要分析了eventloop的事件监听以及处理逻辑,此外处理处理io事件,也会处理添加进来的任务和定时调度任务和延迟调度任务。eventloop就像是整个框架的发动机或者说是心脏,它通过jdk api进而简介地调用系统调用,不断地监听各种io事件,同时对不同的io事件分门别类采用不同的处理方式,对于read事件则会将网络io数据读取到缓冲中,并将读取到的数据传递给用户的处理器进行链式处理。channelpipeline就像一个流水线一样,对触发的的各种事件进行处理。

遗留问题

  • niosocketchannel.dowrite方法的写入逻辑的,待进一步分析
  • channelpipeline的详细分析,各种事件是怎么在处理器之间传播的,设计模式,代码结构等
  • 缓冲分配器和缓冲处理器的分析,它们是怎么对内存进行管理的,这也是netty高性能的原因之一。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网