当前位置: 移动技术网 > IT编程>开发语言>Java > netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来

netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来

2019年11月12日  | 移动技术网IT编程  | 我要评论

 

  pooledbytebufallocator负责初始化poolarena(pa)和poolthreadcache(ptc)。它提供了一系列的接口,用来创建使用堆内存或直接内存的pooledbytebuf对象,这些接口只是一张皮,内部完全使用了pa和ptc的能力。初始化过程分两个步骤,首先初始化一系列的默认参数,然后初始化ptc对象和pa数组。

 

默认参数和它们的值

  default_page_size: poolchunk中的page的大小-pagesize,  使用-dio.netty.allocator.pagesize设置, 默认值:8192。

  default_max_order: poolchunk中二叉树的高度: maxorder, 使用-dio.netty.allocator.maxorder设置,默认值:11。

  default_num_heap_arena: 使用堆内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。

  default_num_direct_arena: 使用直接内存的pa数组的长度,使用-dio.netty.allocator.numheaparenas设置,默认值: cpu核心数 * 2。

  default_tiny_cache_size:  ptc对象中每个用来缓存tiny内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.tinycachesize设置,默认值:512。

  default_small_cache_size: ptc对象中每个用来缓存small内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.smallcachesize设置,默认值:256。

  default_normal_cache_size: ptc对象中每个用来缓存normal内存的memoryregioncache对象中queue的长度,使用-dio.netty.allocator.normalcachesize设置,默认值:64。

  default_max_cached_buffer_capacity: ptc对象中缓存normal内存的大小上限。使用-dio.netty.allocator.maxcachedbuffercapacity设置,默认值32 * 1024。

  default_cache_trim_interval:  ptc对象中释放缓存的内存阈值。当ptc分配内存次数大于这个值时会释放缓存的内存。使用-dio.netty.allocator.cachetriminterval设置,默认值:8192。

  default_use_cache_for_all_threads: 是否对所有的线程使用缓存。使用-dio.netty.allocator.usecacheforallthreads设置,默认值:true。

  default_direct_memory_cache_alignment: 直接内存的对齐参数,分配直接内存的大小必须是它的整数倍。使用-dio.netty.allocator.directmemorycachealignment设置,默认值:0, 表示不对齐。

 

初始化poolarena数组

  pooledbytebufallocator维护了两个数组:

poolarena<byte[]>[] heaparenas; 
poolarena<bytebuffer>[] directarenas;

  heaparenas用来管理堆内存,directarenas用来管理直接内存。这两个数组在构造方法中初始化,构造方法的定义是:

    public pooledbytebufallocator(boolean preferdirect, int nheaparena, int ndirectarena, int pagesize, int maxorder,
                                  int tinycachesize, int smallcachesize, int normalcachesize,
                                  boolean usecacheforallthreads, int directmemorycachealignment)

  prefredirect: 创建pooledbytebuf时,是否优先使用直接内存。

  nheaparena: 默认使用default_num_heap_arena

  ndirectarena: 默认使用default_num_direct_arena

  pagesize: 默认使用的default_page_size

  maxorder: 默认使用default_max_order

  tinycachesize:  默认使用default_tiny_cache_size

  smallcachesize: 默认使用default_small_cache_size

  normalcachesize: 默认使用default_normal_cache_size。

  usecacheforallthreads: 默认使用default_use_cache_for_all_threads

  directmemorycachealignment: 默认使用default_direct_memory_cache_alignment

  

  这两数组的初始化代码如下:

 1       int pageshifts = validateandcalculatepageshifts(pagesize);
 2 
 3         if (nheaparena > 0) {
 4             heaparenas = newarenaarray(nheaparena);
 5             list<poolarenametric> metrics = new arraylist<poolarenametric>(heaparenas.length);
 6             for (int i = 0; i < heaparenas.length; i ++) {
 7                 poolarena.heaparena arena = new poolarena.heaparena(this,
 8                         pagesize, maxorder, pageshifts, chunksize,
 9                         directmemorycachealignment);
10                 heaparenas[i] = arena;
11                 metrics.add(arena);
12             }
13             heaparenametrics = collections.unmodifiablelist(metrics);
14         } else {
15             heaparenas = null;
16             heaparenametrics = collections.emptylist();
17         }
18 
19         if (ndirectarena > 0) {
20             directarenas = newarenaarray(ndirectarena);
21             list<poolarenametric> metrics = new arraylist<poolarenametric>(directarenas.length);
22             for (int i = 0; i < directarenas.length; i ++) {
23                 poolarena.directarena arena = new poolarena.directarena(
24                         this, pagesize, maxorder, pageshifts, chunksize, directmemorycachealignment);
25                 directarenas[i] = arena;
26                 metrics.add(arena);
27             }
28             directarenametrics = collections.unmodifiablelist(metrics);
29         } else {
30             directarenas = null;
31             directarenametrics = collections.emptylist();
32         }

  1行,计算pageshifts,算法是pageshifts = integer.size - 1 - integer.numberofleadingzeros(pagesize) = 31 - integer.numberofleadingzeros(pagesize)。 integer.numberofleadingzeros(pagesize)是pagesize(32位整数)从最高位起连续是0的位数,因此pageshifts可以简化为pageshifts = log2(pagesize)。

  4,20行,创建数组,new poolarena[size]。  

  6-12,22-17行, 初始化数组中的poolarena对象,分别使用pooarena的两个内部类: heaparena, directarena。

 

初始化poolthreadcache

   poolthreadcache使用poolthreadlocalcache(ptlc)间接初始化,ptlc是pooledbytebufallocator的内部内,它的定义如下:

final class poolthreadlocalcache extends fastthreadlocal<poolthreadcache>

  这个类派生自io.netty.util.concurrent.fastthreadlocal<t>,  和java.lang.threadlocal<t>功能一样,实现了线程本地存储(tls)的功能,不同的是fastthreadlocal<t>优化了访问性能。ptlc覆盖了父类的initialvalue方法,这个方法负责初始化线程本地的poolthreadcache对象。当第一次调用ptlc对象的get方法时,这个方法会被调用。

 1         @override
 2         protected synchronized poolthreadcache initialvalue() {
 3             final poolarena<byte[]> heaparena = leastusedarena(heaparenas);
 4             final poolarena<bytebuffer> directarena = leastusedarena(directarenas);
 5 
 6             if (usecacheforallthreads || thread.currentthread() instanceof fastthreadlocalthread) {
 7                 return new poolthreadcache(
 8                         heaparena, directarena, tinycachesize, smallcachesize, normalcachesize,
 9                         default_max_cached_buffer_capacity, default_cache_trim_interval);
10             }
11             // no caching for non fastthreadlocalthreads.
12             return new poolthreadcache(heaparena, directarena, 0, 0, 0, 0, 0);
13         }

  3,4行,分别从headarenas,directarenas中取出一个使用次数最少的poolarena对象。poolarena有一个numthreadcaches属性,这个属性是atomicinteger类型的原子变量。它的作用是在用来记录被poolthreadcache对象使用的次数。poolthreadcache对象创建时会在构造方法中会调用它的getandincrement方法,释放时在free0方法中调用他的getanddecrement方法。

  6行,  如果运行每个线程都使用缓存(usercacheforallthreads==true),或者当成线程对象是fastthreadlocalthread时, 在第8行创建一个线程专用的ptc对象。

 

poolchunklist(pckl)

关键属性

  poolchunklist<t> nextlist

  poolchunklist<t> prevlist

  这两个属性表明pckl对象是一个双向链表的节点。

  poolchunk<t> head

  这个属性表明pckl对象还维护的一个pck类型的链表,head指向这个链表的头。

  int minusage;

  int maxusage;

  int maxcapacity;

  minusage是pck链表中每个pck对象内存的最小使用率,maxuseage是pck的最大使用率。这两个值是百分比,例如:minusage=10, maxuse=50,表示pck链表中只能保存使用率在[10%,50%)的pck对象。 maxcapacity表示pck最大可分配的内存数,算法是: maxcapacity = (int)(chunksize * (100l - minuseage) / 100l)。

 

初始化pckl链表

  pckl链表有poolarena负责维护,在poolarena的构造方法中初始化:

 1 // io.netty.buffer.poolarena#poolarena(pooledbytebufallocator parent, int pagesize,
 2 //          int maxorder, int pageshifts, int chunksize, int cachealignment)
 3 
 4         q100 = new poolchunklist<t>(this, null, 100, integer.max_value, chunksize);
 5         q075 = new poolchunklist<t>(this, q100, 75, 100, chunksize);
 6         q050 = new poolchunklist<t>(this, q075, 50, 100, chunksize);
 7         q025 = new poolchunklist<t>(this, q050, 25, 75, chunksize);
 8         q000 = new poolchunklist<t>(this, q025, 1, 50, chunksize);
 9         qinit = new poolchunklist<t>(this, q000, integer.min_value, 25, chunksize);
10 
11         q100.prevlist(q075);
12         q075.prevlist(q050);
13         q050.prevlist(q025);
14         q025.prevlist(q000);
15         q000.prevlist(null);
16         qinit.prevlist(qinit); 

  4-9行,初始化pckl节点。每个节点的名字q{num},其中num表示这个节点的最小使用率minusage,如q075节点的minusage=%75。

  11-16行,把pckl节点组装成一个链表。

  使用q(minusage, maxusage)表示一个节点,那么:

  qinit = q(integer.min_value, 25%)

  q000 = q(1%, 50%)

  q025 = q(25%, 75%)

  q075 = q(75%, 100%)

  q100 = q(100%, integer.max_value)

  这个链表的结构如下图所示:

  

poolchunk(pck)在poolchunklist(pckl)中移动

  一个新创建的pck对象,它的内存使用率是usage=%0,被放进qinit节节点。每次从这个pck对象中分配内存,都会导致它的使用率增加,当usage>=25%,即大于等于qinit的maxusage时,会把它移动到q000中。继续从pck对象中分配内存,它的usage继续增加,当usage大于等于它所属pckl的maxusage时,把它移动到pkcl链表中的下一个节点,直到q100为止。下面是内存分配导致pck移动的代码:

 1     //io.netty.buffer.poolchunklist#allocate
 2     boolean allocate(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) {
 3         if (head == null || normcapacity > maxcapacity) {
 4             // either this poolchunklist is empty or the requested capacity is larger then the capacity which can
 5             // be handled by the poolchunks that are contained in this poolchunklist.
 6             return false;
 7         }
 8 
 9         for (poolchunk<t> cur = head;;) {
10             long handle = cur.allocate(normcapacity);
11             if (handle < 0) {
12                 cur = cur.next;
13                 if (cur == null) {
14                     return false;
15                 }
16             } else {
17                 cur.initbuf(buf, handle, reqcapacity);
18                 if (cur.usage() >= maxusage) {
19                     remove(cur);
20                     nextlist.add(cur);
21                 }
22                 return true;
23             }
24         }
25     }

  9-12行,尝试从pck链表中的所有pck节点分配所需的内存。

  14行,没有找到能分配内存的pck节点。

  17行,从cur节点分配到所需的内存,并初始化pooledbytebuf对象。

  18-21行,如cur节点的使用率大于等于当前pckl节点maxusage,调用remove方法把cur从head链表中删除,然后调用pckl链表中的下一个节点的add方法,把cur移动到下一个节点中。

 

  如果持续地释放内存,把内存还给pck对象,会导致usage持续减小,当usage小于它所属的pckl的minusage时,把它移动到pckl链表中的前一个节点,直到q000位为止。当释放内存导致pck对象的usage等于%0,会销毁这个pck对象,释放整个chunk的内存。下面是释放内存导致pck对象移动的代码:

 1     //io.netty.buffer.poolchunklist#free
 2     boolean free(poolchunk<t> chunk, long handle) {
 3         chunk.free(handle);
 4         if (chunk.usage() < minusage) {
 5             remove(chunk);
 6             // move the poolchunk down the poolchunklist linked-list.
 7             return move0(chunk);
 8         }
 9         return true;
10     }
11 
12     //io.netty.buffer.poolchunklist#move0
13     private boolean move0(poolchunk<t> chunk) {
14         if (prevlist == null) {
15             // there is no previous poolchunklist so return false which result in having the poolchunk destroyed and
16             // all memory associated with the poolchunk will be released.
17             assert chunk.usage() == 0;
18             return false;
19         }
20         return prevlist.move(chunk);
21     }

  第3行,释放内存,把内存返还给pck对象。

  4-7行,如pck的使用率小于当前pckl的minusage,调用remove方法把pck对象从当前pckl对象中删除,然后调用move0方法把它移动到前一个pckl节点。

  13-31行,移动pck到前一个pckl。

 

完整的内存分配释放流程

内存分配

  入口方法:

  io.netty.buffer.abstractbytebufallocator#heapbuffer(int, int),创建使用堆内存的bytebuf, 调用newheapbuffer方法。

  io.netty.buffer.abstractbytebufallocator#directbuffer(int, int), 创建使用直接内存的bytebuf,  调用newdirectbuffer方法。

  具体实现:

  io.netty.buffer.pooledbytebufallocator#newheapbuffer(int initialcapacity, int maxcapacity)。

  io.netty.buffer.pooledbytebufallocator#newdirectbuffer(int initialcapacity, int maxcapacity)。 

  这两个方法都是从poolthreadcache对象中得到线程专用的poolarena对象,然后调用poolarena的allocate方法创建poolbytebuf对象。

  poolarena入口方法:

  io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, int, int),这个方法是poolarena分配内存,创建poolbytebuf对象的入口方法。它先调用子类实现的newbytebuf创建一个poolbytebuf对象,这个方法有两个实现:

  io.netty.buffer.poolarena.heaparena#newbytebuf(int maxcapacity),创建使用堆内存的pooledbytebuf对象。

  io.netty.buffer.poolarena.directarena#newbytebuf(int maxcapacity),创建使用直接内存pooledbytebuf对象。

  然后调用io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, io.netty.buffer.pooledbytebuf<t>, int)方法为poolbytebuf对象分配内存,这个方法是分配内存的核心方法,下面来重点分析一下它的代码:

 1      private void allocate(poolthreadcache cache, pooledbytebuf<t> buf, final int reqcapacity) {
 2         final int normcapacity = normalizecapacity(reqcapacity);
 3         if (istinyorsmall(normcapacity)) { // capacity < pagesize
 4             int tableidx;
 5             poolsubpage<t>[] table;
 6             boolean tiny = istiny(normcapacity);
 7             if (tiny) { // < 512
 8                 if (cache.allocatetiny(this, buf, reqcapacity, normcapacity)) {
 9                     // was able to allocate out of the cache so move on
10                     return;
11                 }
12                 tableidx = tinyidx(normcapacity);
13                 table = tinysubpagepools;
14             } else {
15                 if (cache.allocatesmall(this, buf, reqcapacity, normcapacity)) {
16                     // was able to allocate out of the cache so move on
17                     return;
18                 }
19                 tableidx = smallidx(normcapacity);
20                 table = smallsubpagepools;
21             }
22 
23             final poolsubpage<t> head = table[tableidx];
24 
25             /**
26              * synchronize on the head. this is needed as {@link poolchunk#allocatesubpage(int)} and
27              * {@link poolchunk#free(long)} may modify the doubly linked list as well.
28              */
29             synchronized (head) {
30                 final poolsubpage<t> s = head.next;
31                 if (s != head) {
32                     assert s.donotdestroy && s.elemsize == normcapacity;
33                     long handle = s.allocate();
34                     assert handle >= 0;
35                     s.chunk.initbufwithsubpage(buf, handle, reqcapacity);
36                     inctinysmallallocation(tiny);
37                     return;
38                 }
39             }
40             synchronized (this) {
41                 allocatenormal(buf, reqcapacity, normcapacity);
42             }
43 
44             inctinysmallallocation(tiny);
45             return;
46         }
47         if (normcapacity <= chunksize) {
48             if (cache.allocatenormal(this, buf, reqcapacity, normcapacity)) {
49                 // was able to allocate out of the cache so move on
50                 return;
51             }
52             synchronized (this) {
53                 allocatenormal(buf, reqcapacity, normcapacity);
54                 ++allocationsnormal;
55             }
56         } else {
57             // huge allocations are never served via the cache so just call allocatehuge
58             allocatehuge(buf, reqcapacity);
59         }
60     }

  第2行,根据需要的内存大小reqcapacity,计算可以分配的标准内存大小normcapacity。必须满足(1)normcapacity>=reqcapacity, (2)normcapacity是directmemorycachealignment的整数倍,此外,还要根据reqcapacity的大小分3中情况:

    reqcapacity>=chunksize:normcapacity取同时满足(1),(2)的最小值。

    reqcapacity>=512且reqcapacity<chunksize: (3)normcapacity>=512*2k, (4)normcapacity<=chunksize,normcapacit取同时满足(1),(2),(3),(4)的最小值。

    reqcapacity<412: (5)normcapacity<512, (6)normcapacity是16的整数倍,normcapacity取同时满足(1),(2),(5),(6)的最小值。

  8-13行,分配tiny类型的内存(<512)。 8-10行,如果poolthreadcache缓存对象中分配到内存,分配内流程结束。12-13行,如果缓存中没有,就从tiny内存池中分配一块内存。

  15-20行,分配small类型的内存(>=512且<pagesize)。和分配tiny内存的逻辑相同。

  29-27行,  使用从前两个步骤中得到的tiny或small内存的索引,从子页面池中分配一块内存。33行,从子页面中分配内存。35行,使用分配到的内存初始化poolbytebuf对象,如果能到这里,分配内存流程结束。

  41行,如果子页面池中还没有内存可用,调用allocatenormal方法从poolchunk对象中分配一个子页面,再从子页面中分配所需的内存。

  47-55行,分配normal类型的内存(>=pagesize且<chunksize)。48,49行,从缓存中分配内存,如果成功,分配内存流程结束。53行,缓存中没有可用的内存,调用allocatenormal方法从poolchunk中分配内存。

  58行,如果分配的是>chunksize的内存。这块内存不会进入pckl链表中。

  

  上面代码中的allocatenormal方法封装了创建pck对象,从pck对象中分配内存,再把pck对象放入到pckl链表中的逻辑,也是十分重要的代码。

 1     private void allocatenormal(pooledbytebuf<t> buf, int reqcapacity, int normcapacity) {
 2         if (q050.allocate(buf, reqcapacity, normcapacity) || q025.allocate(buf, reqcapacity, normcapacity) ||
 3             q000.allocate(buf, reqcapacity, normcapacity) || qinit.allocate(buf, reqcapacity, normcapacity) ||
 4             q075.allocate(buf, reqcapacity, normcapacity)) {
 5             return;
 6         }
 7 
 8         // add a new chunk.
 9         poolchunk<t> c = newchunk(pagesize, maxorder, pageshifts, chunksize);
10         long handle = c.allocate(normcapacity);
11         assert handle > 0;
12         c.initbuf(buf, handle, reqcapacity);
13         qinit.add(c);
14     }

  2-5行,依次尝试从每个pckl节点中分配内存,如果成功,分配内存流程结束。

  9-13行,先创建一个新的pck对象,然后从中分配内存,使用内存初始化pooledbytebuf对象,最后把pck对象添加pckl链表头节点qinit中。pkcl对象的add方法会和allocate一样,根据pck对象的内存使用率,把它移动到链表中合适的位置。

  

内存释放

  io.netty.buffer.pooledbytebuf#deallocate方法调用io.netty.buffer.poolarena#free方法,这个free方法负责整个内存释放过程。

 1     void free(poolchunk<t> chunk, long handle, int normcapacity, poolthreadcache cache) {
 2         if (chunk.unpooled) {
 3             int size = chunk.chunksize();
 4             destroychunk(chunk);
 5             activebyteshuge.add(-size);
 6             deallocationshuge.increment();
 7         } else {
 8             sizeclass sizeclass = sizeclass(normcapacity);
 9             if (cache != null && cache.add(this, chunk, handle, normcapacity, sizeclass)) {
10                 // cached so not free it.
11                 return;
12             }
13 
14             freechunk(chunk, handle, sizeclass);
15         }
16     }

  这段代码重点在8-14行。第8,9行,优先把内存放到缓存中,这样下次就能快速地从缓存中直接取用。第14行,在不能放进缓存的情况下把内存返回给pck对象。

 1     void freechunk(poolchunk<t> chunk, long handle, sizeclass sizeclass) {
 2         final boolean destroychunk;
 3         synchronized (this) {
 4             switch (sizeclass) {
 5             case normal:
 6                 ++deallocationsnormal;
 7                 break;
 8             case small:
 9                 ++deallocationssmall;
10                 break;
11             case tiny:
12                 ++deallocationstiny;
13                 break;
14             default:
15                 throw new error();
16             }
17             destroychunk = !chunk.parent.free(chunk, handle);
18         }
19         if (destroychunk) {
20             // destroychunk not need to be called while holding the synchronized lock.
21             destroychunk(chunk);
22         }
23     }

  第17行,掉用pckl对象的free方法把内存还给pck对象,移动pck对象在pckl链表中位置。如果此时这个pck对象的使用率变成0,destroychunk=true。

  第21行,调用destroychunk方法销毁掉pck对象。

   

  

 

 

  

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网