当前位置: 移动技术网 > IT编程>开发语言>Java > Netty服务端的启动源码分析

Netty服务端的启动源码分析

2019年06月04日  | 移动技术网IT编程  | 我要评论

serverbootstrap的构造:

 1 public class serverbootstrap extends abstractbootstrap<serverbootstrap, serverchannel> {
 2     private static final internallogger logger = internalloggerfactory.getinstance(serverbootstrap.class);
 3     private final map<channeloption<?>, object> childoptions = new linkedhashmap();
 4     private final map<attributekey<?>, object> childattrs = new linkedhashmap();
 5     private final serverbootstrapconfig config = new serverbootstrapconfig(this);
 6     private volatile eventloopgroup childgroup;
 7     private volatile channelhandler childhandler;
 8 
 9     public serverbootstrap() {
10     }
11     ......
12 }

隐式地执行了父类的无参构造:

 1 public abstract class abstractbootstrap<b extends abstractbootstrap<b, c>, c extends channel> implements cloneable {
 2     volatile eventloopgroup group;
 3     private volatile channelfactory<? extends c> channelfactory;
 4     private volatile socketaddress localaddress;
 5     private final map<channeloption<?>, object> options = new linkedhashmap();
 6     private final map<attributekey<?>, object> attrs = new linkedhashmap();
 7     private volatile channelhandler handler;
 8 
 9     abstractbootstrap() {
10     }
11     ......
12 }

只是初始化了几个容器成员

在serverbootstrap创建后,需要调用group方法,绑定eventloopgroup,有关eventloopgroup的创建在我之前博客中写过:netty中nioeventloopgroup的创建源码分析


serverbootstrap的group方法:

 1 public serverbootstrap group(eventloopgroup group) {
 2     return this.group(group, group);
 3 }
 4 
 5 public serverbootstrap group(eventloopgroup parentgroup, eventloopgroup childgroup) {
 6     super.group(parentgroup);
 7     if (childgroup == null) {
 8         throw new nullpointerexception("childgroup");
 9     } else if (this.childgroup != null) {
10         throw new illegalstateexception("childgroup set already");
11     } else {
12         this.childgroup = childgroup;
13         return this;
14     }
15 }

首先调用父类的group方法绑定parentgroup:

 1 public b group(eventloopgroup group) {
 2     if (group == null) {
 3         throw new nullpointerexception("group");
 4     } else if (this.group != null) {
 5         throw new illegalstateexception("group set already");
 6     } else {
 7         this.group = group;
 8         return this.self();
 9     }
10 }
11 
12 private b self() {
13     return this;
14 }

将传入的parentgroup绑定给abstractbootstrap的group成员,将childgroup绑定给serverbootstrap的childgroup成员。
group的绑定仅仅是交给了成员保存。

再来看看serverbootstrap的channel方法,,是在abstractbootstrap中实现的:

1 public b channel(class<? extends c> channelclass) {
2     if (channelclass == null) {
3         throw new nullpointerexception("channelclass");
4     } else {
5         return this.channelfactory((io.netty.channel.channelfactory)(new reflectivechannelfactory(channelclass)));
6     }
7 }

使用channelclass构建了一个reflectivechannelfactory对象:

 1 public class reflectivechannelfactory<t extends channel> implements channelfactory<t> {
 2     private final class<? extends t> clazz;
 3 
 4     public reflectivechannelfactory(class<? extends t> clazz) {
 5         if (clazz == null) {
 6             throw new nullpointerexception("clazz");
 7         } else {
 8             this.clazz = clazz;
 9         }
10     }
11 
12     public t newchannel() {
13         try {
14             return (channel)this.clazz.getconstructor().newinstance();
15         } catch (throwable var2) {
16             throw new channelexception("unable to create channel from class " + this.clazz, var2);
17         }
18     }
19 
20     public string tostring() {
21         return stringutil.simpleclassname(this.clazz) + ".class";
22     }
23 }

可以看到reflectivechannelfactory的作用就是通过反射机制,产生clazz的实例(这里以nioserversocketchannel为例)。

在创建完reflectivechannelfactory对象后, 调用channelfactory方法:

 1 public b channelfactory(io.netty.channel.channelfactory<? extends c> channelfactory) {
 2     return this.channelfactory((channelfactory)channelfactory);
 3 }
 4 
 5 public b channelfactory(channelfactory<? extends c> channelfactory) {
 6     if (channelfactory == null) {
 7         throw new nullpointerexception("channelfactory");
 8     } else if (this.channelfactory != null) {
 9         throw new illegalstateexception("channelfactory set already");
10     } else {
11         this.channelfactory = channelfactory;
12         return this.self();
13     }
14 }

将刚才创建的reflectivechannelfactory对象交给channelfactory成员,用于后续服务端nioserversocketchannel的创建。

再来看serverbootstrap的childhandler方法:

1 public serverbootstrap childhandler(channelhandler childhandler) {
2     if (childhandler == null) {
3         throw new nullpointerexception("childhandler");
4     } else {
5         this.childhandler = childhandler;
6         return this;
7     }
8 }

还是交给了childhandler成员保存,可以看到上述这一系列的操作,都是为了填充serverbootstrap,而serverbootstrap真正的启动是在bind时:
serverbootstrap的bind方法,在abstractbootstrap中实现:

 1 public channelfuture bind(int inetport) {
 2     return this.bind(new inetsocketaddress(inetport));
 3 }
 4 
 5 public channelfuture bind(string inethost, int inetport) {
 6 return this.bind(socketutils.socketaddress(inethost, inetport));
 7 }
 8 
 9 public channelfuture bind(inetaddress inethost, int inetport) {
10     return this.bind(new inetsocketaddress(inethost, inetport));
11 }
12 
13 public channelfuture bind(socketaddress localaddress) {
14     this.validate();
15     if (localaddress == null) {
16         throw new nullpointerexception("localaddress");
17     } else {
18         return this.dobind(localaddress);
19     }
20 }

可以看到首先调用了serverbootstrap的validate方法,:

 1 public serverbootstrap validate() {
 2     super.validate();
 3     if (this.childhandler == null) {
 4         throw new illegalstateexception("childhandler not set");
 5     } else {
 6         if (this.childgroup == null) {
 7             logger.warn("childgroup is not set. using parentgroup instead.");
 8             this.childgroup = this.config.group();
 9         }
10     
11         return this;
12     }
13 }

先调用了abstractbootstrap的validate方法:

1 public b validate() {
2     if (this.group == null) {
3         throw new illegalstateexception("group not set");
4     } else if (this.channelfactory == null) {
5         throw new illegalstateexception("channel or channelfactory not set");
6     } else {
7         return this.self();
8     }
9 }


这个方法就是用来检查是否绑定了group和channel以及childhandler,所以在执行bind方法前,无论如何都要执行group、channel和childhandler方法。

实际的bind交给了dobind来完成:

 1 private channelfuture dobind(final socketaddress localaddress) {
 2     final channelfuture regfuture = this.initandregister();
 3     final channel channel = regfuture.channel();
 4     if (regfuture.cause() != null) {
 5         return regfuture;
 6     } else if (regfuture.isdone()) {
 7         channelpromise promise = channel.newpromise();
 8         dobind0(regfuture, channel, localaddress, promise);
 9         return promise;
10     } else {
11         final abstractbootstrap.pendingregistrationpromise promise = new abstractbootstrap.pendingregistrationpromise(channel);
12         regfuture.addlistener(new channelfuturelistener() {
13             public void operationcomplete(channelfuture future) throws exception {
14                 throwable cause = future.cause();
15                 if (cause != null) {
16                     promise.setfailure(cause);
17                 } else {
18                     promise.registered();
19                     abstractbootstrap.dobind0(regfuture, channel, localaddress, promise);
20                 }
21             }
22         });
23         return promise;
24     }
25 }

首先调用initandregister,完成serversocketchannel的创建以及注册:

 1 final channelfuture initandregister() {
 2     channel channel = null;
 3 
 4     try {
 5         channel = this.channelfactory.newchannel();
 6         this.init(channel);
 7     } catch (throwable var3) {
 8         if (channel != null) {
 9             channel.unsafe().closeforcibly();
10             return (new defaultchannelpromise(channel, globaleventexecutor.instance)).setfailure(var3);
11         }
12 
13         return (new defaultchannelpromise(new failedchannel(), globaleventexecutor.instance)).setfailure(var3);
14     }
15 
16     channelfuture regfuture = this.config().group().register(channel);
17     if (regfuture.cause() != null) {
18         if (channel.isregistered()) {
19             channel.close();
20         } else {
21             channel.unsafe().closeforcibly();
22         }
23     }
24 
25     return regfuture;
26 }

首先调用channelfactory的newchannel通过反射机制构建channel实例,也就是nioserversocketchannel,


nioserversocketchannel的无参构造:

1 public class nioserversocketchannel extends abstractniomessagechannel implements serversocketchannel {
2     private static final selectorprovider default_selector_provider = selectorprovider.provider();
3     
4     public nioserversocketchannel() {
5         this(newsocket(default_selector_provider));
6     }
7     ......
8 }

selectorprovider 是jdk的,关于selectorprovider在我之前的博客中有介绍:【java】nio中selector的创建源码分析

在windows系统下默认产生windowsselectorprovider,即default_selector_provider,再来看看newsocket方法:

1 private static java.nio.channels.serversocketchannel newsocket(selectorprovider provider) {
2     try {
3         return provider.openserversocketchannel();
4     } catch (ioexception var2) {
5         throw new channelexception("failed to open a server socket.", var2);
6     }
7 }

使用windowsselectorprovider创建了一个serversocketchannelimpl,其实看到这里就明白了,nioserversocketchannel是为了封装jdk的serversocketchannel

接着调用另一个重载的构造:

1 public nioserversocketchannel(java.nio.channels.serversocketchannel channel) {
2     super((channel)null, channel, 16);
3     this.config = new nioserversocketchannel.nioserversocketchannelconfig(this, this.javachannel().socket());
4 }

首先调用父类的三参构造,其中16对应的是jdk中selectionkey的accept状态:

1 public static final int op_accept = 1 << 4;

其父类的构造处于一条继承链上:

abstractniomessagechannel:

1 protected abstractniomessagechannel(channel parent, selectablechannel ch, int readinterestop) {
2     super(parent, ch, readinterestop);
3 }

abstractniochannel:

 1 protected abstractniochannel(channel parent, selectablechannel ch, int readinterestop) {
 2     super(parent);
 3     this.ch = ch;
 4     this.readinterestop = readinterestop;
 5 
 6     try {
 7         ch.configureblocking(false);
 8     } catch (ioexception var7) {
 9         try {
10             ch.close();
11         } catch (ioexception var6) {
12             if (logger.iswarnenabled()) {
13                 logger.warn("failed to close a partially initialized socket.", var6);
14             }
15         }
16 
17         throw new channelexception("failed to enter non-blocking mode.", var7);
18     }
19 }

abstractchannel:

 1 private final channelid id;
 2 private final channel parent;
 3 private final unsafe unsafe;
 4 private final defaultchannelpipeline pipeline;
 5 
 6 protected abstractchannel(channel parent) {
 7     this.parent = parent;
 8     this.id = this.newid();
 9     this.unsafe = this.newunsafe();
10     this.pipeline = this.newchannelpipeline();
11 }

在abstractchannel中使用newunsafe和newchannelpipeline分别创建了一个unsafe和一个defaultchannelpipeline对象,
在前面的博客介绍nioeventloopgroup时候,在nioeventloop的run方法中,每次轮询完调用processselectedkeys方法时,都是通过这个unsafe根据selectedkey来完成数据的读或写,unsafe是处理基础的数据读写
(unsafe在nioserversocketchannel创建时,产生niomessageunsafe实例,在niosocketchannel创建时产生niosocketchannelunsafe实例)

而pipeline的实现是一条双向责任链,负责处理unsafe提供的数据,进而进行用户的业务逻辑 (netty中的channelpipeline源码分析

在abstractniochannel中调用configureblocking方法给jdk的serversocketchannel设置为非阻塞模式,且让readinterestop成员赋值为16用于未来注册accept事件。

在调用完继承链后回到nioserversocketchannel构造,调用了javachannel方法:

1 protected java.nio.channels.serversocketchannel javachannel() {
2     return (java.nio.channels.serversocketchannel)super.javachannel();
3 }

其实这个javachannel就是刚才出传入到abstractniochannel中的ch成员:

1 protected selectablechannel javachannel() {
2     return this.ch;
3 }

也就是刚才创建的jdk的serversocketchannelimpl,用其socket方法,得到一个serversocket对象,然后产生了一个nioserversocketchannelconfig对象,用于封装相关信息。

 

nioserversocketchannel构建完毕,回到initandregister方法,使用刚创建的nioserversocketchannel调用init方法,这个方法是在serverbootstrap中实现的:

 1 void init(channel channel) throws exception {
 2     map<channeloption<?>, object> options = this.options0();
 3     synchronized(options) {
 4         setchanneloptions(channel, options, logger);
 5     }
 6 
 7     map<attributekey<?>, object> attrs = this.attrs0();
 8     synchronized(attrs) {
 9         iterator var5 = attrs.entryset().iterator();
10 
11         while(true) {
12             if (!var5.hasnext()) {
13                 break;
14             }
15 
16             entry<attributekey<?>, object> e = (entry)var5.next();
17             attributekey<object> key = (attributekey)e.getkey();
18             channel.attr(key).set(e.getvalue());
19         }
20     }
21 
22     channelpipeline p = channel.pipeline();
23     final eventloopgroup currentchildgroup = this.childgroup;
24     final channelhandler currentchildhandler = this.childhandler;
25     map var9 = this.childoptions;
26     final entry[] currentchildoptions;
27     synchronized(this.childoptions) {
28         currentchildoptions = (entry[])this.childoptions.entryset().toarray(newoptionarray(0));
29     }
30 
31     var9 = this.childattrs;
32     final entry[] currentchildattrs;
33     synchronized(this.childattrs) {
34         currentchildattrs = (entry[])this.childattrs.entryset().toarray(newattrarray(0));
35     }
36 
37     p.addlast(new channelhandler[]{new channelinitializer<channel>() {
38         public void initchannel(final channel ch) throws exception {
39             final channelpipeline pipeline = ch.pipeline();
40             channelhandler handler = serverbootstrap.this.config.handler();
41             if (handler != null) {
42                 pipeline.addlast(new channelhandler[]{handler});
43             }
44 
45             ch.eventloop().execute(new runnable() {
46                 public void run() {
47                     pipeline.addlast(new channelhandler[]{new serverbootstrap.serverbootstrapacceptor(ch, currentchildgroup, currentchildhandler, currentchildoptions, currentchildattrs)});
48                 }
49             });
50         }
51     }});
52 }

首先对attrs和options这两个成员进行了填充属性配置,这不是重点,然后获取刚才创建的nioserversocketchannel的责任链pipeline,通过addlast将channelinitializer加入责任链,在channelinitializer中重写了initchannel方法,首先根据handler是否是null(这个handler是serverbootstrap调用handler方法添加的,和childhandler方法不一样),若是handler不是null,将handler加入责任链,无论如何,都会异步将一个serverbootstrapacceptor对象加入责任链(后面会说为什么是异步)

 

这个channelinitializer的initchannel方法的执行需要等到后面注册时才会被调用,在后面pipeline处理channelregistered请求时,此initchannel方法才会被执行 (netty中的channelpipeline源码分析

channelinitializer的channelregistered方法:

1 public final void channelregistered(channelhandlercontext ctx) throws exception {
2     if (initchannel(ctx)) {
3         ctx.pipeline().firechannelregistered();
4     } else {
5         ctx.firechannelregistered();
6     }
7 }

首先调用initchannel方法(和上面的initchannel不是一个):

 1 private boolean initchannel(channelhandlercontext ctx) throws exception {
 2     if (initmap.putifabsent(ctx, boolean.true) == null) { 
 3         try {
 4             initchannel((c) ctx.channel());
 5         } catch (throwable cause) {
 6             exceptioncaught(ctx, cause);
 7         } finally {
 8             remove(ctx);
 9         }
10         return true;
11     }
12     return false;
13 }

可以看到,这个channelinitializer只会在pipeline中初始化一次,仅用于channel的注册,在完成注册后,会调用remove方法将其从pipeline中移除:
remove方法:

 1 private void remove(channelhandlercontext ctx) {
 2     try {
 3         channelpipeline pipeline = ctx.pipeline();
 4         if (pipeline.context(this) != null) {
 5             pipeline.remove(this);
 6         }
 7     } finally {
 8         initmap.remove(ctx);
 9     }
10 }

在移除前,就会回调用刚才覆盖的initchannel方法,异步向pipeline添加了serverbootstrapacceptor,用于后续的nioserversocketchannel侦听到客户端连接后,完成在服务端的niosocketchannel的注册。

回到initandregister,在对nioserversocketchannel初始化完毕,接下来就是注册逻辑:

1 channelfuture regfuture = this.config().group().register(channel);

首先调用config().group(),这个就得到了一开始在serverbootstrap的group方法传入的parentgroup,调用parentgroup的register方法,parentgroup是nioeventloopgroup,这个方法是在子类multithreadeventloopgroup中实现的:

1 public channelfuture register(channel channel) {
2     return this.next().register(channel);
3 }

首先调用next方法:

1 public eventloop next() {
2     return (eventloop)super.next();
3 }

实际上调用父类multithreadeventexecutorgroup的next方法:

1 public eventexecutor next() {
2     return this.chooser.next();
3 }

关于chooser在我之前博客:netty中nioeventloopgroup的创建源码分析 介绍过,在nioeventloopgroup创建时,默认会根据cpu个数创建二倍个nioeventloop,而chooser就负责通过取模,每次选择一个nioeventloop使用

所以在multithreadeventloopgroup的register方法实际调用了nioeventloop的register方法:

nioeventloop的register方法在子类singlethreadeventloop中实现:

1 public channelfuture register(channel channel) {
2     return this.register((channelpromise)(new defaultchannelpromise(channel, this)));
3 }
4 
5 public channelfuture register(channelpromise promise) {
6    objectutil.checknotnull(promise, "promise");
7     promise.channel().unsafe().register(this, promise);
8     return promise;
9 }

先把channel包装成channelpromise,默认是defaultchannelpromise (netty中的channelfuture和channelpromise),用于处理异步操作

调用重载方法,而在重载方法里,可以看到,实际上的register操作交给了channel的unsafe来实现:

unsafe的register方法在abstractunsafe中实现:

 1 public final void register(eventloop eventloop, final channelpromise promise) {
 2     if (eventloop == null) {
 3         throw new nullpointerexception("eventloop");
 4     } else if (abstractchannel.this.isregistered()) {
 5         promise.setfailure(new illegalstateexception("registered to an event loop already"));
 6     } else if (!abstractchannel.this.iscompatible(eventloop)) {
 7         promise.setfailure(new illegalstateexception("incompatible event loop type: " + eventloop.getclass().getname()));
 8     } else {
 9         abstractchannel.this.eventloop = eventloop;
10         if (eventloop.ineventloop()) {
11             this.register0(promise);
12         } else {
13             try {
14                 eventloop.execute(new runnable() {
15                     public void run() {
16                         abstractunsafe.this.register0(promise);
17                     }
18                 });
19             } catch (throwable var4) {
20                 abstractchannel.logger.warn("force-closing a channel whose registration task was not accepted by an event loop: {}", abstractchannel.this, var4);
21                 this.closeforcibly();
22                 abstractchannel.this.closefuture.setclosed();
23                 this.safesetfailure(promise, var4);
24             }
25         }
26 
27     }
28 }

前面的判断做了一些检查就不细说了,直接看到else块
首先给当前channel绑定了eventloop,即通过刚才chooser选择的eventloop,该channel也就是nioserversocketchannel
由于unsafe的操作是在轮询线程中异步执行的,所里,这里需要判断ineventloop是否处于轮询中
在之前介绍nioeventloopgroup的时候说过,nioeventloop在没有调用dostartthread方法时并没有启动轮询的,所以ineventloop判断不成立

那么就调用eventloop的execute方法,实际上的注册方法可以看到调用了abstractunsafe的register0方法,而将这个方法封装为runnable交给eventloop作为一个task去异步执行
先来看eventloop的execute方法实现,是在nioeventloop的子类singlethreadeventexecutor中实现的:

 1 public void execute(runnable task) {
 2     if (task == null) {
 3         throw new nullpointerexception("task");
 4     } else {
 5         boolean ineventloop = this.ineventloop();
 6         this.addtask(task);
 7         if (!ineventloop) {
 8             this.startthread();
 9             if (this.isshutdown() && this.removetask(task)) {
10                 reject();
11             }
12         }
13 
14         if (!this.addtaskwakesup && this.wakesupfortask(task)) {
15             this.wakeup(ineventloop);
16         }
17 
18     }
19 }

这里首先将task,即刚才的注册事件放入阻塞任务队列中,然后调用startthread方法:

 1 private void startthread() {
 2     if (this.state == 1 && state_updater.compareandset(this, 1, 2)) {
 3         try {
 4             this.dostartthread();
 5         } catch (throwable var2) {
 6             state_updater.set(this, 1);
 7             platformdependent.throwexception(var2);
 8         }
 9     }
10 
11 }

nioeventloop此时还没有轮询,所以状态是1,对应st_not_started,此时利用cas操作,将状态修改为2,即st_started ,标志着nioeventloop要启动轮询了,果然,接下来就调用了dostartthread开启轮询线程:

  1 private void dostartthread() {
  2     assert this.thread == null;
  3 
  4     this.executor.execute(new runnable() {
  5         public void run() {
  6             singlethreadeventexecutor.this.thread = thread.currentthread();
  7             if (singlethreadeventexecutor.this.interrupted) {
  8                 singlethreadeventexecutor.this.thread.interrupt();
  9             }
 10 
 11             boolean success = false;
 12             singlethreadeventexecutor.this.updatelastexecutiontime();
 13             boolean var112 = false;
 14 
 15             int oldstate;
 16             label1907: {
 17                 try {
 18                     var112 = true;
 19                     singlethreadeventexecutor.this.run();
 20                     success = true;
 21                     var112 = false;
 22                     break label1907;
 23                 } catch (throwable var119) {
 24                     singlethreadeventexecutor.logger.warn("unexpected exception from an event executor: ", var119);
 25                     var112 = false;
 26                 } finally {
 27                     if (var112) {
 28                         int oldstatex;
 29                         do {
 30                             oldstatex = singlethreadeventexecutor.this.state;
 31                         } while(oldstatex < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstatex, 3));
 32 
 33                         if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 34                             singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 35                         }
 36 
 37                         try {
 38                             while(!singlethreadeventexecutor.this.confirmshutdown()) {
 39                                 ;
 40                             }
 41                         } finally {
 42                             try {
 43                                 singlethreadeventexecutor.this.cleanup();
 44                             } finally {
 45                                 singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 46                                 singlethreadeventexecutor.this.threadlock.release();
 47                                 if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
 48                                     singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 49                                 }
 50 
 51                                 singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 52                             }
 53                         }
 54 
 55                     }
 56                 }
 57 
 58                 do {
 59                     oldstate = singlethreadeventexecutor.this.state;
 60                 } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 61 
 62                 if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 63                     singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 64                 }
 65 
 66                 try {
 67                     while(!singlethreadeventexecutor.this.confirmshutdown()) {
 68                         ;
 69                     }
 70 
 71                     return;
 72                 } finally {
 73                     try {
 74                         singlethreadeventexecutor.this.cleanup();
 75                     } finally {
 76                         singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
 77                         singlethreadeventexecutor.this.threadlock.release();
 78                         if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
 79                             singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
 80                         }
 81 
 82                         singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
 83                     }
 84                 }
 85             }
 86 
 87             do {
 88                 oldstate = singlethreadeventexecutor.this.state;
 89             } while(oldstate < 3 && !singlethreadeventexecutor.state_updater.compareandset(singlethreadeventexecutor.this, oldstate, 3));
 90 
 91             if (success && singlethreadeventexecutor.this.gracefulshutdownstarttime == 0l && singlethreadeventexecutor.logger.iserrorenabled()) {
 92                 singlethreadeventexecutor.logger.error("buggy " + eventexecutor.class.getsimplename() + " implementation; " + singlethreadeventexecutor.class.getsimplename() + ".confirmshutdown() must be called before run() implementation terminates.");
 93             }
 94 
 95             try {
 96                 while(!singlethreadeventexecutor.this.confirmshutdown()) {
 97                     ;
 98                 }
 99             } finally {
100                 try {
101                     singlethreadeventexecutor.this.cleanup();
102                 } finally {
103                     singlethreadeventexecutor.state_updater.set(singlethreadeventexecutor.this, 5);
104                     singlethreadeventexecutor.this.threadlock.release();
105                     if (!singlethreadeventexecutor.this.taskqueue.isempty() && singlethreadeventexecutor.logger.iswarnenabled()) {
106                         singlethreadeventexecutor.logger.warn("an event executor terminated with non-empty task queue (" + singlethreadeventexecutor.this.taskqueue.size() + ')');
107                     }
108 
109                     singlethreadeventexecutor.this.terminationfuture.setsuccess((object)null);
110                 }
111             }
112 
113         }
114     });
115 }

关于dostartthread方法,我在 netty中nioeventloopgroup的创建源码分析 中已经说的很细了,这里就不再一步一步分析了

因为此时还没真正意义上的启动轮询,所以thread等于null成立的,然后调用executor的execute方法,这里的executor是一个线程池,在之前说过的,所以里面的run方法是处于一个线程里面的,然后给thread成员赋值为当前线程,表明正式进入了轮询。
而singlethreadeventexecutor.this.run()才是真正的轮询逻辑,这在之前也说过,这个run的实现是在父类nioeventloop中:

 1 protected void run() {
 2     while(true) {
 3         while(true) {
 4             try {
 5                 switch(this.selectstrategy.calculatestrategy(this.selectnowsupplier, this.hastasks())) {
 6                 case -2:
 7                     continue;
 8                 case -1:
 9                     this.select(this.wakenup.getandset(false));
10                     if (this.wakenup.get()) {
11                         this.selector.wakeup();
12                     }
13                 default:
14                     this.cancelledkeys = 0;
15                     this.needstoselectagain = false;
16                     int ioratio = this.ioratio;
17                     if (ioratio == 100) {
18                         try {
19                             this.processselectedkeys();
20                         } finally {
21                             this.runalltasks();
22                         }
23                     } else {
24                         long iostarttime = system.nanotime();
25                         boolean var13 = false;
26 
27                         try {
28                             var13 = true;
29                             this.processselectedkeys();
30                             var13 = false;
31                         } finally {
32                             if (var13) {
33                                 long iotime = system.nanotime() - iostarttime;
34                                 this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
35                             }
36                         }
37 
38                         long iotime = system.nanotime() - iostarttime;
39                         this.runalltasks(iotime * (long)(100 - ioratio) / (long)ioratio);
40                     }
41                 }
42             } catch (throwable var21) {
43                 handleloopexception(var21);
44             }
45 
46             try {
47                 if (this.isshuttingdown()) {
48                     this.closeall();
49                     if (this.confirmshutdown()) {
50                         return;
51                     }
52                 }
53             } catch (throwable var18) {
54                 handleloopexception(var18);
55             }
56         }
57     }
58 }

首先由于task已经有一个了,就是刚才的注册事件,所以选择策略calculatestrategy最终调用selectnow(也是之前说过的):

 1 private final intsupplier selectnowsupplier = new intsupplier() {
 2     public int get() throws exception {
 3         return nioeventloop.this.selectnow();
 4     }
 5 };
 6 
 7 int selectnow() throws ioexception {
 8     int var1;
 9     try {
10         var1 = this.selector.selectnow();
11     } finally {
12         if (this.wakenup.get()) {
13             this.selector.wakeup();
14         }
15 
16     }
17 
18     return var1;
19 }

使用jdk原生selector进行selectnow,由于此时没有任何channel的注册,所以selectnow会立刻返回0,此时就进入default逻辑,由于没有任何注册,processselectedkeys方法也做不了什么,所以在这一次的轮询实质上只进行了runalltasks方法,此方法会执行阻塞队列中的task的run方法(还是在之前博客中介绍过),由于轮询是在线程池中的一个线程中运行的,所以task的执行是一个异步操作。(在执行完task,将task移除阻塞对立,线程继续轮询)

这时就可以回到abstractchannel的register方法中了,由上面可以知道task实际上异步执行了:

1 abstractunsafe.this.register0(promise);

register0方法:

 1 private void register0(channelpromise promise) {
 2     try {
 3         if (!promise.setuncancellable() || !this.ensureopen(promise)) {
 4             return;
 5         }
 6 
 7         boolean firstregistration = this.neverregistered;
 8         abstractchannel.this.doregister();
 9         this.neverregistered = false;
10         abstractchannel.this.registered = true;
11         abstractchannel.this.pipeline.invokehandleraddedifneeded();
12         this.safesetsuccess(promise);
13         abstractchannel.this.pipeline.firechannelregistered();
14         if (abstractchannel.this.isactive()) {
15             if (firstregistration) {
16                 abstractchannel.this.pipeline.firechannelactive();
17             } else if (abstractchannel.this.config().isautoread()) {
18                 this.beginread();
19             }
20         }
21     } catch (throwable var3) {
22         this.closeforcibly();
23         abstractchannel.this.closefuture.setclosed();
24         this.safesetfailure(promise, var3);
25     }
26 
27 }

可以看到实际上的注册逻辑又交给了abstractchannel的doregister,而这个方法在abstractniochannel中实现:

 1 protected void doregister() throws exception {
 2     boolean selected = false;
 3 
 4     while(true) {
 5         try {
 6             this.selectionkey = this.javachannel().register(this.eventloop().unwrappedselector(), 0, this);
 7             return;
 8         } catch (cancelledkeyexception var3) {
 9             if (selected) {
10                 throw var3;
11             }
12 
13             this.eventloop().selectnow();
14             selected = true;
15         }
16     }
17 }

javachannel就是之前产生的jdk的serversocketchannel,unwrappedselector在之前说过,就是未经修改的jdk原生selector,这个selector和eventloop是一对一绑定的,可以看到调用jdk原生的注册方法,完成了对serversocketchannel的注册,但是注册的是一个0状态(缺省值),而传入的this,即abstractniochannel对象作为了一个附件,用于以后processselectedkeys方法从selectionkey中得到对应的netty的channel(还是之前博客说过)
关于缺省值,是由于abstractniochannel不仅用于nioserversocketchannel的注册,还用于niosocketchannel的注册,只有都使用缺省值注册才不会产生异常  【java】nio中channel的注册源码分析 ,并且,在以后processselectedkeys方法会对0状态判断,再使用unsafe进行相应的逻辑处理。

 

在完成jdk的注册后,调用pipeline的invokehandleraddedifneeded方法(netty中的channelpipeline源码分析),处理channelhandler的handleradded的回调,即调用用户添加的channelhandler的handleradded方法。
调用safesetsuccess,标志异步操作完成:

1 protected final void safesetsuccess(channelpromise promise) {
2     if (!(promise instanceof voidchannelpromise) && !promise.trysuccess()) {
3         logger.warn("failed to mark a promise as success because it is done already: {}", promise);
4     }
5 }

关于异步操作我在之前的博客中说的很清楚了:netty中的channelfuture和channelpromise


接着调用pipeline的firechannelregistered方法,也就是在责任链上调用channelregistered方法,这时,就会调用之在serverbootstrap中向pipeline添加的channelinitializer的channelregistered,进而回调initchannel方法,完成对serverbootstrapacceptor的添加。

回到register0方法,在处理完pipeline的责任链后,根据当前abstractchannel即nioserversocketchannel的isactive:

1 public boolean isactive() {
2     return this.javachannel().socket().isbound();
3 }

获得nioserversocketchannel绑定的jdk的serversocketchannel,进而获取serversocket,判断isbound:

1 public boolean isbound() {
2    // before 1.3 serversockets were always bound during creation
3     return bound || oldimpl;
4 }

这里实际上就是判断serversocket是否调用了bind方法,前面说过register0方法是一个异步操作,在多线程环境下不能保证执行顺序,若是此时已经完成了serversocket的bind,根据firstregistration判断是否需要pipeline传递channelactive请求,首先会执行pipeline的head即headcontext的channelactive方法:

1 @override
2 public void channelactive(channelhandlercontext ctx) throws exception {
3     ctx.firechannelactive();
4 
5     readifisautoread();
6 }

在headcontext通过channelhandlercontext 传递完channelactive请求后,会调用readifisautoread方法:

1 private void readifisautoread() {
2     if (channel.config().isautoread()) {
3         channel.read();
4     }
5 }

此时调用abstractchannel的read方法:

1 public channel read() {
2     pipeline.read();
3     return this;
4 }

最终在请求链由headcontext执行read方法:

1 public void read(channelhandlercontext ctx) {
2     unsafe.beginread();
3 }

终于可以看到此时调用unsafe的beginread方法:

 1 public final void beginread() {
 2     asserteventloop();
 3 
 4     if (!isactive()) {
 5         return;
 6     }
 7 
 8     try {
 9         dobeginread();
10     } catch (final exception e) {
11         invokelater(new runnable() {
12             @override
13             public void run() {
14                 pipeline.fireexceptioncaught(e);
15             }
16         });
17         close(voidpromise());
18     }
19 }

最终执行了dobeginread方法,由abstractniochannel实现:

 1 protected void dobeginread() throws exception {
 2     final selectionkey selectionkey = this.selectionkey;
 3     if (!selectionkey.isvalid()) {
 4         return;
 5     }
 6     
 7     readpending = true;
 8     
 9     final int interestops = selectionkey.interestops();
10     if ((interestops & readinterestop) == 0) {
11         selectionkey.interestops(interestops | readinterestop);
12     }
13 }

这里,就完成了向selector注册readinterestop事件,从前面来看就是accept事件

 

回到abstractbootstrap的dobind方法,在initandregister逻辑结束后,由上面可以知道,实际上的register注册逻辑是一个异步操作,在register0中完成
根据channelfuture来判断异步操作是否完成,如果isdone,则表明异步操作先完成,即完成了safesetsuccess方法,
然后调用newpromise方法:

1 public channelpromise newpromise() {
2     return pipeline.newpromise();
3 }

给channel的pipeline绑定异步操作channelpromise
然后调用dobind0方法完成serversocket的绑定,若是register0这个异步操作还没完成,就需要给channelfuture产生一个异步操作的侦听channelfuturelistener对象,等到register0方法调用safesetsuccess时,在promise的trysuccess中会回调channelfuturelistener的operationcomplete方法,进而调用dobind0方法

dobind0方法:

 1 private static void dobind0(
 2         final channelfuture regfuture, final channel channel,
 3         final socketaddress localaddress, final channelpromise promise) {
 4     channel.eventloop().execute(new runnable() {
 5         @override
 6         public void run() {
 7             if (regfuture.issuccess()) {
 8                 channel.bind(localaddress, promise).addlistener(channelfuturelistener.close_on_failure);
 9             } else {
10                 promise.setfailure(regfuture.cause());
11             }
12         }
13     });
14 }

向轮询线程提交了一个任务,异步处理bind,可以看到,只有在regfuture异步操作成功结束后,调用channel的bind方法:

1 public channelfuture bind(socketaddress localaddress, channelpromise promise) {
2    return pipeline.bind(localaddress, promise);
3 }

实际上的bind又交给pipeline,去完成,pipeline中就会交给责任链去完成,最终会交给headcontext完成:

1 public void bind(
2                 channelhandlercontext ctx, socketaddress localaddress, channelpromise promise)
3                 throws exception {
4     unsafe.bind(localaddress, promise);
5 }

可以看到,绕了一大圈,交给了unsafe完成:

 1 public final void bind(final socketaddress localaddress, final channelpromise promise) {
 2     asserteventloop();
 3 
 4     if (!promise.setuncancellable() || !ensureopen(promise)) {
 5         return;
 6     }
 7     
 8     if (boolean.true.equals(config().getoption(channeloption.so_broadcast)) &&
 9         localaddress instanceof inetsocketaddress &&
10         !((inetsocketaddress) localaddress).getaddress().isanylocaladdress() &&
11         !platformdependent.iswindows() && !platformdependent.maybesuperuser()) {
12         logger.warn(
13                 "a non-root user can't receive a broadcast packet if the socket " +
14                 "is not bound to a wildcard address; binding to a non-wildcard " +
15                 "address (" + localaddress + ") anyway as requested.");
16     }
17 
18     boolean wasactive = isactive();
19     try {
20         dobind(localaddress);
21     } catch (throwable t) {
22         safesetfailure(promise, t);
23         closeifclosed();
24         return;
25     }
26 
27     if (!wasactive && isactive()) {
28         invokelater(new runnable() {
29             @override
30             public void run() {
31                 pipeline.firechannelactive();
32             }
33         });
34     }
35 
36     safesetsuccess(promise);
37 }

然而,真正的bind还是回调了dobind方法,最终是由nioserversocketchannel来实现:

1 @override
2 protected void dobind(socketaddress localaddress) throws exception {
3     if (platformdependent.javaversion() >= 7) {
4         javachannel().bind(localaddress, config.getbacklog());
5     } else {
6         javachannel().socket().bind(localaddress, config.getbacklog());
7     }
8 }

在这里终于完成了对jdk的serversocketchannel的bind操作


在上面的

1 if (!wasactive && isactive()) {
2     invokelater(new runnable() {
3         @override
4         public void run() {
5             pipeline.firechannelactive();
6         }
7     });
8 }

这个判断,就是确保在register0中isactive时,还没完成绑定,也就没有beginread操作来向selector注册accept事件,那么就在这里进行注册,进而让serversocket去侦听客户端的连接


在服务端accept到客户端的连接后,在nioeventloop轮询中,就会调用processselectedkeys处理accept的事件就绪,然后交给unsafe的read去处理  netty中nioeventloopgroup的创建源码分析

 

在服务端,由niomessageunsafe实现:

 1 public void read() {
 2         assert eventloop().ineventloop();
 3         final channelconfig config = config();
 4         final channelpipeline pipeline = pipeline();
 5         final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle();
 6         allochandle.reset(config);
 7 
 8         boolean closed = false;
 9         throwable exception = null;
10         try {
11             try {
12                 do {
13                     int localread = doreadmessages(readbuf);
14                     if (localread == 0) {
15                         break;
16                     }
17                     if (localread < 0) {
18                         closed = true;
19                         break;
20                     }
21 
22                     allochandle.incmessagesread(localread);
23                 } while (allochandle.continuereading());
24             } catch (throwable t) {
25                 exception = t;
26             }
27 
28             int size = readbuf.size();
29             for (int i = 0; i < size; i ++) {
30                 readpending = false;
31                 pipeline.firechannelread(readbuf.get(i));
32             }
33             readbuf.clear();
34             allochandle.readcomplete();
35             pipeline.firechannelreadcomplete();
36 
37             if (exception != null) {
38                 closed = closeonreaderror(exception);
39 
40                 pipeline.fireexceptioncaught(exception);
41             }
42 
43             if (closed) {
44                 inputshutdown = true;
45                 if (isopen()) {
46                     close(voidpromise());
47                 }
48             }
49         } finally {
50             if (!readpending && !config.isautoread()) {
51                 removereadop();
52             }
53         }
54     }
55 }

核心在doreadmessages方法,由nioserversocketchannel实现:

 1 protected int doreadmessages(list<object> buf) throws exception {
 2     socketchannel ch = socketutils.accept(javachannel());
 3 
 4     try {
 5         if (ch != null) {
 6             buf.add(new niosocketchannel(this, ch));
 7             return 1;
 8         }
 9     } catch (throwable t) {
10         logger.warn("failed to create a new channel from an accepted socket.", t);
11 
12         try {
13             ch.close();
14         } catch (throwable t2) {
15             logger.warn("failed to close a socket.", t2);
16         }
17     }
18 
19     return 0;
20 }

socketutils的accept方法其实就是用来调用jdk中serversocketchannel原生的accept方法,来得到一个jdk的socketchannel对象,然后通过这个socketchannel对象,将其包装成niosocketchannel对象添加在buf这个list中

由此可以看到doreadmessages用来侦听所有就绪的连接,包装成niosocketchannel将其放在list中
然后遍历这个list,调用 nioserversocketchannel的pipeline的firechannelread方法,传递channelread请求,、
在前面向pipeline中添加了serverbootstrapacceptor这个channelhandler,此时,它也会响应这个请求,回调channelread方法:

 1 public void channelread(channelhandlercontext ctx, object msg) {
 2     final channel child = (channel) msg;
 3 
 4     child.pipeline().addlast(childhandler);
 5 
 6     setchanneloptions(child, childoptions, logger);
 7 
 8     for (entry<attributekey<?>, object> e: childattrs) {
 9         child.attr((attributekey<object>) e.getkey()).set(e.getvalue());
10     }
11 
12     try {
13         childgroup.register(child).addlistener(new channelfuturelistener() {
14             @override
15             public void operationcomplete(channelfuture future) throws exception {
16                 if (!future.issuccess()) {
17                     forceclose(child, future.cause());
18                 }
19             }
20         });
21     } catch (throwable t) {
22         forceclose(child, t);
23     }
24 }

msg就是侦听到的niosocketchannel对象,给该对象的pipeline添加childhandler,也就是我们在serverbootstrap中通过childhandler方法添加的
然后通过register方法完成对niosocketchannel的注册(和nioserversocketchannel注册逻辑一样)


至此netty服务端的启动结束。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网