当前位置: 移动技术网 > IT编程>开发语言>Java > Netty 系列九(支持UDP协议).

Netty 系列九(支持UDP协议).

2018年09月14日  | 移动技术网IT编程  | 我要评论

一、基础知识

    udp 协议相较于 tcp 协议的特点:

1、无连接协议,没有持久化连接;
2、每个 udp 数据报都是一个单独的传输单元;
3、一定的数据报丢失;
4、没有重传机制,也不管数据报是否可达;
5、速度比tcp快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。
6、常用于音频、视频场景,可以忍受一定的数据包丢失,追求速度上的提升。

    tcp 协议采用的是一种叫做单播的传输形式,udp 协议提供了向多个接收者发送消息的额外传输形式(多播、广播):

单播(tcp 和 udp):发送消息给一个由唯一的地址所标识的单一的网络目的地。
多播(udp):传输给一个预定义的主机组。
广播(udp):传输到网络(或者子网)上的所有主机。

二、功能说明

    广播方:打开一个文件,通过 udp 使用特殊的受限广播地址或者零网络地址 255.255.255.255,把每一行作为一个消息广播到一个指定的端口。

    接收方:通过 udp 广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。所有的在该 udp 端口上监听的事件监听器都将会接收到广播信息。

三、实现

    下图展示了怎么将我们的 文件数据 广播为 udp消息:所有的将要被传输的数据都被封装在了 logevent 消息中。 logeventbroadcaster 将把这些写入到 channel 中,并通过 channelpipeline 发送它们,在那里它们将会被转换(编码)为 datagrampacket 消息。最后,他们都将通过 udp 被广播,并由远程节点(监视器)所捕获。

    netty 中支持 udp 协议主要通过以下相关类:

datagrampacket:使用 bytebuf 作为数据源,是 udp 协议传输的消息容器。

datagramchannel:扩展了 netty 的 channel 抽象以支持 udp 的多播组管理,它的实现类 niodatagramchannnel 用来和远程节点通信。

bootstrap:udp 协议的引导类,使用 bind() 方法绑定 channel。    

public class logevent {
    public static final byte separator = ':';
    /**
     * ip套接字地址(ip地址+端口号)
     */
    private final inetsocketaddress inetsocketaddress;
    /**
     * 文件名
     */
    private final string logfile;
    /**
     * 消息内容
     */
    private final string msg;
    
    private final long received;

    /**
     * 用于传入消息的构造函数
     *
     * @param inetsocketaddress
     * @param logfile
     * @param msg
     * @param received
     */
    public logevent(inetsocketaddress inetsocketaddress, string logfile, string msg, long received) {
        this.inetsocketaddress = inetsocketaddress;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    /**
     * 用于传出消息的构造函数
     *
     * @param logfile
     * @param msg
     */
    public logevent(string logfile, string msg) {
        this(null, logfile, msg, -1);
    }

    public inetsocketaddress getinetsocketaddress() {
        return inetsocketaddress;
    }

    public string getlogfile() {
        return logfile;
    }

    public string getmsg() {
        return msg;
    }

    public long getreceived() {
        return received;
    }
}
文件实体类 logevent.java
public class logeventencoder extends messagetomessageencoder<logevent> {
    private final inetsocketaddress remoteaddress;

    public logeventencoder(inetsocketaddress remoteaddress) {
        this.remoteaddress = remoteaddress;
    }


    @override
    protected void encode(channelhandlercontext ctx, logevent msg, list<object> out) throws exception {
        byte[] file = msg.getlogfile().getbytes(charsetutil.utf_8);
        byte[] content = msg.getmsg().getbytes(charsetutil.utf_8);
        bytebuf bytebuf = ctx.alloc().buffer(file.length + content.length + 1);
        bytebuf.writebytes(file);
        bytebuf.writebyte(logevent.separator);
        bytebuf.writebytes(content);
        out.add(new datagrampacket(bytebuf, remoteaddress));
    }
}
编码器 logeventencoder.java

    该编码器实现了将 logevent 实体类内容转换为 datagrampacket udp数据报。

public class logeventbroadcaster {
    private final eventloopgroup group;
    private final bootstrap bootstrap;
    private final file file;

    public logeventbroadcaster(inetsocketaddress address, file file) {
        group = new nioeventloopgroup();
        bootstrap = new bootstrap();
        bootstrap.group(group)
                //引导该 niodatagramchannel(无连接的)
                .channel(niodatagramchannel.class)
                // 设置 so_broadcast 套接字选项
                .option(channeloption.so_broadcast, true)
                .handler(new logeventencoder(address));
        this.file = file;
    }
    
    public void run() throws interruptedexception, ioexception {
        //绑定 channel,udp 协议的连接用 bind() 方法
        channel channel = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //长轮询 监听是否有新的日志文件生成
        while (true) {
            long length = file.length();
            if (length < pointer) {
                // 如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = length;
            } else {
                randomaccessfile raf = new randomaccessfile(file, "r");
                // 确保当前的文件指针,以确保没有任何的旧数据被发送
                raf.seek(pointer);
                string line;
                while ((line = raf.readline()) != null) {
                    //对于每个日志条目,写入一个 logevent 到 channel 中,最后加入一个换行符号
                    channel.writeandflush(new logevent(file.getabsolutepath(), line + system.getproperty("line.separator")));
                }
                pointer = raf.getfilepointer();
                raf.close();
            }
            try {
                // 休眠一秒,如果被中断,则退出循环,否则重新处理它
                thread.sleep(1000);
            } catch (interruptedexception e) {
                while (!thread.interrupted()) {
                    break;
                }
            }
        }
    }

    public void stop() {
        group.shutdowngracefully();
    }

    public static void main(string[] args) throws ioexception, interruptedexception {
        inetsocketaddress socketaddress = new inetsocketaddress("255.255.255.255", 8888);
        file file = new file("e:\\2018-09-12.log");
        logeventbroadcaster logeventbroadcaster = new logeventbroadcaster(socketaddress, file);
        try {
            logeventbroadcaster.run();
        } finally {
            logeventbroadcaster.stop();
        }
    }
}

    现在,我们来测试一下这个 udp 广播类,首先我们需要一个工具 nmap ,用它来监听 udp 的 8888 端口,以接收我们广播的日志文件。下载地址:

    下载完成后,命令行进入安装目录,执行命令:ncat.exe -l -u -p 8888 ,监听 udp 端口。

 

     当然,也可以自己写个测试类监听 udp 端口,打印日志查看。这里我没有用 netty 写监听类,直接用了 java 原生的 datagramsocket 和 datagrampacket 写的监听类,如下:

public class udpserver {

    public static void main(string[] args) {
        datagramsocket server = null;
        try {
            server = new datagramsocket(8888);
            byte[] datas = new byte[1024];
            //用一个字节数组接收udp包,字节数组在传递给构造函数时是空的
            while (true) {
                datagrampacket datagrampacket = new datagrampacket(datas, datas.length);
                server.receive(datagrampacket);
                system.out.println(new string(datas));
            }
        } catch (socketexception e) {
            e.printstacktrace();
        } catch (ioexception e) {
            e.printstacktrace();
        } finally {
            server.close();
        }
    }
}
udpserver.java

    基于 netty 的监听类实现可以参考我上传 github 上的源代码。

 

参考资料:《netty in action》

演示源代码:https://github.com/jmcuixy/nettydemo/tree/master/src/main/java/org/netty/demo/udp

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网