当前位置: 移动技术网 > IT编程>开发语言>Java > 深入探秘 Netty、Kafka 中的零拷贝技术!

深入探秘 Netty、Kafka 中的零拷贝技术!

2020年04月20日  | 移动技术网IT编程  | 我要评论

前言

从字面意思理解就是数据不需要来回的拷贝,大大提升了系统的性能;这个词我们也经常在java
nio,netty,kafka,rocketmq等框架中听到,经常作为其提升性能的一大亮点;下面从i/o的几个概念开始,进而在分析零拷贝。

i/o概念

1.缓冲区

缓冲区是所有i/o的基础,i/o讲的无非就是把数据移进或移出缓冲区;进程执行i/o操作,就是向操作系统发出请求,让它要么把缓冲区的数据排干(写),要么填充缓冲区(读);下面看一个java进程发起read请求加载数据大致的流程图:

进程发起read请求之后,内核接收到read请求之后,会先检查内核空间中是否已经存在进程所需要的数据,如果已经存在,则直接把数据copy给进程的缓冲区;如果没有内核随即向磁盘控制器发出命令,要求从磁盘读取数据,磁盘控制器把数据直接写入内核read缓冲区,这一步通过dma完成;接下来就是内核将数据copy到进程的缓冲区;
如果进程发起write请求,同样需要把用户缓冲区里面的数据copy到内核的socket缓冲区里面,然后再通过dma把数据copy到网卡中,发送出去;
你可能觉得这样挺浪费空间的,每次都需要把内核空间的数据拷贝到用户空间中,所以零拷贝的出现就是为了解决这种问题的;
关于零拷贝提供了两种方式分别是:mmap+write方式,sendfile方式;

2.虚拟内存

所有现代操作系统都使用虚拟内存,使用虚拟的地址取代物理地址,这样做的好处是:
1.一个以上的虚拟地址可以指向同一个物理内存地址,
2.虚拟内存空间可大于实际可用的物理地址;
利用第一条特性可以把内核空间地址和用户空间的虚拟地址映射到同一个物理地址,这样dma就可以填充对内核和用户空间进程同时可见的缓冲区了,大致如下图所示:

省去了内核与用户空间的往来拷贝,java也利用操作系统的此特性来提升性能,下面重点看看java对零拷贝都有哪些支持。

3.mmap+write方式

使用mmap+write方式代替原来的read+write方式,mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系;这样就可以省掉原来内核read缓冲区copy数据到用户缓冲区,但是还是需要内核read缓冲区将数据copy到内核socket缓冲区,大致如下图所示:

4.sendfile方式

sendfile系统调用在内核版本2.1中被引入,目的是简化通过网络在两个通道之间进行的数据传输过程。sendfile系统调用的引入,不仅减少了数据复制,还减少了上下文切换的次数,大致如下图所示:

数据传送只发生在内核空间,所以减少了一次上下文切换;但是还是存在一次copy,能不能把这一次copy也省略掉,linux2.4内核中做了改进,将kernel
buffer中对应的数据描述信息(内存地址,偏移量)记录到相应的socket缓冲区当中,这样连内核空间中的一次cpu copy也省掉了;

java零拷贝

1.mappedbytebuffer

java
nio提供的filechannel提供了map()方法,该方法可以在一个打开的文件和mappedbytebuffer之间建立一个虚拟内存映射,mappedbytebuffer继承于bytebuffer,类似于一个基于内存的缓冲区,只不过该对象的数据元素存储在磁盘的一个文件中;调用get()方法会从磁盘中获取数据,此数据反映该文件当前的内容,调用put()方法会更新磁盘上的文件,并且对文件做的修改对其他阅读者也是可见的;下面看一个简单的读取实例,然后在对mappedbytebuffer进行分析:

    public class mappedbytebuffertest {
    
        public static void main(string[] args) throws exception {
            file file = new file("d://db.txt");
            long len = file.length();
            byte[] ds = new byte[(int) len];
            mappedbytebuffer mappedbytebuffer = new fileinputstream(file).getchannel().map(filechannel.mapmode.read_only, 0,
                    len);
            for (int offset = 0; offset < len; offset++) {
                byte b = mappedbytebuffer.get();
                ds[offset] = b;
            }
            scanner scan = new scanner(new bytearrayinputstream(ds)).usedelimiter(" ");
            while (scan.hasnext()) {
                system.out.print(scan.next() + " ");
            }
        }
    }
    复制代码

主要通过filechannel提供的map()来实现映射,map()方法如下:

        public abstract mappedbytebuffer map(mapmode mode,
                                             long position, long size)
            throws ioexception;
            
    复制代码

分别提供了三个参数,mapmode,position和size;分别表示:
mapmode:映射的模式,可选项包括:read_only,read_write,private;
position:从哪个位置开始映射,字节数的位置;
size:从position开始向后多少个字节;

重点看一下mapmode,请两个分别表示只读和可读可写,当然请求的映射模式受到filechannel对象的访问权限限制,如果在一个没有读权限的文件上启用read_only,将抛出nonreadablechannelexception;private模式表示写时拷贝的映射,意味着通过put()方法所做的任何修改都会导致产生一个私有的数据拷贝并且该拷贝中的数据只有mappedbytebuffer实例可以看到;该过程不会对底层文件做任何修改,而且一旦缓冲区被施以垃圾收集动作(garbage
collected),那些修改都会丢失;大致浏览一下map()方法的源码:

        public mappedbytebuffer map(mapmode mode, long position, long size)
            throws ioexception
        {
                ...省略...
                int pageposition = (int)(position % allocationgranularity);
                long mapposition = position - pageposition;
                long mapsize = size + pageposition;
                try {
                    // if no exception was thrown from map0, the address is valid
                    addr = map0(imode, mapposition, mapsize);
                } catch (outofmemoryerror x) {
                    // an outofmemoryerror may indicate that we've exhausted memory
                    // so force gc and re-attempt map
                    system.gc();
                    try {
                        thread.sleep(100);
                    } catch (interruptedexception y) {
                        thread.currentthread().interrupt();
                    }
                    try {
                        addr = map0(imode, mapposition, mapsize);
                    } catch (outofmemoryerror y) {
                        // after a second oome, fail
                        throw new ioexception("map failed", y);
                    }
                }
    
                // on windows, and potentially other platforms, we need an open
                // file descriptor for some mapping operations.
                filedescriptor mfd;
                try {
                    mfd = nd.duplicateformapping(fd);
                } catch (ioexception ioe) {
                    unmap0(addr, mapsize);
                    throw ioe;
                }
    
                assert (iostatus.checkall(addr));
                assert (addr % allocationgranularity == 0);
                int isize = (int)size;
                unmapper um = new unmapper(addr, mapsize, isize, mfd);
                if ((!writable) || (imode == map_ro)) {
                    return util.newmappedbytebufferr(isize,
                                                     addr + pageposition,
                                                     mfd,
                                                     um);
                } else {
                    return util.newmappedbytebuffer(isize,
                                                    addr + pageposition,
                                                    mfd,
                                                    um);
                }
         }
    复制代码

大致意思就是通过native方法获取内存映射的地址,如果失败,手动gc再次映射;最后通过内存映射的地址实例化出mappedbytebuffer,mappedbytebuffer本身是一个抽象类,其实这里真正实例话出来的是directbytebuffer;

2.directbytebuffer

directbytebuffer继承于mappedbytebuffer,从名字就可以猜测出开辟了一段直接的内存,并不会占用jvm的内存空间;上一节中通过filechannel映射出的mappedbytebuffer其实际也是directbytebuffer,当然除了这种方式,也可以手动开辟一段空间:

    bytebuffer directbytebuffer = bytebuffer.allocatedirect(100);
    复制代码

如上开辟了100字节的直接内存空间;

3.channel-to-channel传输

经常需要从一个位置将文件传输到另外一个位置,filechannel提供了transferto()方法用来提高传输的效率,首先看一个简单的实例:

    public class channeltransfer {
        public static void main(string[] argv) throws exception {
            string files[]=new string[1];
            files[0]="d://db.txt";
            catfiles(channels.newchannel(system.out), files);
        }
    
        private static void catfiles(writablebytechannel target, string[] files)
                throws exception {
            for (int i = 0; i < files.length; i++) {
                fileinputstream fis = new fileinputstream(files[i]);
                filechannel channel = fis.getchannel();
                channel.transferto(0, channel.size(), target);
                channel.close();
                fis.close();
            }
        }
    }
    复制代码

通过filechannel的transferto()方法将文件数据传输到system.out通道,接口定义如下:

        public abstract long transferto(long position, long count,
                                        writablebytechannel target)
            throws ioexception;
    

几个参数也比较好理解,分别是开始传输的位置,传输的字节数,以及目标通道;transferto()允许将一个通道交叉连接到另一个通道,而不需要一个中间缓冲区来传递数据;
注:这里不需要中间缓冲区有两层意思:第一层不需要用户空间缓冲区来拷贝内核缓冲区,另外一层两个通道都有自己的内核缓冲区,两个内核缓冲区也可以做到无需拷贝数据;

netty零拷贝

netty提供了零拷贝的buffer,在传输数据时,最终处理的数据会需要对单个传输的报文,进行组合和拆分,nio原生的bytebuffer无法做到,netty通过提供的composite(组合)和slice(拆分)两种buffer来实现零拷贝;看下面一张图会比较清晰:

tcp层http报文被分成了两个channelbuffer,这两个buffer对我们上层的逻辑(http处理)是没有意义的。
但是两个channelbuffer被组合起来,就成为了一个有意义的http报文,这个报文对应的channelbuffer,才是能称之为”message”的东西,这里用到了一个词”virtual
buffer”。
可以看一下netty提供的compositechannelbuffer源码:

    public class compositechannelbuffer extends abstractchannelbuffer {
    
        private final byteorder order;
        private channelbuffer[] components;
        private int[] indices;
        private int lastaccessedcomponentid;
        private final boolean gathering;
        
        public byte getbyte(int index) {
            int componentid = componentid(index);
            return components[componentid].getbyte(index - indices[componentid]);
        }
        ...省略...
   

components用来保存的就是所有接收到的buffer,indices记录每个buffer的起始位置,lastaccessedcomponentid记录上一次访问的componentid;compositechannelbuffer并不会开辟新的内存并直接复制所有channelbuffer内容,而是直接保存了所有channelbuffer的引用,并在子channelbuffer里进行读写,实现了零拷贝。

其他零拷贝

rocketmq的消息采用顺序写到commitlog文件,然后利用consume
queue文件作为索引;rocketmq采用零拷贝mmap+write的方式来回应consumer的请求;
同样kafka中存在大量的网络数据持久化到磁盘和磁盘文件通过网络发送的过程,kafka使用了sendfile零拷贝方式;

总结

零拷贝如果简单用java里面对象的概率来理解的话,其实就是使用的都是对象的引用,每个引用对象的地方对其改变就都能改变此对象,永远只存在一份对象。

来源:
作者:ksfzhaohui

推荐阅读

学习资料分享

12 套 微服务、spring boot、spring cloud 核心技术资料,这是部分资料目录:

  • spring security 认证与授权
  • spring boot 项目实战(中小型互联网公司后台服务架构与运维架构)
  • spring boot 项目实战(企业权限管理项目))
  • spring cloud 微服务架构项目实战(分布式事务解决方案)
  • ...

公众号后台回复arch028获取资料::

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网