并发容器
a简介
java.util 包下提供了一些容器类(集合框架),其中 Vector 和 Hashtable 是线程安全的,但实现方式比较粗暴,通过在方法上加「sychronized」关键字实现。
但即便是 Vector 这样线程安全的类,在应对多线程的复合操作时也需要在客户端继续加锁以保证原子性。

。
阻塞队列
我们假设一种场景,生产者一直生产资源,消费者一直消费资源(后面会细讲,戳链接直达),资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式。
该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。
我们自己 coding 实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。
这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。
BlockingQueue 一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。BlockingQueue 就是存放元素的容器。
BlockingQueue 的操作方法
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
| 方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | - | - |
- 抛出异常:如果操作无法立即执行,会抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出
IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。 - 返回特殊值:如果操作无法立即执行,会返回一个特殊值,通常是 true / false。
- 一直阻塞:如果操作无法立即执行,则一直阻塞或者响应中断。
- 超时退出:如果操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。
注意:
- 不能往阻塞队列中插入 null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用
remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
我们会在后面单独开一篇BlockingQueue来细讲,戳链接直达。
BlockingQueue 的实现类
ArrayBlockingQueue
由数组结构组成的有界阻塞队列。内部结构是数组,具有数组的特性。
1 | |
可以初始化队列大小,一旦初始化将不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是非公平锁。
LinkedBlockingQueue
由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是Integer.MAX_VALUE,也可以指定大小。此队列按照先进先出的原则对元素进行排序。
DelayQueue
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。
DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
PriorityBlockingQueue
基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。
网上大部分博客上PriorityBlockingQueue为公平锁,其实是不对的,查阅源码(感谢 github:ambition0802同学的指出):
1 | |
SynchronousQueue
这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。
需要区别容量为 1 的 ArrayBlockingQueue、LinkedBlockingQueue。
以下方法的返回值,可以帮助理解这个队列:
iterator()永远返回空,因为里面没有东西peek()永远返回 nullput()往 queue 放进去一个 element 以后就一直 wait 直到有其他 thread 进来把这个 element 取走。offer()往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回 true,认为 offer 成功;否则返回 false。take()取出并且 remove 掉 queue 里的 element,取不到东西他会一直等。poll()取出并且 remove 掉 queue 里的 element,只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put 数据的时候,该方法才会取到东西。否则立即返回 null。isEmpty()永远返回 trueremove()&removeAll()永远返回 false
注意
PriorityBlockingQueue不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时阻塞数据的消费者。因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。**对于使用默认大小的**LinkedBlockingQueue**也是一样的。