温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Netty分布式ByteBuf使用的回收逻辑是什么

发布时间:2022-03-29 09:06:51 来源:亿速云 阅读:139 作者:iii 栏目:开发技术

这篇文章主要介绍“Netty分布式ByteBuf使用的回收逻辑是什么”,在日常操作中,相信很多人在Netty分布式ByteBuf使用的回收逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Netty分布式ByteBuf使用的回收逻辑是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

ByteBuf回收

之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑

PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:

@Override  public boolean release() {      return release0(1);  }

这里调用了release0, 跟进去

private boolean release0(int decrement) {     for (;;) {         int refCnt = this.refCnt;         if (refCnt < decrement) {             throw new IllegalReferenceCountException(refCnt, -decrement);         }         if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {              if (refCnt == decrement) {                  deallocate();                 return true;             }             return false;         }     } }

 if (refCnt == decrement) 中判断当前byteBuf是否没有被引用了, 如果没有被引用, 则通过deallocate()方法进行释放

因为我们是以PooledUnsafeDirectByteBuf为例, 所以这里会调用其父类PooledByteBuf的deallocate方法:

protected final void deallocate() {     if (handle >= 0) {         final long handle = this.handle;         this.handle = -1;         memory = null;         chunk.arena.free(chunk, handle, maxLength, cache);         recycle();     } }

this.handle = -1表示当前的ByteBuf不再指向任何一块内存

memory = null这里将memory也设置为null

chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放

recycle()是将对象放入的对象回收站, 循环利用

我们首先分析free方法

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {     //是否为unpooled     if (chunk.unpooled) {         int size = chunk.chunkSize();         destroyChunk(chunk);         activeBytesHuge.add(-size);         deallocationsHuge.increment();     } else {         //那种级别的Size         SizeClass sizeClass = sizeClass(normCapacity);         //加到缓存里         if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {             return;         }         //将缓存对象标记为未使用         freeChunk(chunk, handle, sizeClass);     } }

首先判断是不是unpooled, 我们这里是Pooled, 所以会走到else块中:

sizeClass(normCapacity)计算是哪种级别的size, 我们按照tiny级别进行分析

cache.add(this, chunk, handle, normCapacity, sizeClass)是将当前当前ByteBuf进行缓存

我们之前讲过, 再分配ByteBuf时首先在缓存上分配, 而这步, 就是将其缓存的过程, 跟进去:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {     //拿到MemoryRegionCache节点     MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);     if (cache == null) {         return false;     }     //将chunk, 和handle封装成实体加到queue里面     return cache.add(chunk, handle); }

首先根据根据类型拿到相关类型缓存节点, 这里会根据不同的内存规格去找不同的对象, 我们简单回顾一下, 每个缓存对象都包含一个queue, queue中每个节点是entry, 每一个entry中包含一个chunk和handle, 可以指向唯一的连续的内存

我们跟到cache中

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {     switch (sizeClass) {     case Normal:         return cacheForNormal(area, normCapacity);     case Small:         return cacheForSmall(area, normCapacity);     case Tiny:         return cacheForTiny(area, normCapacity);     default:         throw new Error();     } }

假设我们是tiny类型, 这里就会走到cacheForTiny(area, normCapacity)方法中, 跟进去:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {      int idx = PoolArena.tinyIdx(normCapacity);     if (area.isDirect()) {         return cache(tinySubPageDirectCaches, idx);     }     return cache(tinySubPageHeapCaches, idx); }

这个方法我们之前剖析过, 就是根据大小找到第几个缓存中的第几个缓存, 拿到下标之后, 通过cache去超相对应的缓存对象:  

private static <T>  MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {     if (cache == null || idx > cache.length - 1) {         return null;     }     return cache[idx]; }

我们这里看到, 是直接通过下标拿的缓存对象

回到add方法中

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {     //拿到MemoryRegionCache节点     MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);     if (cache == null) {         return false;     }     //将chunk, 和handle封装成实体加到queue里面     return cache.add(chunk, handle); }

这里的cache对象调用了一个add方法, 这个方法就是将chunk和handle封装成一个entry加到queue里面

我们跟到add方法中:

public final boolean add(PoolChunk<T> chunk, long handle) {     Entry<T> entry = newEntry(chunk, handle);      boolean queued = queue.offer(entry);     if (!queued) {         entry.recycle();     }     return queued; }

我们之前介绍过, 从在缓存中分配的时候从queue弹出一个entry, 会放到一个对象池里面, 而这里Entry<T> entry = newEntry(chunk, handle)就是从对象池里去取一个entry对象, 然后将chunk和handle进行赋值

然后通过queue.offer(entry)加到queue中

我们回到free方法中

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {     //是否为unpooled     if (chunk.unpooled) {         int size = chunk.chunkSize();         destroyChunk(chunk);         activeBytesHuge.add(-size);         deallocationsHuge.increment();     } else {         //那种级别的Size         SizeClass sizeClass = sizeClass(normCapacity);         //加到缓存里         if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {             return;         }          freeChunk(chunk, handle, sizeClass);     } }

这里加到缓存之后, 如果成功, 就会return, 如果不成功, 就会调用freeChunk(chunk, handle, sizeClass)方法, 这个方法的意义是, 将原先给ByteBuf分配的内存区段标记为未使用

跟进freeChunk简单分析下:

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {     final boolean destroyChunk;     synchronized (this) {         switch (sizeClass) {         case Normal:             ++deallocationsNormal;             break;         case Small:             ++deallocationsSmall;             break;         case Tiny:             ++deallocationsTiny;             break;         default:             throw new Error();         }         destroyChunk = !chunk.parent.free(chunk, handle);     }     if (destroyChunk) {         destroyChunk(chunk);     } }

我们再跟到free方法中:

boolean free(PoolChunk<T> chunk, long handle) {     chunk.free(handle);     if (chunk.usage() < minUsage) {         remove(chunk);         return move0(chunk);     }     return true; }

chunk.free(handle)的意思是通过chunk释放一段连续的内存

再跟到free方法中:

void free(long handle) {     int memoryMapIdx = memoryMapIdx(handle);     int bitmapIdx = bitmapIdx(handle);     if (bitmapIdx != 0) {          PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];         assert subpage != null && subpage.doNotDestroy;         PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);         synchronized (head) {             if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {                 return;             }         }     }     freeBytes += runLength(memoryMapIdx);     setValue(memoryMapIdx, depth(memoryMapIdx));     updateParentsFree(memoryMapIdx); }

 if (bitmapIdx != 0)这 里判断是当前缓冲区分配的级别是Page还是Subpage, 如果是Subpage, 则会找到相关的Subpage将其位图标记为0

如果不是subpage, 这里通过分配内存的反向标记, 将该内存标记为未使用

这段逻辑可以读者自行分析, 如果之前分配相关的知识掌握扎实的话, 这里的逻辑也不是很难

回到PooledByteBuf的deallocate方法中:

protected final void deallocate() {     if (handle >= 0) {         final long handle = this.handle;         this.handle = -1;         memory = null;         chunk.arena.free(chunk, handle, maxLength, cache);         recycle();     } }

到此,关于“Netty分布式ByteBuf使用的回收逻辑是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI