当前位置: 移动技术网 > IT编程>开发语言>Java > Netty中FastThreadLocal源码分析

Netty中FastThreadLocal源码分析

2019年06月02日  | 移动技术网IT编程  | 我要评论

慢摇舞曲下载,慷慨解囊的意思,一千新闻网

netty中使用fastthreadlocal替代jdk中的threadlocal【java】threadlocal源码分析,其用法和threadlocal 一样,只不过从名字fastthreadlocal来看,其处理效率要比jdk中的threadlocal要高

在类加载的时候,先初始化了一个静态成员:

1 private static final int variablestoremoveindex = internalthreadlocalmap.nextvariableindex();

实际上fastthreadlocal的操作都是通过对internalthreadlocalmap的操作来实现的,

而internalthreadlocalmap是unpaddedinternalthreadlocalmap的子类,unpaddedinternalthreadlocalmap的定义比较简单:

 1 class unpaddedinternalthreadlocalmap {
 2     static final threadlocal<internalthreadlocalmap> slowthreadlocalmap = new threadlocal();
 3     static final atomicinteger nextindex = new atomicinteger();
 4     object[] indexedvariables;
 5     int futurelistenerstackdepth;
 6     int localchannelreaderstackdepth;
 7     map<class<?>, boolean> handlersharablecache;
 8     integerholder counterhashcode;
 9     threadlocalrandom random;
10     map<class<?>, typeparametermatcher> typeparametermatchergetcache;
11     map<class<?>, map<string, typeparametermatcher>> typeparametermatcherfindcache;
12     stringbuilder stringbuilder;
13     map<charset, charsetencoder> charsetencodercache;
14     map<charset, charsetdecoder> charsetdecodercache;
15     arraylist<object> arraylist;
16 
17     unpaddedinternalthreadlocalmap(object[] indexedvariables) {
18         this.indexedvariables = indexedvariables;
19     }
20 }

可以看到在类加载时,会初始化一个泛型为internalthreadlocalmap的jdk的threadlocal对象作为其静态成员slowthreadlocalmap ,还有一个原子化的integer静态成员nextindex

internalthreadlocalmap的定义如下:

1 public final class internalthreadlocalmap extends unpaddedinternalthreadlocalmap {
2     private static final internallogger logger = internalloggerfactory.getinstance(internalthreadlocalmap.class);
3     private static final int default_array_list_initial_capacity = 8;
4     private static final int string_builder_initial_size = systempropertyutil.getint("io.netty.threadlocalmap.stringbuilder.initialsize", 1024);
5     private static final int string_builder_max_size;
6     public static final object unset = new object();
7     private bitset cleanerflags;

internalthreadlocalmap的nextvariableindex方法:

1 public static int nextvariableindex() {
2     int index = nextindex.getandincrement();
3     if (index < 0) {
4         nextindex.decrementandget();
5         throw new illegalstateexception("too many thread-local indexed variables");
6     } else {
7         return index;
8     }
9 }

这是一个cas滞后自增操作,获取nextindex自增前的值,那么variablestoremoveindex初始化时就是0,且恒为0,nextindex此时变成了1

fastthreadlocal对象的初始化:

1 private final int index = internalthreadlocalmap.nextvariableindex();
2 
3 public fastthreadlocal() {
4 }

由上面可知,index成员恒等于nextvariableindex的返回值,nextindex 的cas操作保障了每个fastthreadlocal对象的index是不同的

首先看到set方法:

 1 public final void set(v value) {
 2     if (value != internalthreadlocalmap.unset) {
 3         internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get();
 4         if (this.setknownnotunset(threadlocalmap, value)) {
 5             this.registercleaner(threadlocalmap);
 6         }
 7     } else {
 8         this.remove();
 9     }
10 
11 }

只要set的value不是internalthreadlocalmap.unset,会先调用internalthreadlocalmap的get方法:

1 public static internalthreadlocalmap get() {
2     thread thread = thread.currentthread();
3     return thread instanceof fastthreadlocalthread ? fastget((fastthreadlocalthread)thread) : slowget();
4 }

判断当前线程是否是fastthreadlocalthread,是则调用fastget,否则调用slowget
fastthreadlocalthread是经过包装后的thread:

 1 public class fastthreadlocalthread extends thread {
 2     private final boolean cleanupfastthreadlocals;
 3     private internalthreadlocalmap threadlocalmap;
 4 
 5     public fastthreadlocalthread() {
 6         this.cleanupfastthreadlocals = false;
 7     }
 8 
 9     public fastthreadlocalthread(runnable target) {
10         super(fastthreadlocalrunnable.wrap(target));
11         this.cleanupfastthreadlocals = true;
12     }
13 
14     public fastthreadlocalthread(threadgroup group, runnable target) {
15         super(group, fastthreadlocalrunnable.wrap(target));
16         this.cleanupfastthreadlocals = true;
17     }
18 
19     public fastthreadlocalthread(string name) {
20         super(name);
21         this.cleanupfastthreadlocals = false;
22     }
23 
24     public fastthreadlocalthread(threadgroup group, string name) {
25         super(group, name);
26         this.cleanupfastthreadlocals = false;
27     }
28 
29     public fastthreadlocalthread(runnable target, string name) {
30         super(fastthreadlocalrunnable.wrap(target), name);
31         this.cleanupfastthreadlocals = true;
32     }
33 
34     public fastthreadlocalthread(threadgroup group, runnable target, string name) {
35         super(group, fastthreadlocalrunnable.wrap(target), name);
36         this.cleanupfastthreadlocals = true;
37     }
38 
39     public fastthreadlocalthread(threadgroup group, runnable target, string name, long stacksize) {
40         super(group, fastthreadlocalrunnable.wrap(target), name, stacksize);
41         this.cleanupfastthreadlocals = true;
42     }
43 
44     public final internalthreadlocalmap threadlocalmap() {
45         return this.threadlocalmap;
46     }
47 
48     public final void setthreadlocalmap(internalthreadlocalmap threadlocalmap) {
49         this.threadlocalmap = threadlocalmap;
50     }
51 
52     public boolean willcleanupfastthreadlocals() {
53         return this.cleanupfastthreadlocals;
54     }
55 
56     public static boolean willcleanupfastthreadlocals(thread thread) {
57         return thread instanceof fastthreadlocalthread && ((fastthreadlocalthread)thread).willcleanupfastthreadlocals();
58     }
59 }

如果看过我之前写的threadlocal源码分析,看到这就明白,jdk的threadlocal中很重要的一点是在thread类中有一个threadlocalmap类型的成员,每个线程都维护这一张threadlocalmap,通过threadlocalmap来和threadlocal对象产生映射关系;而这里和jdk同理绑定的就是internalthreadlocalmap。

fastget方法:

1 private static internalthreadlocalmap fastget(fastthreadlocalthread thread) {
2    internalthreadlocalmap threadlocalmap = thread.threadlocalmap();
3     if (threadlocalmap == null) {
4         thread.setthreadlocalmap(threadlocalmap = new internalthreadlocalmap());
5     }
6 
7     return threadlocalmap;
8 }

这里也和jdk的threadlocal类似,判断fastthreadlocalthread 线程的threadlocalmap成员是否为null,若是null,则先创建一个internalthreadlocalmap实例:

1 private internalthreadlocalmap() {
2     super(newindexedvariabletable());
3 }

先调用newindexedvariabletable方法:

1 private static object[] newindexedvariabletable() {
2     object[] array = new object[32];
3     arrays.fill(array, unset);
4     return array;
5 }

创建了一个大小为32的数组,并且用unset这个object填充了整个数组,然后调用unpaddedinternalthreadlocalmap的构造,令indexedvariables成员保存该数组

再来看slowget方法:

 1 private static internalthreadlocalmap slowget() {
 2     threadlocal<internalthreadlocalmap> slowthreadlocalmap = unpaddedinternalthreadlocalmap.slowthreadlocalmap;
 3     internalthreadlocalmap ret = (internalthreadlocalmap)slowthreadlocalmap.get();
 4     if (ret == null) {
 5         ret = new internalthreadlocalmap();
 6         slowthreadlocalmap.set(ret);
 7     }
 8 
 9     return ret;
10 }

可以看到,其实这里为了提高效率,并没有直接使用jdk的threadlocal,而是给当前非fastthreadlocalthread线程绑定了一个threadlocal<internalthreadlocalmap>对象,避免直接使用jdk的threadlocal效率低。

回到fastthreadlocal的set方法,在取得到了当前线程的internalthreadlocalmap成员后,调用setknownnotunset方法:

1 private boolean setknownnotunset(internalthreadlocalmap threadlocalmap, v value) {
2     if (threadlocalmap.setindexedvariable(this.index, value)) {
3         addtovariablestoremove(threadlocalmap, this);
4         return true;
5     } else {
6         return false;
7     }
8 }

首先调用了internalthreadlocalmap的setindexedvariable方法:

 1 public boolean setindexedvariable(int index, object value) {
 2     object[] lookup = this.indexedvariables;
 3     if (index < lookup.length) {
 4         object oldvalue = lookup[index];
 5         lookup[index] = value;
 6         return oldvalue == unset;
 7     } else {
 8         this.expandindexedvariabletableandset(index, value);
 9         return true;
10     }
11 }

因为index是不可更改的常量,所以这里有两种情况:
当indexedvariables这个object数组的长度大于index时,直接将value放在indexedvariables数组下标为index的位置,返回oldvalue是否等于unset,若是不等于unset,说明已经set过了,直进行替换,若是等于unset,还要进行后续的registercleaner
当indexedvariables这个object数组的长度小于等于index时,调用expandindexedvariabletableandset方法扩容

expandindexedvariabletableandset方法:

 1 private void expandindexedvariabletableandset(int index, object value) {
 2     object[] oldarray = this.indexedvariables;
 3     int oldcapacity = oldarray.length;
 4     int newcapacity = index | index >>> 1;
 5     newcapacity |= newcapacity >>> 2;
 6     newcapacity |= newcapacity >>> 4;
 7     newcapacity |= newcapacity >>> 8;
 8     newcapacity |= newcapacity >>> 16;
 9     ++newcapacity;
10     object[] newarray = arrays.copyof(oldarray, newcapacity);
11     arrays.fill(newarray, oldcapacity, newarray.length, unset);
12     newarray[index] = value;
13     this.indexedvariables = newarray;
14 }

如果读过hashmap源码的话对上述的位运算操作因该不陌生,这个位运算产生的newcapacity的值是大于oldcapacity的最小的二的整数幂(【java】hashmap中的tablesizefor方法

然后申请一个newcapacity大小的数组,将原数组的内容拷贝到新数组,并且用unset填充剩余部分,还是将value放在下标为index的位置,用indexedvariables保存新数组。

setindexedvariable成立后,setknownnotunset继续调用addtovariablestoremove方法:

 1 private static void addtovariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) {
 2     object v = threadlocalmap.indexedvariable(variablestoremoveindex);
 3     set variablestoremove;
 4     if (v != internalthreadlocalmap.unset && v != null) {
 5         variablestoremove = (set)v;
 6     } else {
 7         variablestoremove = collections.newsetfrommap(new identityhashmap());
 8         threadlocalmap.setindexedvariable(variablestoremoveindex, variablestoremove);
 9     }
10 
11     variablestoremove.add(variable);
12 }

上面说过variablestoremoveindex恒为0,调用internalthreadlocalmap的indexedvariable方法:

1 public object indexedvariable(int index) {
2     object[] lookup = this.indexedvariables;
3     return index < lookup.length ? lookup[index] : unset;
4 }

由于variablestoremoveindex恒等于0,所以这里判断indexedvariables这个object数组是否为空,若是为空,则返回第0个元素,若不是则返回unset

在addtovariablestoremove中,接着对indexedvariables的返回值进行了判断,
判断不是unset,并且不等于null,则说明是set过的,然后将刚才的返回值强转为set类型
若上述条件不成立,创建一个identityhashmap,将其包装成set赋值给variablestoremove,然后调用internalthreadlocalmap的setindexedvariable方法,这里就和上面不一样了,上面是将value放在下标为index的位置,而这里是将set放在下标为0的位置。

看到这,再结合上面来看,其实已经有一个大致的想法了,一开始在set时,是将value放在internalthreadlocalmap的object数组下标为index的位置,然后在这里获取下标为0的set,说明value是暂时放在下标为index的位置,然后判断下标为0的位置有没有set,若是有,取出这个set ,将当前fastthreadlocal对象放入set中,则说明这个set中存放的是fastthreadlocal集合
那么就有如下关系:

回到fastthreadlocal的set方法,在setknownnotunset成立后,调用registercleaner方法:

1 private void registercleaner(internalthreadlocalmap threadlocalmap) {
2     thread current = thread.currentthread();
3     if (!fastthreadlocalthread.willcleanupfastthreadlocals(current) && !threadlocalmap.iscleanerflagset(this.index)) {
4         threadlocalmap.setcleanerflag(this.index);
5     }
6 }

willcleanupfastthreadlocals的返回值在前面fastthreadlocalthread的初始化时就确定了,看到iscleanerflagset方法:

1 public boolean iscleanerflagset(int index) {
2     return this.cleanerflags != null && this.cleanerflags.get(index);
3 }

cleanerflags 是一个bitset对象,在internalthreadlocalmap初始化时是null,
若不是第一次的set操作,则根据index,获取index在bitset对应位的值

这里使用bitset,使其持有的位和indexedvariables这个object数组形成了一一对应关系,每一位都是0和1代表当前indexedvariables的对应下标位置的使用情况,0表示没有使用对应unset,1则代表有value

在上面条件成立的情况下,调用setcleanerflag方法:

1 public void setcleanerflag(int index) {
2     if (this.cleanerflags == null) {
3         this.cleanerflags = new bitset();
4     }
5 
6     this.cleanerflags.set(index);
7 }

逻辑比较简单,判断cleanerflags是否初始化,若没有,则立即初始化,再将cleanerflags中对应index位的值设为1;

这里通过registercleaner直接标记了所有set了value的下标可,为以后的removeall 清除提高效率。

下来看fastthreadlocal的get方法:

 1 public final v get() {
 2     internalthreadlocalmap threadlocalmap = internalthreadlocalmap.get();
 3     object v = threadlocalmap.indexedvariable(this.index);
 4     if (v != internalthreadlocalmap.unset) {
 5         return v;
 6     } else {
 7         v value = this.initialize(threadlocalmap);
 8         this.registercleaner(threadlocalmap);
 9         return value;
10     }
11 }

和上面一样,先取得当前线程持有的internalthreadlocalmap ,调用indexedvariable方法,根据当前fastthreadlocal的index定位,判断是否是unset(set过),若没有set过则和jdk一样调用initialize先set:

 1 private v initialize(internalthreadlocalmap threadlocalmap) {
 2     object v = null;
 3 
 4     try {
 5         v = this.initialvalue();
 6     } catch (exception var4) {
 7         platformdependent.throwexception(var4);
 8     }
 9 
10     threadlocalmap.setindexedvariable(this.index, v);
11     addtovariablestoremove(threadlocalmap, this);
12     return v;
13 }

initialvalue()方法就是对外提供的,需要手动覆盖:

1 protected v initialvalue() throws exception {
2     return null;
3 }

后面的操作就和set的逻辑一样。

 

remove方法:

1 public final void remove() {
2     this.remove(internalthreadlocalmap.getifset());
3 }

getifset方法:

1 public static internalthreadlocalmap getifset() {
2     thread thread = thread.currentthread();
3     return thread instanceof fastthreadlocalthread ? ((fastthreadlocalthread)thread).threadlocalmap() : (internalthreadlocalmap)slowthreadlocalmap.get();
4 }

和上面的get方法思路相似,只不过在这里如果获取不到不会创建
然后调用remove重载:

 1 public final void remove(internalthreadlocalmap threadlocalmap) {
 2     if (threadlocalmap != null) {
 3         object v = threadlocalmap.removeindexedvariable(this.index);
 4         removefromvariablestoremove(threadlocalmap, this);
 5         if (v != internalthreadlocalmap.unset) {
 6             try {
 7                 this.onremoval(v);
 8             } catch (exception var4) {
 9                 platformdependent.throwexception(var4);
10             }
11         }
12 
13     }
14 }

先检查threadlocalmap是否存在,若存在才进行后续操作:
调用removeindexedvariable方法:

 1 public object removeindexedvariable(int index) {
 2     object[] lookup = this.indexedvariables;
 3     if (index < lookup.length) {
 4         object v = lookup[index];
 5         lookup[index] = unset;
 6         return v;
 7     } else {
 8         return unset;
 9     }
10 }

和之前的setindexedvariable逻辑相似,只不过现在是把index位置的元素设置为unset

接着调用removefromvariablestoremove方法:

1 private static void removefromvariablestoremove(internalthreadlocalmap threadlocalmap, fastthreadlocal<?> variable) {
2     object v = threadlocalmap.indexedvariable(variablestoremoveindex);
3     if (v != internalthreadlocalmap.unset && v != null) {
4         set<fastthreadlocal<?>> variablestoremove = (set)v;
5         variablestoremove.remove(variable);
6     }
7 }

之前说过variablestoremoveindex恒为0,在object数组中下标为0存储的set<fastthreadlocal<?>>集合(不为unset情况下),从集合中,将当前fastthreadlocal移除掉
最后调用了onremoval方法,该方法需要由用户去覆盖:

1 protected void onremoval(v value) throws exception {
2 }


removeall方法,是一个静态方法:

 1 public static void removeall() {
 2     internalthreadlocalmap threadlocalmap = internalthreadlocalmap.getifset();
 3     if (threadlocalmap != null) {
 4         try {
 5             object v = threadlocalmap.indexedvariable(variablestoremoveindex);
 6             if (v != null && v != internalthreadlocalmap.unset) {
 7                 set<fastthreadlocal<?>> variablestoremove = (set)v;
 8                 fastthreadlocal<?>[] variablestoremovearray = (fastthreadlocal[])variablestoremove.toarray(new fastthreadlocal[0]);
 9                 fastthreadlocal[] var4 = variablestoremovearray;
10                 int var5 = variablestoremovearray.length;
11 
12                 for(int var6 = 0; var6 < var5; ++var6) {
13                     fastthreadlocal<?> tlv = var4[var6];
14                     tlv.remove(threadlocalmap);
15                 }
16             }
17         } finally {
18             internalthreadlocalmap.remove();
19         }
20 
21     }
22 }

首先获取当前线程的internalthreadlocalmap,若是存在继续后续操作:
通过indexedvariable方法,取出object数组中下标为0的set集合(如果不是unset情况下),将其转换为fastthreadlocal数组,遍历这个数组调用上面的remove方法。

fastthreadlocal源码分析到此结束。

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网