当前位置: 移动技术网 > IT编程>开发语言>Java > Netty源码分析——EventLoopGroup建立

Netty源码分析——EventLoopGroup建立

2020年03月22日  | 移动技术网IT编程  | 我要评论

在上一篇中的简单代码中开头的两行代码是

1         eventloopgroup boss = new nioeventloopgroup(1);
2         eventloopgroup worker = new nioeventloopgroup();

服务端应用要创建首先要创建两个eventloopgroup ,为什么是两个eventloopgroup ,这就涉及netty的核心设计思想——主从reactor模型。这里不展开说,内容比较多,可以自行学习。每个eventloopgroup 可以包含一个或多个eventloop,每一个eventloop简单来说的就是一个死循环的线程,这个线程循环监听一个通道的所有事件,这个模式下面其他流程中也会反复提到。下面开始追代码。

说明一下,上面的两个构造函数一个有参数一个无参数,无参数的通过获取本机cpu的核数(注意是核数不是个数,例如我的电脑4个cpu,每个cpu都是双核的,所以无参的时候应该创建8个eventloop)来决定创建线程的个数,有参数就是指定在这个eventloopgroup 中创建具体个数的eventloop。

  • 一步一步追nioeventloopgroup的构造函数可以发现,代码中,如果指定nthreads 就是用指定的,例如第一个构造函数中那样,不然就是用default_event_loop_threads,default_event_loop_threads在本类的静态代码块中初始化
  protected multithreadeventloopgroup(int nthreads, threadfactory threadfactory, object... args) {
        super(nthreads == 0 ? default_event_loop_threads : nthreads, threadfactory, args);
    }
    static {
        default_event_loop_threads = math.max(1, systempropertyutil.getint(
                "io.netty.eventloopthreads", runtime.getruntime().availableprocessors() * 2));  //获取本机cup核数

        if (logger.isdebugenabled()) {
            logger.debug("-dio.netty.eventloopthreads: {}", default_event_loop_threads);
        }
    }
  • 再追一步会看到,defaulteventexecutorchooserfactory.instance使用默认的选择器工厂,在下一步中会提到,这个东东决定了在eventloopgroup 执行next()时候,也就是切换到下一个eventloop的时候,是以怎样的算法找到下一个eventloop,默认就是按照eventloop数组中的顺序查找下一个的
    protected multithreadeventexecutorgroup(int nthreads, executor executor, object... args) {
        this(nthreads, executor, defaulteventexecutorchooserfactory.instance, args);
    }
  • 再追下一步构造函数,真正干活的来了,注意!!!在主要代码会有注释
        protected multithreadeventexecutorgroup(int nthreads, executor executor,
                                                eventexecutorchooserfactory chooserfactory, object... args) {
            if (nthreads <= 0) {
                throw new illegalargumentexception(string.format("nthreads: %d (expected: > 0)", nthreads));
            }
    
            if (executor == null) {
                executor = new threadpertaskexecutor(newdefaultthreadfactory());   //创建默认的线程执行器
            }
    
            children = new eventexecutor[nthreads];  //创建保存eventloop的数组
    
            for (int i = 0; i < nthreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newchild(executor, args);   //循环填充数组,在这里实例化每一个eventloop
                    success = true;
                } catch (exception e) {
                    // todo: think about if this is a good exception type
                    throw new illegalstateexception("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdowngracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            eventexecutor e = children[j];
                            try {
                                while (!e.isterminated()) {
                                    e.awaittermination(integer.max_value, timeunit.seconds);
                                }
                            } catch (interruptedexception interrupted) {
                                // let the caller handle the interruption.
                                thread.currentthread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserfactory.newchooser(children);  //将创建好的eventloop数组注册到选择器工厂
    
            final futurelistener<object> terminationlistener = new futurelistener<object>() {
                @override
                public void operationcomplete(future<object> future) throws exception {
                    if (terminatedchildren.incrementandget() == children.length) {
                        terminationfuture.setsuccess(null);
                    }
                }
            };
    
            for (eventexecutor e: children) {
                e.terminationfuture().addlistener(terminationlistener);
            }
    
            set<eventexecutor> childrenset = new linkedhashset<eventexecutor>(children.length);
            collections.addall(childrenset, children);
            readonlychildren = collections.unmodifiableset(childrenset);
        }
    eventexecutor是一个接口,是eventloop接口的子接口,当children[i] = newchild(executor, args)执行时,实际上执行的是nioeventloopgroup中重写的,可以发现创建的nioeventloop的实例,而nioeventloop间接实现了eventloop
        @override
        protected eventloop newchild(executor executor, object... args) throws exception {
            return new nioeventloop(this, executor, (selectorprovider) args[0],
                ((selectstrategyfactory) args[1]).newselectstrategy(), (rejectedexecutionhandler) args[2]);
        }

    至此eventloopgroup以及包含的多个eventloop创建完成,这里只是在eventloopgroup中创建了多个nioeventloop,nioeventloop并没有run起来,需要在注册通道后才能run起来监控通道的事件和执行task。后面章节还会介绍nioeventloop类中的在其他方法。ok,这个部分就到这。

     

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网