在 java 中,同步容器主要包括 2 类:
同步容器的同步原理就是在方法上用 synchronized
修饰。那么,这些方法每次只允许一个线程调用执行。
由于被 synchronized
修饰的方法,每次只允许一个线程执行,其他试图访问这个方法的线程只能等待。显然,这种方式比没有使用 synchronized
的容器性能要差。
同步容器真的一定安全吗?
答案是:未必。同步容器未必真的安全。在做复合操作时,仍然需要加锁来保护。
常见复合操作如下:
public class test { static vector<integer> vector = new vector<integer>(); public static void main(string[] args) throws interruptedexception { while(true) { for(int i=0;i<10;i++) vector.add(i); thread thread1 = new thread(){ public void run() { for(int i=0;i<vector.size();i++) vector.remove(i); }; }; thread thread2 = new thread(){ public void run() { for(int i=0;i<vector.size();i++) vector.get(i); }; }; thread1.start(); thread2.start(); while(thread.activecount()>10) { } } } }
执行时可能会出现数组越界错误。
vector 是线程安全的,为什么还会报这个错?很简单,对于 vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:
当某个线程在某个时刻执行这句时:
for(int i=0;i<vector.size();i++) vector.get(i);
假若此时 vector 的 size 方法返回的是 10,i 的值为 9
然后另外一个线程执行了这句:
for(int i=0;i<vector.size();i++) vector.remove(i);
将下标为 9 的元素删除了。
那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。
因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:
public class test { static vector<integer> vector = new vector<integer>(); public static void main(string[] args) throws interruptedexception { while(true) { for(int i=0;i<10;i++) vector.add(i); thread thread1 = new thread(){ public void run() { synchronized (test.class) { //进行额外的同步 for(int i=0;i<vector.size();i++) vector.remove(i); } }; }; thread thread2 = new thread(){ public void run() { synchronized (test.class) { for(int i=0;i<vector.size();i++) vector.get(i); } }; }; thread1.start(); thread2.start(); while(thread.activecount()>10) { } } } }
在对 vector 等容器并发地进行迭代修改时,会报 concurrentmodificationexception 异常,关于这个异常将会在后续文章中讲述。
但是在并发容器中不会出现这个问题。
jdk 的 java.util.concurrent
包(即 juc)中提供了几个非常有用的并发容器。
concurrenthashmap 类在 jdk1.7 中的设计,其基本结构如图所示:
每一个 segment 都是一个 hashentry<k,v>[] table, table 中的每一个元素本质上都是一个 hashentry 的单向队列。比如 table[3]为首节点,table[3]->next 为节点 1,之后为节点 2,依次类推。
public class concurrenthashmap<k, v> extends abstractmap<k, v> implements concurrentmap<k, v>, serializable { // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对 // 不属于同一个片段的节点可以并发操作,大大提高了性能 final segment<k,v>[] segments; // 本质上segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了reentrantlock, 可以作为互拆锁使用 static final class segment<k,v> extends reentrantlock implements serializable { transient volatile hashentry<k,v>[] table; transient int count; } // 基本节点,存储key, value值 static final class hashentry<k,v> { final int hash; final k key; volatile v value; volatile hashentry<k,v> next; } }
transient volatile hashentry<k,v>[] table
保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。final v putval(k key, v value, boolean onlyifabsent) { if (key == null || value == null) throw new nullpointerexception(); int hash = spread(key.hashcode()); int bincount = 0; for (node<k,v>[] tab = table;;) { node<k,v> f; int n, i, fh; // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点node即可。注:tab[i]实质为链表或者红黑树的首节点。 if (tab == null || (n = tab.length) == 0) tab = inittable(); else if ((f = tabat(tab, i = (n - 1) & hash)) == null) { if (castabat(tab, i, null, new node<k,v>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果tab[i]不为空并且hash值为moved,说明该链表正在进行transfer操作,返回扩容完成后的table。 else if ((fh = f.hash) == moved) tab = helptransfer(tab, f); else { v oldval = null; // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突 synchronized (f) { if (tabat(tab, i) == f) { if (fh >= 0) { bincount = 1; for (node<k,v> e = f;; ++bincount) { k ek; // 如果在链表中找到值为key的节点e,直接设置e.val = value即可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldval = e.val; if (!onlyifabsent) e.val = value; break; } // 如果没有找到值为key的节点,直接新建node并加入链表即可。 node<k,v> pred = e; if ((e = e.next) == null) { pred.next = new node<k,v>(hash, key, value, null); break; } } } // 如果首节点为treebin类型,说明为红黑树结构,执行puttreeval操作。 else if (f instanceof treebin) { node<k,v> p; bincount = 2; if ((p = ((treebin<k,v>)f).puttreeval(hash, key, value)) != null) { oldval = p.val; if (!onlyifabsent) p.val = value; } } } } if (bincount != 0) { // 如果节点数>=8,那么转换链表结构为红黑树结构。 if (bincount >= treeify_threshold) treeifybin(tab, i); if (oldval != null) return oldval; break; } } } // 计数增加1,有可能触发transfer操作(扩容)。 addcount(1l, bincount); return null; }
示例
public class concurrenthashmapdemo { public static void main(string[] args) throws interruptedexception { // hashmap 在并发迭代访问时会抛出 concurrentmodificationexception 异常 // map<integer, character> map = new hashmap<>(); map<integer, character> map = new concurrenthashmap<>(); thread wthread = new thread(() -> { system.out.println("写操作线程开始执行"); for (int i = 0; i < 26; i++) { map.put(i, (char) ('a' + i)); } }); thread rthread = new thread(() -> { system.out.println("读操作线程开始执行"); for (integer key : map.keyset()) { system.out.println(key + " - " + map.get(key)); } }); wthread.start(); rthread.start(); thread.sleep(1000); } }
/** the lock protecting all mutators */ final transient reentrantlock lock = new reentrantlock(); /** the array, accessed only via getarray/setarray. */ private transient volatile object[] array;
public boolean add(e e) { //reentrantlock加锁,保证线程安全 final reentrantlock lock = this.lock; lock.lock(); try { object[] elements = getarray(); int len = elements.length; //拷贝原容器,长度为原容器长度加一 object[] newelements = arrays.copyof(elements, len + 1); //在新副本上执行添加操作 newelements[len] = e; //将原容器引用指向新副本 setarray(newelements); return true; } finally { //解锁 lock.unlock(); } }
删除操作
public e remove(int index) { //加锁 final reentrantlock lock = this.lock; lock.lock(); try { object[] elements = getarray(); int len = elements.length; e oldvalue = get(elements, index); int nummoved = len - index - 1; if (nummoved == 0) //如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用 setarray(arrays.copyof(elements, len - 1)); else { //否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用 object[] newelements = new object[len - 1]; system.arraycopy(elements, 0, newelements, 0, index); system.arraycopy(elements, index + 1, newelements, index, nummoved); setarray(newelements); } return oldvalue; } finally { //解锁 lock.unlock(); } }
public e get(int index) { return get(getarray(), index); } private e get(object[] a, int index) { return (e) a[index]; }
public class copyonwritearraylistdemo { static class readtask implements runnable { list<string> list; readtask(list<string> list) { this.list = list; } public void run() { for (string str : list) { system.out.println(str); } } } static class writetask implements runnable { list<string> list; int index; writetask(list<string> list, int index) { this.list = list; this.index = index; } public void run() { list.remove(index); list.add(index, "write_" + index); } } public void run() { final int num = 10; // arraylist 在并发迭代访问时会抛出 concurrentmodificationexception 异常 // list<string> list = new arraylist<>(); copyonwritearraylist<string> list = new copyonwritearraylist<>(); for (int i = 0; i < num; i++) { list.add("main_" + i); } executorservice executorservice = executors.newfixedthreadpool(num); for (int i = 0; i < num; i++) { executorservice.execute(new readtask(list)); executorservice.execute(new writetask(list, i)); } executorservice.shutdown(); } public static void main(string[] args) { new copyonwritearraylistdemo().run(); } }
免费java资料需要自己领取,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q
如对本文有疑问, 点击进行留言回复!!
springboot利用profile配置文件进行多环境切换
如何使用MyBatis-Plus代码生成器(逆向工程)一键生成代码
网友评论