并发容器

最后更新于:2022-08-13 12:56:34

古老和过时的同步容器

Vector和Hashtable

性能不够好,对复合操作支持不够好,并发线程修改容器中的内容的时候,可能会抛出异常。

Vector演示

public class VectorDemo {
    public static void main(String[] args) {
        Vector vector = new Vector<>();
        vector.add("test");
        System.out.println(vector.get(0));
    }
}

结果

test

点击进入get源码

方法被synchronized修饰,也就意味着在这个类中的其他方法上如果也有synchronized修饰,那么他们就不能被多个线程同时执行的。然后我们随便找几个其他方法

可以看到有大部分的方法都被synchronized修饰,因为synchronized的性质,导致在并发量大的时候,没有办法做到性能特别好

Hashtable演示

public class HashtableDemo {
    public static void main(String[] args) {
        Hashtable hashtable = new Hashtable<>();
        hashtable.put("key", "hello word");
        System.out.println(hashtable.get("key"));
    }
}

结果

hello word

点击进入get源码

同样是被synchronized修饰,再看看其他方法

和其他方法一样,都是大多被synchronized修饰,那么结果同Vector的性质是一样的。

ArrayList和HashMap

线程不够安全,但是可以用
Colleations.synchronizedList(new ArrayList<>())和
Colleations.synchronizedMap(new HashMap<K,V>())使之变成线程安全

演示

public static void main(String[] args) {
      List list = Collections.synchronizedList(new ArrayList());
      list.add(5);
      System.out.println(list.get(0));
    }

结果

5

进入Collections.synchronizedList方法,查看它是如何让不安全的ArrayList变得安全的。

方法中提示如果可以转成RandomAccess,是的话,就会返回SynchronizedRandomAccessList,而Arraylist,是实现了RandomAccess的

RandomAccess随机访问,即支持跳着访问

进入SynchronizedRandomAccessLis,查看这个类做了什么事情

进入以后,发现这个类继承了SynchronizedList,继续查看SynchronizedList

查看后发现,里面的各个方法都是使用了synchronized,而且同步的都是同一个对象mutex,如此就保证了线程安全。但是,虽然同步的不是方法,但是性能也没有什么提高,所以使用Collections.synchronizedList,并没有比Vector和Hashtable高明。

比较不错的容器

ConcurrentHashMap和CopyOnWriteArrayList

绝大部分并发情况下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好。
如果一个list会被频繁的修改删除,那么用Collections.synchronizedList包装出来的ArrayList的性能会比CopyOnWriteArrayList好的多,因为CopyOnWriteArrayList适合读多写少的情况,每次写入都会完整复制整个列表。所以比较消耗资源。而ConcurrentHashMap是没有例外,任何场景下都优于Hashtable和HashMap。

ConcurrentHashMap

首先查看Map的主要实现

  1. HashMap

会根据键的hashcode存储,大多数时候可以算出hashcode值,所以可以直接定位到所有的值,所以访问速度是非常快的,允许键为null进行写入,对值不做限制,线程不安全,如果同时有多个线程进行操作,可能会造成数据不一致的情况出现,如果需要线程安全,可以使用Colleations.synchronizedMap进行包装

  1. Hashtable

历史遗留类,多数功能与HashMap相同,但属于线程安全的类,任何时候都只能有一个线程对他进行操作,所以性能不好

  1. linkedHashMap

HashMap的一个子类,保存了记录的插入顺序,遍历时的顺序和插入时的顺序是一样的

  1. TreeMap

实现了Sorted接口,所以拥有了根据键值进行排序的功能,默认时升序,可以自定义排序。所以遍历后的结果也是排过序的。

UML

可以看到Hashtable和HashMap直接就是Map的实现,而linkedHashMap继承了HashMap,并对其进行了升级。而TreeMap则是实现了SortMap,所以具有排序的功能。这些map都要求我们的键值是不可变对象,不可变即这个对象在创建后它的哈希值不会改变。因为这些map是通过哈希值去定位,一旦哈希值更改,也就无法定位正确的位置。

HashMap线程不安全的原因

  1. 同时put碰撞导致数据丢失

多个线程同时put相同的key值。最终只能保存一个数据,导致数据丢失

  1. 同时put扩容导致数据丢失

多个线程同时put,同时发现需要扩容,那么扩容之后,只有一个数组保存下来,导致数据丢失

  1. 死循环造成的CPU100%

https://coolshell.cn/articles/9606.html

HashMap关于并发的特点

  1. 非线程安全
  2. 迭代时不允许修改内容
  3. 只读的并发是安全的
  4. 如果一定要把HashMap用在并发环境,用Colleations.synchronizedMap(new HashMap())

JDK1.8的ConcurrentHashMap实现和分析

put
 /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 判断key或者value是否为null,为null则抛错
        if (key == null || value == null) throw new NullPointerException();
        // 计算出key的hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        //在此for循环中完成添加操作
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            // 判断table是否完成初始化
            if (tab == null || (n = tab.length) == 0)
                // 如果tab为null,或者长度为0则进行初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 如果已经完成初始化,且当前位置为空,则直接通过cas将当前值存储
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // MOVED特殊节点,代表当前节点正在扩容
            else if ((fh = f.hash) == MOVED)
                //进行帮助扩容或者转移的工作
                tab = helpTransfer(tab, f);
            else {
                // 当前槽点有值则进行以下操作
                V oldVal = null;
                // 添加锁,保证并发安全
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                                K ek;
                                // 根据hash值进行对应的操作
                                // 判断存在不存在当前的这个key
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    //找到对应的位置以后,将原来的值赋值给oldVal以供返回
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                // 运算到这里,说明当前key是一个新的值
                                Node pred = e;
                                if ((e = e.next) == null) {
                                    // 创建新的节点,并放到链表的最后
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 运算到这里,说明当前是一个红黑树
                            Node p;
                            binCount = 2;
                            // 把值放入红黑树中,并返回oldVal
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 运算到这里,说明已经完成添加工作
                if (binCount != 0) {
                    // 判断是否要将链表转成树,TREEIFY_THRESHOLD为8
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
get
    public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        // 获取key的hash值
        int h = spread(key.hashCode());
        //判断当前的table不等于null,且长度大于0,代表已经初始化完成
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //判断槽点的hash值是否符合
            if ((eh = e.hash) == h) {
                //如果hash值符合,且key符合则直接返回val
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                // 如果值为负数,则表示为一个红黑树或者转义节点,使用对应的方法去获得值
                return (p = e.find(h, key)) != null ? p.val : null;
            //如果既不是红黑树也不是转义节点,则代表是一个链表,则使用链表循环去获得结果并返回
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

ConcurrentHashMap的错误使用

ConcurrentHashMap是线程安全的,但是如果使用错误,也会导致变成线程不安全的

/**
 * 组合操作并不保证线程安全
 */
public class OptionsNotSafe implements Runnable {

    private static ConcurrentHashMap scores = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        scores.put("小明", 0);
        Thread thread1 = new Thread(new OptionsNotSafe());
        Thread thread2 = new Thread(new OptionsNotSafe());
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(scores.get("小明"));
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            Integer score = scores.get("小明");
            Integer newScore = score + 1;
            scores.put("小明", newScore);
        }
    }
}

结果

1157

ConcurrentHashMap可以保证你同时get和put的时候线程是安全的,但是不能保证get以后做别的操作的时候的线程安全,能保证是你多个线程同时put的时候,内部的数据不会错乱,相对于HashMap是不同的。

这样的结果明显不是我们想要的,因为用了ConcurrentHashMap,不能保证我们的线程安全,那就没有必要用了啊。所以ConcurrentHashMap提供了一个保证线程安全的方法replace,代码做以下修改

public class OptionsNotSafe implements Runnable {

    private static ConcurrentHashMap scores = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        scores.put("小明", 0);
        Thread thread1 = new Thread(new OptionsNotSafe());
        Thread thread2 = new Thread(new OptionsNotSafe());
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(scores.get("小明"));
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            while (true) {
                Integer score = scores.get("小明");
                Integer newScore = score + 1;
                if (scores.replace("小明", score, newScore)) {
                    break;
                }
            }
        }
    }
}

结果

2000

replace 传递的参数是key,当前值,和新的值,只有当当前值与map中保存的当前值一致时才会进行put操作,这样就保证了每个线程都会进行一次想要的put操作

CopyOnWriteArrayList

诞生原因

  1. 代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
  2. Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑
  3. Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来替代同步Set
适用场景
  1. 读操作可以尽可能的块,而写即使慢一些也没有太大关系
  2. 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作
读写规则
  1. 读写锁:读读共享,其他都互斥(写写互斥、读写互斥、写读互斥)
  2. CopyOnWriteArrayList读写锁规则升级:读取是完全不用加锁,并且更厉害的是,写入也不会阻塞读取操作,只有写入和写入之间需要同步等待

普通的list在循环过程中,是不允许做修改的,否则会抛异常,如以下代码

    public static void main(String[] args) {
        ArrayList list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator iterator = list.iterator();
        while (iterator.hasNext()) {
            System.out.println("list is" + list);
            String next = iterator.next();
            System.out.println(next);
            if (next.equals("2")) {
                list.remove("5");
            }

            if (next.equals("3")) {
                list.add("3 found");
            }
        }
    }

结果

list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4, 5, 5]
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
    at java.util.ArrayList$Itr.next(ArrayList.java:859)

修改使用

   public static void main(String[] args) {
//        ArrayList list = new ArrayList<>();
        CopyOnWriteArrayList list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator iterator = list.iterator();
        while (iterator.hasNext()) {
            System.out.println("list is" + list);
            String next = iterator.next();
            System.out.println(next);
            if (next.equals("2")) {
                list.remove("5");
            }

            if (next.equals("3")) {
                list.add("3 found");
            }
        }
    }
}

结果

list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
3
list is[1, 2, 3, 4, 3 found]
4
list is[1, 2, 3, 4, 3 found]
5

根据结果可以得出:

  1. 允许在迭代过程中进行修改
  2. 最后list修改为了3 found 迭代器打印出了5,对修改结果无感知。这也就是CopyOnWriteArrayList的特性,修改与读取不共享,各论各的。
原理

CopyOnWrite:当内存里空间进行修改的时候,不直接进行修改,而是开辟一个新的空间,将当前空间的数据复制过去,然后修改当前新内存的数据,修改完毕以后再将原来空间的指针指向当前修改后的空间,原来的内存因为没有被引用,就会被垃圾回收机制给回收掉。这样在读的时候就不受限制了,因为你修改的时候,是完全新的空间,而读则是继续读旧的空间,两者之间是完全不干涉的。虽然数据上会有一定时间的过期,但是至少我们是可以同步操作的,提高了并发效率,所以CopyOnWriteArrayList是适合读多写少的情况。

创建新副本,读写分离:通过创建新的副本进行修改,读和写是完全不同的两个容器。

不可变原理:因为没有修改都是创建新的容器,所以对于旧的容器来说,就是不可变的,没有人去修改,只有读取,所以就一定是并发安全的.

迭代的时候:进行数据修改的时候,迭代器是不知道,会依然访问旧的list,而且也不会报错.

ArrayList报错原因

每次进行next的时候,会先调用以下checkForComodification,进入查看

判断当前修改的次数modCount,和我迭代之前修改的次数expectedModCount是否相等,如果不相等就会抛出这个异常

而expectedModCount,是我们创建这个迭代器的时候获取的修改次数,且不再修改了.而当你发生过修改的时候.我立刻就可以发现,然后抛出异常.

CopyOnWriteArrayList缺点

数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果希望写入的数据,马上能读到,请不要使用CopyOnWrite容器

内存占用问题:因为CopyOnWrite的写是复写机制,所以在进行读写操作的时候,内存里会同时驻扎两个对象的内存。

源码分析

add

首先获取到锁,然后上锁,接着先拿到原来的数组及数组长度,接着复制一份新的数组并且长度加1,获取到新的数组后,将最新的值放入数据,再把引用指向最新的数组.

get

获取当前数组,然后再将对应下标的数据直接返回

并发队列

为什么使用队列

用队列可以在线程间传递数据:生产者消费者模式、银行转账
考虑锁等线程安全问题的重任从“你”转移到了“队列”上

并发队列简介

Queue

用来保存一组等待处理的数据,如:ConcurrentLinkedQueue(先进先出)、PriorityQueue(优先队列,不支持并发,但是可以根据内容进行排序)

BlockingQueue

增加了可阻塞功能的Queue,如果队列中没有数据,取操作就会一直阻塞,直到有了数据,如果队列中数据满了,插入操作也会一直阻塞,直到队列中有位置了为止

UML

JAVA提供了线程安全的队列,也可以称为并发队列,并发队列下又分为阻塞队列和非阻塞队列,其中阻塞队列的典型就是BlockingQueue,BlockingQueue下又有四种实现PriorityBlockingQueue、ArrayBlockingQueue、SynchronousQueue、LinkedBlockingQueue。非阻塞队列为ConcurrentLinkedQueue。

阻塞队列BlockingQueue

简介

阻塞队列是具有阻塞功能的队列,所以他首先是一个队列,其次是具有阻塞功能
通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的

是否有界(容量有多大)

这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,约为2的31次方,是非常大的数。可以近似认为是无限容量)

和线程池的关系

阻塞队列是线程池的重要组成部分

最有特色的两个带有阻塞功能的方法

  1. take()

    获取并移除队列的头节点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据

  2. put()

    插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间

主要的3组方法

  1. put,take
  2. add(添加元素,队列满了,继续添加会抛出异常),remove(队列为空,继续删除会抛出异常),element(返回队列的头元素,如果队列为空,也会抛出异常)
  3. offer(添加元素,队列满了会抛出一个false),poll(取出一个元素,如果队列为空,则返回null),peek(查看队列的头元素,不会取出,队列为空,返回的为null)

ArrayBlockingQueue

有界队列,创建时需要指定容量。还可以指定是否需要保证公平,如果想要保证公平的话,那么等待了最长时间的线程会被优先处理,不过会带来一定的性能损耗

案例

面试官需要面试10个人,每面试一个需要1s,等待区只有3个座位

代码

public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();
    }

}

class Interviewer implements Runnable {
    ArrayBlockingQueue queue;

    public Interviewer(ArrayBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("10个人都来了");
        try {
            for (int i = 0; i < 10; i++) {
                String candidate = "Candidate" + i;
                queue.put(candidate);
                System.out.println(candidate + "安排好了");
            }
            queue.put("stop");
            System.out.println("全部安排好了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    ArrayBlockingQueue queue;

    public Consumer(ArrayBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String msg;
        try {
            while (!(msg = queue.take()).equals("stop")) {
                System.out.println(msg + "面试开始");
                Thread.sleep(1000);
                System.out.println(msg + "面试结束");
            }
            System.out.println("面试全部结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

结果

10个人都来了
Candidate0安排好了
Candidate1安排好了
Candidate2安排好了
Candidate0面试开始
Candidate3安排好了
Candidate0面试结束
Candidate1面试开始
Candidate4安排好了
Candidate1面试结束
Candidate2面试开始
Candidate5安排好了
Candidate2面试结束
Candidate3面试开始
Candidate6安排好了
Candidate3面试结束
Candidate4面试开始
Candidate7安排好了
Candidate4面试结束
Candidate5面试开始
Candidate8安排好了
Candidate5面试结束
Candidate6面试开始
Candidate9安排好了
Candidate6面试结束
Candidate7面试开始
全部安排好了
Candidate7面试结束
Candidate8面试开始
Candidate8面试结束
Candidate9面试开始
Candidate9面试结束
面试全部结束

源码
put

首先判断数据不为null,然后拿到锁,使用lock.lockInterruptibly()可中断锁,代表在put过程中,如果发生阻塞,那么是可以被中断的,如果发现队列已经满了,就会调用 notFull.await()进行阻塞,否则调用enqueue进行入队。最后解锁。

take

和put基本一致,count为0,则进行阻塞,否则调用dequeue出队

LinkedBlockingQueue

无界队列,最大容量为Integer.MAX_VALUE,内部结构为Node,两把锁

源码

可以看出take和put分别有自己的锁,代表两者可以分别进行,

同样是先判断数据是否为null,不为空就拿到锁,然后判断是否已经达到最大容量,是的话就阻塞,不是的话调用enqueue进行入队,入队完成后查看当前容量是否小于最大值,是则唤醒一个之前等待的线程进入工作状态

take同put基本一样,只是拿的是take锁

PriorityBlockingQueue

支持优先级,自然排序(不是先进先出),无界队列,当容量不够时会进行扩容,priorityQueue的线程安全版本
put时因为会扩容,所以不会阻塞,但是take时,如果容量为0,则会被阻塞

SynchronousQueue

容量为0,因为SynchronousQueue不需要去持有元素。它所作的就是直接传递
效率很高
没有peek等函数,因为peek是取出头节点,但是SynchronousQueue的容量是0,所以连头节点都没有,也就没有peek方法。同理没有iterate相关方法
是一个极好的用来直接传递的并发数据结构
SynchronousQueue是线程池Executors.newCachedThreadPool()使用的阻塞队列

非阻塞队列

并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表座位其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对较少

如何选择适合自己的队列

  1. 是否需要边界,无边界的又分为容量极大和自动扩容
  2. 空间 SynchronousQueue是没有容量的,ArrayBlockingQueue是有容量的,所以从内存考虑来说,SynchronousQueue会更加的整齐
  3. 吞吐量 从性能角度来讲LinkedBlockingQueue有两把锁,吞吐量大于ArrayBlockingQueue,因为锁的粒度更加的细致,而ArrayBlockingQueue属于更全面,稳定,基础的队列,如果不需要保存,直接交换数据即可,那就是SynchronousQueue更优