JUC并发容器与并发工具
并发容器
在单线程模式下,集合类提供的容器可以说是非常方便了,比如链表、顺序表、哈希表等数据结构,但是这些容器在多线程环境下,并不能正常工作。要解决并发情况下的容器问题,可以给方法前面加个synchronzed,或者使用Vector或是Hashtable,但是它们的效率实在是太低了,完全依靠锁来解决问题。
JUC提供了专用于并发场景下的容器,比如可以代替ArrayList
的CopyOnWriteArrayList
:
public static void main(String[] args) throws InterruptedException { |
CopyOnWriteArrayList
对于读操作不加锁,而对于写操作是加锁的,类似于读写锁机制,这样就可以保证不丢失读性能的情况下,写操作不会出现问题:
public boolean add(E e) { |
对于HashMap的并发容器ConcurrentHashMap
:
public static void main(String[] args) throws InterruptedException { |
JDK7之前,ConcurrentHashMap
的原理类似于LongAdder的压力分散思想,既然每个线程都想抢锁,那就多搞几把锁,让每个线程都能拿到,就不会存在等待的问题了,ConcurrentHashMap
将所有数据分为一段一段地存储,先分很多段出来,每一段都给一把锁,当一个线程占锁访问时,只会占用其中一把锁,也就是仅仅锁了一小段数据,而其他段的数据依然可以被其他线程正常访问。
JDK8之后,ConcurrentHashMap
采用了CAS算法配合锁机制实现,其底层与HashMap
大差不差,HashMap
利用了一个用于存放后续节点的头结点的数组,数组里面的每一个元素都是一个头结点(也可以说就是一个链表),当要新插入一个数据时,会先计算该数据的哈希值,找到数组下标,然后创建一个新的节点,添加到对应的链表后面。当链表的长度达到8时,会自动将链表转换为红黑树,这样能使得原有的查询效率大幅度提高
ConcurrentHashMap
也是维护了一个哈希表:
public V put(K key, V value) { |
ConcurrentHashMap
的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap
在同一时间能够处理的线程数量,这也是为什么treeifyBin()
会优先考虑为哈希表进行扩容的原因
阻塞队列
除了常用的容器类之外,JUC还提供了各种各样的阻塞队列,用于不同的工作场景。
阻塞队列本身也是队列,基于ReentrantLock实现,它的接口定义如下:
public interface BlockingQueue<E> extends Queue<E> { |
比如现在有一个容量为3的阻塞队列,这个时候一个线程put
向其添加了三个元素,第二个线程接着put
向其添加三个元素,那么这个时候由于容量已满,会直接被阻塞,而这时第三个线程从队列中取走2个元素,线程二停止阻塞,先丢两个进去,还有一个还是进不去,所以说继续阻塞。
可以理解为:当队列为空时,获取元素的操作会被阻塞;当队列已满时,添加元素的操作会被阻塞
常用的实现类
ArrayBlockingQueue
- 基于数组实现的有界阻塞队列
- 初始化时需要指定队列容量:
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10)
- 支持公平锁和非公平锁
LinkedBlockingQueue
- 基于链表实现的可选有界阻塞队列
- 默认容量为
Integer.MAX_VALUE
,可以指定容量:BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10)
- 吞吐量通常高于
ArrayBlockingQueue
PriorityBlockingQueue
- 基于堆实现的优先级阻塞队列
- 元素按优先级排序,默认自然顺序或通过
Comparator
指定 - 无界队列,容量为
Integer.MAX_VALUE
:PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>()
DelayQueue
- 基于优先级队列实现的延迟队列
- 元素必须实现
Delayed
接口,只有在延迟时间到达后才能被取出 - 无界队列:
DelayQueue<DelayedTask> queue = new DelayQueue<>()
SynchronousQueue
- 一种不存储元素的阻塞队列:
SynchronousQueue<Integer> queue = new SynchronousQueue<>()
- 每个插入操作必须等待另一个线程的移除操作,反之亦然
- 适用于线程之间的直接传递数据
阻塞队列 | 特点 | 适用场景 |
---|---|---|
ArrayBlockingQueue |
基于数组的有界队列,支持公平锁和非公平锁。 | 固定大小的队列,生产者-消费者模型。 |
LinkedBlockingQueue |
基于链表的可选有界队列,默认无界,高吞吐量。 | 高吞吐量队列,生产者-消费者模型。 |
PriorityBlockingQueue |
基于堆的优先级队列,无界。 | 按优先级处理任务,任务调度。 |
DelayQueue |
延迟队列,元素必须实现 Delayed 接口。 |
延迟任务调度,缓存过期策略。 |
SynchronousQueue |
不存储元素,直接传递数据。 | 线程间直接数据传递,高并发任务分发。 |
并发工具类
计数器锁 CountDownLatch
CountDownLatch允许一个或多个线程,等待其他线程完成工作。
比如有这样一个需求:
- 有20个计算任务,需要将这些任务的结果全部计算出来后再汇总,每个任务的执行时间未知
- 当所有任务结束之后,立即整合统计最终结果
使用CountDownLatch可以轻松实现:
public static void main(String[] args) throws InterruptedException { |
我们在调用await()
方法之后,实际上就是一个等待计数器衰减为0的过程,而进行自减操作则由各个子线程来完成,当子线程完成工作后,那么就将计数器-1,所有的子线程完成之后,计数器为0,结束等待
CountDownLatch的基本实现思路:
- 利用共享锁实现,同一时刻能有多个线程拥有共享锁,如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁,所以得传播下去继续尝试唤醒后面的结点,并且如果一个线程刚释放了锁,不管是独占锁还是共享锁,都需要唤醒后续等待结点的线程
- 在一开始的时候就是已经上了count层锁的状态,也就是
state = count
await()
就是加共享锁,但是必须state
为0
才能加锁成功,否则按照AQS的机制,会进入等待队列阻塞,加锁成功后结束阻塞countDown()
就是解1
层锁,靠这个方法一点一点把state
的值减到0
循环屏障 CyclicBarrier
CyclicBarrier用于让一组线程互相等待,直到所有线程都到达某个屏障点。它的底层实现基于 ReentrantLock
和 Condition
,通过计数器机制和条件变量实现线程的等待和唤醒。
比如需要玩一个开房间游戏,不满足人数不能进,必须等待剩下的人到来之后才能开始游戏,并且保证游戏开始时所有玩家都是同时进入,就可以使用CyclicBarrier:
public static void main(String[] args) { |
循环屏障会不断阻挡线程,直到被阻挡的线程足够多时,才能一起通过,并且屏障是可循环的,它在被冲破后,会重新开始计数,继续阻挡后续的线程,直到再次满足预定的线程数量。
除了自动重置之外,也可以调用reset()
方法来手动进行重置操作,同样会重新计数:
public static void main(String[] args) throws InterruptedException { |
在调用reset()
之后,处于等待状态下的线程,全部被中断并且抛出BrokenBarrierException
异常,循环屏障等待线程数归零。
信号量 Semaphore
信号量可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。简单来说,它就是一个可以被N个线程占用的排它锁(支持公平和非公平模式,默认是非公平模式),使用时可以在最开始设定Semaphore许可证的数量,每个线程都可以获得1个或n个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
public static void main(String[] args) throws ExecutionException, InterruptedException { |
可以通过Semaphore获取一些常规信息:
availablePermits
:剩余许可证数量hasQueuedThreads
:是否存在线程等待许可证getQueueLength
:等待许可证线程数量drainPermits
:回收全部许可证
Semaphore可以控制对共享资源的并发访问数量,避免资源过载。可以管理数据库连接池、线程池等资源,确保资源的使用不超过限制。可以控制并发任务的数量,避免系统资源耗尽等。
数据交换 Exchanger
Exchanger底层实现基于CAS和等待队列,能够实现线程之间的数据交换:
public static void main(String[] args) throws InterruptedException { |
Exchanger
可以被重复使用,每次交换数据后,可以继续使用
在调用exchange
方法后,当前线程会等待其他线程调用同一个exchanger对象的exchange
方法,当另一个线程也调用之后,方法会返回对方线程传入的参数。如果一直没有另一个线程调用exchanger
方法,当前线程会进入阻塞状态。
Fork/Join框架
在JDK7时,出现了一个新的框架用于并行执行任务,它能够把大型任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果,并且这些小任务都是并行执行的。Fork就是拆分,Join就是合并。它通过工作窃取(Work-Stealing)算法利用多核CPU资源。
工作窃取算法:是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
Fork/Join框架的核心类包括:
ForkJoinPool
:任务执行的线程池,负责管理线程和任务队列ForkJoinTask
:表示一个任务,通常使用其子类:RecursiveAction
:用于没有返回值的任务RecursiveTask
:用于有返回值的任务
工作流程
- 任务拆分:将一个大任务拆分为多个小任务,直到任务足够小,可以直接执行
- 任务执行:将小任务提交到
ForkJoinPool
中执行 - 结果合并:将小任务的执行结果合并,得到最终结果
示例
计算数组中1~1000的总和:
import java.util.concurrent.RecursiveTask; |
Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现:
public static void parallelSort(byte[] a) { |