当前位置: 移动技术网 > IT编程>软件设计>架构 > 搭建生产级的Netty项目

搭建生产级的Netty项目

2020年04月02日  | 移动技术网IT编程  | 我要评论

Netty是Trustin Lee在2004年开发的一款高性能网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了JDK NIO的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍应用了Netty,比如Dubbo、Elasticsearch、ZooKeeper等。

Netty的具体开发

提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个star~

需要的依赖
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-jmx</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
client端代码
package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.operationresultfuture;
import com.example.nettydemo.client.codec.dispatcher.requestpendingcenter;
import com.example.nettydemo.client.codec.dispatcher.responsedispatcherhandler;
import com.example.nettydemo.common.requestmessage;
import com.example.nettydemo.common.string.stringoperation;
import com.example.nettydemo.util.idutil;
import io.netty.bootstrap.bootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niochanneloption;
import io.netty.channel.socket.nio.niosocketchannel;
import io.netty.handler.logging.loglevel;
import io.netty.handler.logging.logginghandler;

import javax.net.ssl.sslexception;
import java.util.concurrent.executionexception;

/**
 * Demo client entry point: connects to 127.0.0.1:8888, writes one request
 * message to the channel ten times, then blocks until the channel is closed
 * and finally shuts the event loop group down.
 *
 * NOTE(review): every identifier in this listing is lower case (for example
 * {@code string[] args}); this looks like an artifact of how the article was
 * published. Restore the original casing before trying to compile it.
 */
public class client {

    /**
     * Configures the bootstrap, connects, sends the test message and waits for
     * the connection to close.
     *
     * @param args command-line arguments; not read by this method
     */
    public static void main(string[] args) throws interruptedexception, executionexception, sslexception {

        bootstrap bootstrap = new bootstrap();
        bootstrap.channel(niosocketchannel.class);

        // Maximum time the client may take to connect to the server.
        // The original note says the default is 30s; the value set here is the same.
        bootstrap.option(niochanneloption.connect_timeout_millis, 30 * 1000); // 30s

        nioeventloopgroup group = new nioeventloopgroup();
        try {

            bootstrap.group(group);

            // Shared with the response dispatcher handler below, which receives it in its constructor.
            requestpendingcenter requestpendingcenter = new requestpendingcenter();
            // NOTE(review): only referenced by the commented-out addlast call below, so currently unused.
            logginghandler logginghandler = new logginghandler(loglevel.info);

            bootstrap.handler(new channelinitializer<niosocketchannel>() {
                @override
                protected void initchannel(niosocketchannel ch) throws exception {
                    channelpipeline pipeline = ch.pipeline();

                    // Frame-level codec pair.
                    pipeline.addlast(new framedecoder());
                    pipeline.addlast(new frameencoder());

                    // Protocol-level codec pair.
                    pipeline.addlast(new protocolencoder());
                    pipeline.addlast(new protocoldecoder());

                    // Dispatcher is given the pending center; the encoder presumably turns an
                    // operation into a request message (inferred from its name - confirm).
                    pipeline.addlast(new responsedispatcherhandler(requestpendingcenter));
                    pipeline.addlast(new operationtorequestmessageencoder());

//                    pipeline.addlast(logginghandler);

                }
            });

            // Connect to the server.
            channelfuture channelfuture = bootstrap.connect("127.0.0.1", 8888);
            // The connect call is asynchronous, so wait for it to finish before using the channel.
            channelfuture.sync();

            long streamid = idutil.nextid();
            /**
             * Send test data, assembled according to the defined protocol rules.
             */
//            orderoperation orderoperation =  new orderoperation(1001, "你好啊,hi");
            requestmessage requestmessage = new requestmessage(streamid, new stringoperation(1234, "你好啊,hi"));

            // Register the result future in the pending center under this stream id.
            operationresultfuture operationresultfuture = new operationresultfuture();
            requestpendingcenter.add(streamid, operationresultfuture);

            // Send the message. The same requestmessage instance (same streamid) is written ten times.
            for (int i = 0; i < 10; i++) {
                channelfuture.channel().writeandflush(requestmessage);
            }

            // Blocking wait for the result; per the original note, responsedispatcherhandler
            // sets the result when the response arrives. Disabled here, so the registered
            // future is never read in this method.
//            operationresult operationresult = operationresultfuture.get();
//            // print the result
//            system.out.println("返回:"+operationresult);

            // Block until the channel is closed.
            channelfuture.channel().closefuture().get();

        } finally {
            // Always release the event loop threads, even if connecting or sending failed.
            group.shutdowngracefully();
        }

    }

}

server端代码
package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.framedecoder;
import com.example.nettydemo.server.codec.frameencoder;
import com.example.nettydemo.server.codec.protocoldecoder;
import com.example.nettydemo.server.codec.protocolencoder;
import com.example.nettydemo.server.handler.metricshandler;
import com.example.nettydemo.server.handler.serveridlecheckhandler;
import com.example.nettydemo.server.handler.serverprocesshandler;
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niochanneloption;
import io.netty.channel.socket.nio.nioserversocketchannel;
import io.netty.channel.socket.nio.niosocketchannel;
import io.netty.handler.flush.flushconsolidationhandler;
import io.netty.handler.logging.loglevel;
import io.netty.handler.logging.logginghandler;
import io.netty.handler.traffic.globaltrafficshapinghandler;
import io.netty.util.concurrent.defaultthreadfactory;
import io.netty.util.concurrent.unorderedthreadpooleventexecutor;
import lombok.extern.slf4j.slf4j;

import javax.net.ssl.sslexception;
import java.security.cert.certificateexception;
import java.util.concurrent.executionexception;

/**
 * Netty server entry point: binds port 8888, builds the per-connection handler
 * pipeline, and blocks until the server channel is closed.
 *
 * NOTE(review): every identifier in this listing is lower case (for example
 * {@code string... args}); this looks like an artifact of how the article was
 * published. Restore the original casing before trying to compile it.
 */
@slf4j
public class server {


    /**
     * Configures the server bootstrap, starts listening, and shuts all executor
     * groups down when the server channel closes or startup fails.
     *
     * @param args command-line arguments; not read by this method
     */
    public static void main(string... args) throws interruptedexception, executionexception, certificateexception, sslexception {

        serverbootstrap serverbootstrap = new serverbootstrap();
        // Channel type: this is the server side, so use nioserversocketchannel.
        serverbootstrap.channel(nioserversocketchannel.class);

        // Maximum number of connections waiting to be accepted.
        serverbootstrap.option(niochanneloption.so_backlog, 1024);
        // Controls Nagle's algorithm, which joins small fragments into larger segments to
        // improve send efficiency. It has to be disabled (tcp_nodelay = true, as here) when
        // small messages need to be sent.
        serverbootstrap.childoption(niochanneloption.tcp_nodelay, true);

        // Install Netty's built-in logging handler on the server channel at info level.
        serverbootstrap.handler(new logginghandler(loglevel.info));

        // Threads: each group gets a thread factory with a custom name.
        nioeventloopgroup bossgroup = new nioeventloopgroup(0, new defaultthreadfactory("boss"));
        nioeventloopgroup workgroup = new nioeventloopgroup(0, new defaultthreadfactory("worker"));
        unorderedthreadpooleventexecutor businessgroup = new unorderedthreadpooleventexecutor(10, new defaultthreadfactory("business"));

        // Original note: only one thread may be used, because globaltrafficshapinghandler is lightweight.
        // NOTE(review): the thread count passed here is 0, not 1; in Netty 0 appears to select the
        // default thread count rather than a single thread - confirm which was intended.
        nioeventloopgroup eventloopgroupfortrafficshaping = new nioeventloopgroup(0, new defaultthreadfactory("ts"));

        try {
            // Reactor setup: first group is the parent (boss) group, second the child (worker) group.
            serverbootstrap.group(bossgroup, workgroup);

            // Metrics handler.
            // NOTE(review): this single instance is added to every channel's pipeline below;
            // confirm that metricshandler is declared sharable.
            metricshandler metricshandler = new metricshandler();

            // Traffic shaping. The two numeric arguments are the write limit and the read limit
            // (10 * 1024 * 1024 each here); adjust them to the workload.
            globaltrafficshapinghandler globaltrafficshapinghandler = new globaltrafficshapinghandler(eventloopgroupfortrafficshaping, 10 * 1024 * 1024, 10 * 1024 * 1024);


            // Logging handlers at two levels, created once and reused for every channel.
            logginghandler debugloghandler = new logginghandler(loglevel.debug);
            logginghandler infologhandler = new logginghandler(loglevel.info);

            // Child handler: handlers are added in the order they should run.
            serverbootstrap.childhandler(new channelinitializer<niosocketchannel>() {
                @override
                protected void initchannel(niosocketchannel ch) throws exception {

                    channelpipeline pipeline = ch.pipeline();

                    // Shared instances created above, plus a new idle-check handler per channel.
                    pipeline.addlast("debuglog", debugloghandler);
                    pipeline.addlast("tshandler", globaltrafficshapinghandler);
                    pipeline.addlast("metrichandler", metricshandler);
                    pipeline.addlast("idlehandler", new serveridlecheckhandler());

                    // Frame-level and protocol-level codecs, new instances per channel.
                    pipeline.addlast("framedecoder", new framedecoder());
                    pipeline.addlast("frameencoder", new frameencoder());
                    pipeline.addlast("protocoldecoder", new protocoldecoder());
                    pipeline.addlast("protocolencoder", new protocolencoder());

                    pipeline.addlast("infolog", infologhandler);
                    // Flush consolidation: fewer flushes, trading latency for throughput.
                    pipeline.addlast("flushenhance", new flushconsolidationhandler(10, true));
                    // Run business processing on its own executor group (businessgroup).
                    pipeline.addlast(businessgroup, new serverprocesshandler());
                }
            });

            // Bind port 8888 and block until the bind has completed.
            channelfuture channelfuture = serverbootstrap.bind(8888).sync();

            // Block until the server channel is closed.
            channelfuture.channel().closefuture().sync();

        } finally {
            // Release every executor group, whether the server stopped normally or startup failed.
            bossgroup.shutdowngracefully();
            workgroup.shutdowngracefully();
            businessgroup.shutdowngracefully();
            eventloopgroupfortrafficshaping.shutdowngracefully();
        }

    }

}

最后

以上介绍了Netty的基本用法,在代码中也做了一部分关键注释,但可能还有许多不足,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址

持续学习,记录点滴。更多文章请访问

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网