当前位置: 移动技术网 > IT编程>开发语言>Java > Netty 在 Dubbo 中是如何应用的?

Netty 在 Dubbo 中是如何应用的?

2020年05月07日  | 移动技术网IT编程  | 我要评论

作者:莫那·鲁道

众所周知,国内知名框架 dubbo 底层使用的是 作为网络通信,那么内部到底是如何使用的呢?今天我们就来一探究竟。

1. dubbo 的 consumer 消费者如何使用 netty


注意:此次代码使用了从 github 上 clone 的 dubbo 源码中的 dubbo-demo 例子。

代码如下:

system.setproperty("java.net.preferipv4stack", "true");
   classpathxmlapplicationcontext context = new classpathxmlapplicationcontext(new string[]{"meta-inf/spring/dubbo-demo-consumer.xml"});
   context.start();
    // @1
   demoservice demoservice = (demoservice) context.getbean("demoservice"); // get remote service proxy
   int a = 0;
   while (true) {
       try {
           thread.sleep(1000);
           system.err.println( ++ a + " ");

           string hello = demoservice.sayhello("world"); // call remote method
           system.out.println(hello); // get result

       } catch (throwable throwable) {
           throwable.printstacktrace();
       }
   }

当代码执行到 @1 的时候,会调用 spring 容器的 getbean 方法,而 dubbo 扩展了 factorybean,所以,会调用 getobject 方法,该方法会创建代理对象。

这个过程中会调用 dubboprotocol 实例的 getclients(url url) 方法,当这个给定的 url 的 client 没有初始化则创建,然后放入缓存,代码如下:

这个 initclient 方法就是创建 netty 的 client 的。

最终调用的就是抽象父类 abstractclient 的构造方法,构造方法中包含了创建 socket 客户端,连接客户端等行为。

public abstractclient(url url, channelhandler handler) throws remotingexception {
   doopen();
   connect();
}

doopent 方法用来创建 netty 的 bootstrap :

protected void doopen() throws throwable {
   nettyhelper.setnettyloggerfactory();
   bootstrap = new clientbootstrap(channelfactory);
   bootstrap.setoption("keepalive", true);
   bootstrap.setoption("tcpnodelay", true);
   bootstrap.setoption("connecttimeoutmillis", gettimeout());
   final nettyhandler nettyhandler = new nettyhandler(geturl(), this);
   bootstrap.setpipelinefactory(new channelpipelinefactory() {
       public channelpipeline getpipeline() {
           nettycodecadapter adapter = new nettycodecadapter(getcodec(), geturl(), nettyclient.this);
           channelpipeline pipeline = channels.pipeline();
           pipeline.addlast("decoder", adapter.getdecoder());
           pipeline.addlast("encoder", adapter.getencoder());
           pipeline.addlast("handler", nettyhandler);
           return pipeline;
       }
   });
}

connect 方法用来连接提供者:

protected void doconnect() throws throwable {
   long start = system.currenttimemillis();
   channelfuture future = bootstrap.connect(getconnectaddress());
   boolean ret = future.awaituninterruptibly(getconnecttimeout(), timeunit.milliseconds);
   if (ret && future.issuccess()) {
       channel newchannel = future.getchannel();
       newchannel.setinterestops(channel.op_read_write);
   }
}

上面的代码中,调用了 bootstrap 的 connect 方法,熟悉的 netty 连接操作。当然这里使用的是  jboss 的 netty3,稍微有点区别。点击这篇:。当连接成功后,注册写事件,准备开始向提供者传递数据。

当 main 方法中调用 demoservice.sayhello(“world”) 的时候,最终会调用 headerexchangechannel 的 request 方法,通过 channel 进行请求。

public responsefuture request(object request, int timeout) throws remotingexception {
   request req = new request();
   req.setversion("2.0.0");
   req.settwoway(true);
   req.setdata(request);
   defaultfuture future = new defaultfuture(channel, req, timeout);
   channel.send(req);
   return future;
}

send 方法中最后调用 jboss  netty 中继承了  niosocketchannel 的 nioclientsocketchannel 的 write 方法。完成了一次数据的传输。

2. dubbo 的 provider 提供者如何使用 netty

provider demo 代码:

system.setproperty("java.net.preferipv4stack", "true");
classpathxmlapplicationcontext context = new classpathxmlapplicationcontext(new string[]{"meta-inf/spring/dubbo-demo-provider.xml"});
context.start();
system.in.read(); // press any key to exit

provider 作为被访问方,肯定是一个 server 模式的 socket。如何启动的呢?

当 spring 容器启动的时候,会调用一些扩展类的初始化方法,比如继承了 initializingbean,applicationcontextaware,applicationlistener 。

而 dubbo 创建了 servicebean 继承了一个监听器。spring 会调用他的 onapplicationevent 方法,该类有一个 export 方法,用于打开 serversocket 。

然后执行了 dubboprotocol 的 createserver 方法,然后创建了一个nettyserver 对象。nettyserver 对象的 构造方法同样是  doopen 方法和。

代码如下:

protected void doopen() throws throwable {
   nettyhelper.setnettyloggerfactory();
   executorservice boss = executors.newcachedthreadpool(new namedthreadfactory("nettyserverboss", true));
   executorservice worker = executors.newcachedthreadpool(new namedthreadfactory("nettyserverworker", true));
   channelfactory channelfactory = new nioserversocketchannelfactory(boss, worker, geturl().getpositiveparameter(constants.io_threads_key, constants.default_io_threads));
   bootstrap = new serverbootstrap(channelfactory);

   final nettyhandler nettyhandler = new nettyhandler(geturl(), this);
   channels = nettyhandler.getchannels();
   bootstrap.setpipelinefactory(new channelpipelinefactory() {
       public channelpipeline getpipeline() {
           nettycodecadapter adapter = new nettycodecadapter(getcodec(), geturl(), nettyserver.this);
           channelpipeline pipeline = channels.pipeline();
           pipeline.addlast("decoder", adapter.getdecoder());
           pipeline.addlast("encoder", adapter.getencoder());
           pipeline.addlast("handler", nettyhandler);
           return pipeline;
       }
   });
   channel = bootstrap.bind(getbindaddress());
}

该方法中,看到了熟悉的 boss 线程,worker 线程,和 serverbootstrap,在添加了编解码 handler  之后,添加一个 nettyhandler,最后调用 bind 方法,完成绑定端口的工作。和我们使用 netty 是一摸一样。

3. 总结


可以看到,dubbo 使用 netty 还是挺简单的,消费者使用 nettyclient,提供者使用 nettyserver,provider  启动的时候,会开启端口监听,使用我们平时启动 netty 一样的方式。

而 client 在 spring getbean 的时候,会创建 client,当调用远程方法的时候,将数据通过 dubbo 协议编码发送到 nettyserver,然后 nettserver 收到数据后解码,并调用本地方法,并返回数据,完成一次完美的 rpc 调用。

好,关于 dubbo 如何使用 netty 就简短的介绍到这里。

推荐去我的博客阅读更多:

1.

2.

3.

4.

觉得不错,别忘了点赞+转发哦!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网