当前位置: 移动技术网 > IT编程>开发语言>Java > Java同步容器和并发容器

Java同步容器和并发容器

2019年04月05日  | 移动技术网IT编程  | 我要评论

同步容器

在 java 中,同步容器主要包括 2 类:

  • vector、stack、hashtablecollections 类中提供的静态工厂方法创建的类(由 collections.synchronizedxxxx 等方法)
    • vector 实现了 list 接口,vector 实际上就是一个数组,和 arraylist 类似,但是 vector 中的方法都是 synchronized 方法,即进行了同步措施。
    • stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 vector 类。
    • hashtable 实现了 map 接口,它和 hashmap 很相似,但是 hashtable 进行了同步处理,而 hashmap 没有。

同步容器的缺陷

同步容器的同步原理就是在方法上用 synchronized 修饰。那么,这些方法每次只允许一个线程调用执行。

性能问题

由于被 synchronized 修饰的方法,每次只允许一个线程执行,其他试图访问这个方法的线程只能等待。显然,这种方式比没有使用 synchronized 的容器性能要差。

安全问题

同步容器真的一定安全吗?

答案是:未必。同步容器未必真的安全。在做复合操作时,仍然需要加锁来保护。

常见复合操作如下:

  • 迭代:反复访问元素,直到遍历完全部元素;
  • 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
  • 条件运算:例如若没有则添加等;
不安全的示例
public class test {
    static vector<integer> vector = new vector<integer>();
    public static void main(string[] args) throws interruptedexception {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            thread thread1 = new thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.remove(i);
                };
            };
            thread thread2 = new thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.get(i);
                };
            };
            thread1.start();
            thread2.start();
            while(thread.activecount()>10)   {

            }
        }
    }
}

 

执行时可能会出现数组越界错误。

vector 是线程安全的,为什么还会报这个错?很简单,对于 vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

for(int i=0;i<vector.size();i++)
    vector.get(i);

 

假若此时 vector 的 size 方法返回的是 10,i 的值为 9

然后另外一个线程执行了这句:

for(int i=0;i<vector.size();i++)
    vector.remove(i);

 

将下标为 9 的元素删除了。

那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。

安全示例

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

public class test {
    static vector<integer> vector = new vector<integer>();
    public static void main(string[] args) throws interruptedexception {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            thread thread1 = new thread(){
                public void run() {
                    synchronized (test.class) {   //进行额外的同步
                        for(int i=0;i<vector.size();i++)
                            vector.remove(i);
                    }
                };
            };
            thread thread2 = new thread(){
                public void run() {
                    synchronized (test.class) {
                        for(int i=0;i<vector.size();i++)
                            vector.get(i);
                    }
                };
            };
            thread1.start();
            thread2.start();
            while(thread.activecount()>10)   {

            }
        }
    }
}

 

concurrentmodificationexception 异常

在对 vector 等容器并发地进行迭代修改时,会报 concurrentmodificationexception 异常,关于这个异常将会在后续文章中讲述。

但是在并发容器中不会出现这个问题。

并发容器

jdk 的 java.util.concurrent 包(即 juc)中提供了几个非常有用的并发容器。

  • copyonwritearraylist - 线程安全的 arraylist
  • copyonwritearrayset - 线程安全的 set,它内部包含了一个 copyonwritearraylist,因此本质上是由 copyonwritearraylist 实现的。
  • concurrentskiplistset - 相当于线程安全的 treeset。它是有序的 set。它由 concurrentskiplistmap 实现。
  • concurrenthashmap - 线程安全的 hashmap。采用分段锁实现高效并发。
  • concurrentskiplistmap - 线程安全的有序 map。使用跳表实现高效并发。
  • concurrentlinkedqueue - 线程安全的无界队列。底层采用单链表。支持 fifo。
  • concurrentlinkeddeque - 线程安全的无界双端队列。底层采用双向链表。支持 fifo 和 filo。
  • arrayblockingqueue - 数组实现的阻塞队列。
  • linkedblockingqueue - 链表实现的阻塞队列。
  • linkedblockingdeque - 双向链表实现的双端阻塞队列。

concurrenthashmap

要点

  • 作用:concurrenthashmap 是线程安全的 hashmap。
  • 原理:jdk6 与 jdk7 中,concurrenthashmap 采用了分段锁机制。jdk8 中,摒弃了锁分段机制,改为利用 cas 算法。

源码

jdk7

concurrenthashmap 类在 jdk1.7 中的设计,其基本结构如图所示:

每一个 segment 都是一个 hashentry<k,v>[] table, table 中的每一个元素本质上都是一个 hashentry 的单向队列。比如 table[3]为首节点,table[3]->next 为节点 1,之后为节点 2,依次类推。

public class concurrenthashmap<k, v> extends abstractmap<k, v>
        implements concurrentmap<k, v>, serializable {

    // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对
    // 不属于同一个片段的节点可以并发操作,大大提高了性能
    final segment<k,v>[] segments;

    // 本质上segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了reentrantlock, 可以作为互拆锁使用
    static final class segment<k,v> extends reentrantlock implements serializable {
        transient volatile hashentry<k,v>[] table;
        transient int count;
    }

    // 基本节点,存储key, value值
    static final class hashentry<k,v> {
        final int hash;
        final k key;
        volatile v value;
        volatile hashentry<k,v> next;
    }
}

 

jdk8
  • jdk8 中主要做了 2 方面的改进
  • 取消 segments 字段,直接采用 transient volatile hashentry<k,v>[] table 保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
  • 将原先 table 数组+单向链表的数据结构,变更为 table 数组+单向链表+红黑树的结构。对于 hash 表来说,最核心的能力在于将 key hash 之后能均匀的分布在数组中。如果 hash 之后散列的很均匀,那么 table 数组中的每个队列长度主要为 0 或者 1。但实际情况并非总是如此理想,虽然 concurrenthashmap 类默认的加载因子为 0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为 o(n);因此,对于个数超过 8(默认值)的列表,jdk1.8 中采用了红黑树的结构,那么查询的时间复杂度可以降低到 o(logn),可以改进性能。
final v putval(k key, v value, boolean onlyifabsent) {
    if (key == null || value == null) throw new nullpointerexception();
    int hash = spread(key.hashcode());
    int bincount = 0;
    for (node<k,v>[] tab = table;;) {
        node<k,v> f; int n, i, fh;
        // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点node即可。注:tab[i]实质为链表或者红黑树的首节点。
        if (tab == null || (n = tab.length) == 0)
            tab = inittable();
        else if ((f = tabat(tab, i = (n - 1) & hash)) == null) {
            if (castabat(tab, i, null,
                         new node<k,v>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果tab[i]不为空并且hash值为moved,说明该链表正在进行transfer操作,返回扩容完成后的table。
        else if ((fh = f.hash) == moved)
            tab = helptransfer(tab, f);
        else {
            v oldval = null;
            // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
            synchronized (f) {
                if (tabat(tab, i) == f) {
                    if (fh >= 0) {
                        bincount = 1;
                        for (node<k,v> e = f;; ++bincount) {
                            k ek;
                            // 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldval = e.val;
                                if (!onlyifabsent)
                                    e.val = value;
                                break;
                            }
                            // 如果没有找到值为key的节点,直接新建node并加入链表即可。
                            node<k,v> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new node<k,v>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果首节点为treebin类型,说明为红黑树结构,执行puttreeval操作。
                    else if (f instanceof treebin) {
                        node<k,v> p;
                        bincount = 2;
                        if ((p = ((treebin<k,v>)f).puttreeval(hash, key,
                                                       value)) != null) {
                            oldval = p.val;
                            if (!onlyifabsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (bincount != 0) {
                // 如果节点数>=8,那么转换链表结构为红黑树结构。
                if (bincount >= treeify_threshold)
                    treeifybin(tab, i);
                if (oldval != null)
                    return oldval;
                break;
            }
        }
    }
    // 计数增加1,有可能触发transfer操作(扩容)。
    addcount(1l, bincount);
    return null;
}

 

示例
public class concurrenthashmapdemo {

    public static void main(string[] args) throws interruptedexception {

        // hashmap 在并发迭代访问时会抛出 concurrentmodificationexception 异常
        // map<integer, character> map = new hashmap<>();
        map<integer, character> map = new concurrenthashmap<>();

        thread wthread = new thread(() -> {
            system.out.println("写操作线程开始执行");
            for (int i = 0; i < 26; i++) {
                map.put(i, (char) ('a' + i));
            }
        });
        thread rthread = new thread(() -> {
            system.out.println("读操作线程开始执行");
            for (integer key : map.keyset()) {
                system.out.println(key + " - " + map.get(key));
            }
        });
        wthread.start();
        rthread.start();
        thread.sleep(1000);
    }
}

 

copyonwritearraylist

要点

  • 作用:copyonwrite 字面意思为写入时复制。copyonwritearraylist 是线程安全的 arraylist。
  • 原理:
    • 在 copyonwriteaarraylist 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(1,2,4)。
    • 所有的写操作都是同步的。他们在备份数组(3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(5)。
    • 写操作后创建的迭代器将能够看到修改的结构(6,7)。
    • 写时复制集合返回的迭代器不会抛出 concurrentmodificationexception,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。

源码

重要属性
  • lock - 执行写时复制操作,需要使用可重入锁加锁
  • array - 对象数组,用于存放元素
    /** the lock protecting all mutators */
    final transient reentrantlock lock = new reentrantlock();

    /** the array, accessed only via getarray/setarray. */
    private transient volatile object[] array;

 

重要方法
  • 添加操作
    • 添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。
  • public boolean add(e e) {
        //reentrantlock加锁,保证线程安全
        final reentrantlock lock = this.lock;
        lock.lock();
        try {
            object[] elements = getarray();
            int len = elements.length;
            //拷贝原容器,长度为原容器长度加一
            object[] newelements = arrays.copyof(elements, len + 1);
            //在新副本上执行添加操作
            newelements[len] = e;
            //将原容器引用指向新副本
            setarray(newelements);
            return true;
        } finally {
            //解锁
            lock.unlock();
        }
    }

     

    删除操作
    • 删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。
public e remove(int index) {
    //加锁
    final reentrantlock lock = this.lock;
    lock.lock();
    try {
        object[] elements = getarray();
        int len = elements.length;
        e oldvalue = get(elements, index);
        int nummoved = len - index - 1;
        if (nummoved == 0)
            //如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
            setarray(arrays.copyof(elements, len - 1));
        else {
            //否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
            object[] newelements = new object[len - 1];
            system.arraycopy(elements, 0, newelements, 0, index);
            system.arraycopy(elements, index + 1, newelements, index,
                              nummoved);
            setarray(newelements);
        }
        return oldvalue;
    } finally {
        //解锁
        lock.unlock();
    }
}

 

  • 读操作
    • copyonwritearraylist 的读操作是不用加锁的,性能很高。
public e get(int index) {
    return get(getarray(), index);
}
private e get(object[] a, int index) {
    return (e) a[index];
}

 

示例

public class copyonwritearraylistdemo {

    static class readtask implements runnable {

        list<string> list;

        readtask(list<string> list) {
            this.list = list;
        }

        public void run() {
            for (string str : list) {
                system.out.println(str);
            }
        }
    }

    static class writetask implements runnable {

        list<string> list;
        int index;

        writetask(list<string> list, int index) {
            this.list = list;
            this.index = index;
        }

        public void run() {
            list.remove(index);
            list.add(index, "write_" + index);
        }
    }

    public void run() {
        final int num = 10;
        // arraylist 在并发迭代访问时会抛出 concurrentmodificationexception 异常
        // list<string> list = new arraylist<>();
        copyonwritearraylist<string> list = new copyonwritearraylist<>();
        for (int i = 0; i < num; i++) {
            list.add("main_" + i);
        }
        executorservice executorservice = executors.newfixedthreadpool(num);
        for (int i = 0; i < num; i++) {
            executorservice.execute(new readtask(list));
            executorservice.execute(new writetask(list, i));
        }
        executorservice.shutdown();
    }

    public static void main(string[] args) {
        new copyonwritearraylistdemo().run();
    }
}

免费java资料需要自己领取,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网