并发容器

在单线程模式下,集合类提供的容器可以说是非常方便了,比如链表、顺序表、哈希表等数据结构,但是这些容器在多线程环境下,并不能正常工作。要解决并发情况下的容器问题,可以给方法前面加个synchronzed,或者使用Vector或是Hashtable,但是它们的效率实在是太低了,完全依靠锁来解决问题。

JUC提供了专用于并发场景下的容器,比如可以代替ArrayListCopyOnWriteArrayList

public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>(); //使用CopyOnWriteArrayList保证线程安全
Runnable r = () -> {
for (int i = 0; i < 100; i++)
list.add("aaa");
};
for (int i = 0; i < 100; i++)
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(list.size());
}

CopyOnWriteArrayList对于读操作不加锁,而对于写操作是加锁的,类似于读写锁机制,这样就可以保证不丢失读性能的情况下,写操作不会出现问题:

public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //直接加锁,保证同一时间只有一个线程进行添加操作
try {
Object[] elements = getArray(); //获取当前存储元素的数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //直接复制一份数组
newElements[len] = e; //修改复制出来的数组
setArray(newElements); //将元素数组设定为复制出来的数组
return true;
} finally {
lock.unlock();
}
}

对于HashMap的并发容器ConcurrentHashMap

public static void main(String[] args) throws InterruptedException {
Map<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
int tmp = i;
new Thread(() -> {
for (int j = 0; j < 100; j++)
map.put(tmp * 100 + j, "aaa");
}).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(map.size());
}

JDK7之前,ConcurrentHashMap的原理类似于LongAdder的压力分散思想,既然每个线程都想抢锁,那就多搞几把锁,让每个线程都能拿到,就不会存在等待的问题了,ConcurrentHashMap将所有数据分为一段一段地存储,先分很多段出来,每一段都给一把锁,当一个线程占锁访问时,只会占用其中一把锁,也就是仅仅锁了一小段数据,而其他段的数据依然可以被其他线程正常访问。

JDK8之后,ConcurrentHashMap采用了CAS算法配合锁机制实现,其底层与HashMap大差不差,HashMap利用了一个用于存放后续节点的头结点的数组,数组里面的每一个元素都是一个头结点(也可以说就是一个链表),当要新插入一个数据时,会先计算该数据的哈希值,找到数组下标,然后创建一个新的节点,添加到对应的链表后面。当链表的长度达到8时,会自动将链表转换为红黑树,这样能使得原有的查询效率大幅度提高

ConcurrentHashMap也是维护了一个哈希表:

public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算键的hash值,用于确定在哈希表中的位置
int binCount = 0; //记录链表长度
for (Node<K,V>[] tab = table;;) { //CAS自旋锁
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //如果数组(哈希表)为空要进行初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果哈希表该位置为null,直接CAS插入结点作为头结即可(注意这里会将f设置当前哈希表位置上的头结点)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 如果CAS成功,直接break结束put方法,失败那就继续下一轮循环
} else if ((fh = f.hash) == MOVED) //头结点哈希值为-1,这里只需要知道是因为正在扩容即可
tab = helpTransfer(tab, f); //帮助进行迁移,完事之后再来下一次循环
else { //正常情况
V oldVal = null;
synchronized (f) { //在前面的循环中f肯定是被设定为了哈希表某个位置上的头结点,这里直接把它作为锁加锁了,防止同一时间其他线程也在操作哈希表中这个位置上的链表或是红黑树
if (tabAt(tab, i) == f) {
if (fh >= 0) { //头结点的哈希值大于等于0说明是链表,下面就是针对链表的一些列操作
//...
} else if (f instanceof TreeBin) { //针对红黑树的情况进行操作
//在ConcurrentHashMap并不是直接存储的TreeNode,而是TreeBin
//...
}
}
}
//根据链表长度决定是否要进化为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //这里只是可能会进化为红黑树,如果当前哈希表的长度小于64,会优先考虑对哈希表进行扩容
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()会优先考虑为哈希表进行扩容的原因

阻塞队列

除了常用的容器类之外,JUC还提供了各种各样的阻塞队列,用于不同的工作场景。

阻塞队列本身也是队列,基于ReentrantLock实现,它的接口定义如下:

public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

//入队,如果队列已满,返回false否则返回true(非阻塞)
boolean offer(E e);

//入队,如果队列已满,阻塞线程直到能入队为止
void put(E e) throws InterruptedException;

//入队,如果队列已满,阻塞线程直到能入队或超时、中断为止,入队成功返回true否则false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//出队,如果队列为空,阻塞线程直到能出队为止
E take() throws InterruptedException;

//出队,如果队列为空,阻塞线程直到能出队超时、中断为止,出队成功正常返回,否则返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地入队的数量,如果没有限制,则返回 Integer.MAX_VALUE
int remainingCapacity();

boolean remove(Object o);

public boolean contains(Object o);

//一次性从BlockingQueue中获取所有可用的数据对象(还可以指定获取数据的个数)
int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);

比如现在有一个容量为3的阻塞队列,这个时候一个线程put向其添加了三个元素,第二个线程接着put向其添加三个元素,那么这个时候由于容量已满,会直接被阻塞,而这时第三个线程从队列中取走2个元素,线程二停止阻塞,先丢两个进去,还有一个还是进不去,所以说继续阻塞。

可以理解为:当队列为空时,获取元素的操作会被阻塞;当队列已满时,添加元素的操作会被阻塞

常用的实现类

  1. ArrayBlockingQueue
  • 基于数组实现的有界阻塞队列
  • 初始化时需要指定队列容量:BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10)
  • 支持公平锁和非公平锁
  1. LinkedBlockingQueue
  • 基于链表实现的可选有界阻塞队列
  • 默认容量为Integer.MAX_VALUE,可以指定容量:BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10)
  • 吞吐量通常高于 ArrayBlockingQueue
  1. PriorityBlockingQueue
  • 基于堆实现的优先级阻塞队列
  • 元素按优先级排序,默认自然顺序或通过 Comparator 指定
  • 无界队列,容量为Integer.MAX_VALUE:PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>()
  1. DelayQueue
  • 基于优先级队列实现的延迟队列
  • 元素必须实现 Delayed 接口,只有在延迟时间到达后才能被取出
  • 无界队列:DelayQueue<DelayedTask> queue = new DelayQueue<>()
  1. SynchronousQueue
  • 一种不存储元素的阻塞队列:SynchronousQueue<Integer> queue = new SynchronousQueue<>()
  • 每个插入操作必须等待另一个线程的移除操作,反之亦然
  • 适用于线程之间的直接传递数据
阻塞队列 特点 适用场景
ArrayBlockingQueue 基于数组的有界队列,支持公平锁和非公平锁。 固定大小的队列,生产者-消费者模型。
LinkedBlockingQueue 基于链表的可选有界队列,默认无界,高吞吐量。 高吞吐量队列,生产者-消费者模型。
PriorityBlockingQueue 基于堆的优先级队列,无界。 按优先级处理任务,任务调度。
DelayQueue 延迟队列,元素必须实现 Delayed 接口。 延迟任务调度,缓存过期策略。
SynchronousQueue 不存储元素,直接传递数据。 线程间直接数据传递,高并发任务分发。

并发工具类

计数器锁 CountDownLatch

CountDownLatch允许一个或多个线程,等待其他线程完成工作。

比如有这样一个需求:

  • 有20个计算任务,需要将这些任务的结果全部计算出来后再汇总,每个任务的执行时间未知
  • 当所有任务结束之后,立即整合统计最终结果

使用CountDownLatch可以轻松实现:

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(20); //创建一个初始值为20的计数器锁
for (int i = 0; i < 20; i++) {
new Thread(() -> {
try {
Thread.sleep((long) (2000 * new Random().nextDouble())); //模拟任务执行耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown(); //每执行一次计数器都会-1
}).start();
}

//开始等待所有的线程完成,当计数器为0时,恢复运行
latch.await(); //这个操作可以同时被多个线程执行,一起等待,这里只演示了一个
System.out.println("任务完成");
//注意这个计数器只能使用一次,用完只能重新创一个,没有重置的说法
}

我们在调用await()方法之后,实际上就是一个等待计数器衰减为0的过程,而进行自减操作则由各个子线程来完成,当子线程完成工作后,那么就将计数器-1,所有的子线程完成之后,计数器为0,结束等待

CountDownLatch的基本实现思路:

  • 利用共享锁实现,同一时刻能有多个线程拥有共享锁,如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁,所以得传播下去继续尝试唤醒后面的结点,并且如果一个线程刚释放了锁,不管是独占锁还是共享锁,都需要唤醒后续等待结点的线程
  • 在一开始的时候就是已经上了count层锁的状态,也就是state = count
  • await()就是加共享锁,但是必须state0才能加锁成功,否则按照AQS的机制,会进入等待队列阻塞,加锁成功后结束阻塞
  • countDown()就是解1层锁,靠这个方法一点一点把state的值减到0

循环屏障 CyclicBarrier

CyclicBarrier用于让一组线程互相等待,直到所有线程都到达某个屏障点。它的底层实现基于 ReentrantLockCondition,通过计数器机制和条件变量实现线程的等待和唤醒。

比如需要玩一个开房间游戏,不满足人数不能进,必须等待剩下的人到来之后才能开始游戏,并且保证游戏开始时所有玩家都是同时进入,就可以使用CyclicBarrier:

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(10, //创建一个初始值为10的循环屏障
() -> System.out.println("准备开始")); //人等够之后执行的任务
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
Thread.sleep((long) (2000 * new Random().nextDouble()));
System.out.println("玩家 "+ finalI +" 进入房间进行等待... ("+barrier.getNumberWaiting()+"/10)");

barrier.await(); //调用await方法进行等待,直到等待的线程足够多为止

//所有玩家一起进入
System.out.println("玩家 "+ finalI +" 进入游戏!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

循环屏障会不断阻挡线程,直到被阻挡的线程足够多时,才能一起通过,并且屏障是可循环的,它在被冲破后,会重新开始计数,继续阻挡后续的线程,直到再次满足预定的线程数量。

除了自动重置之外,也可以调用reset()方法来手动进行重置操作,同样会重新计数:

public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(10);

for (int i = 0; i < 3; i++)
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

Thread.sleep(500); //等上面的线程开始运行
System.out.println("当前屏障前的等待线程数:"+barrier.getNumberWaiting());

barrier.reset();
System.out.println("重置后屏障前的等待线程数:"+barrier.getNumberWaiting());
}

在调用reset()之后,处于等待状态下的线程,全部被中断并且抛出BrokenBarrierException异常,循环屏障等待线程数归零。

信号量 Semaphore

信号量可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。简单来说,它就是一个可以被N个线程占用的排它锁(支持公平和非公平模式,默认是非公平模式),使用时可以在最开始设定Semaphore许可证的数量,每个线程都可以获得1个或n个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。

public static void main(String[] args) throws ExecutionException, InterruptedException {
//每一个Semaphore都会在一开始获得指定的许可证数数量,也就是许可证配额
Semaphore semaphore = new Semaphore(2); //许可证配额设定为2

for (int i = 0; i < 3; i++) { // 有3个线程抢这2个信号量
new Thread(() -> {
try {
semaphore.acquire(); //申请一个许可证,默认申请1个,acquire(n)则表示申请n个
System.out.println("许可证申请成功!");
semaphore.release(); //归还一个许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

可以通过Semaphore获取一些常规信息:

  • availablePermits:剩余许可证数量
  • hasQueuedThreads:是否存在线程等待许可证
  • getQueueLength:等待许可证线程数量
  • drainPermits:回收全部许可证

Semaphore可以控制对共享资源的并发访问数量,避免资源过载。可以管理数据库连接池、线程池等资源,确保资源的使用不超过限制。可以控制并发任务的数量,避免系统资源耗尽等。

数据交换 Exchanger

Exchanger底层实现基于CAS和等待队列,能够实现线程之间的数据交换:

public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("收到主线程传递的交换数据:"+exchanger.exchange("AAAA"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("收到子线程传递的交换数据:"+exchanger.exchange("BBBB"));
}

Exchanger可以被重复使用,每次交换数据后,可以继续使用

在调用exchange方法后,当前线程会等待其他线程调用同一个exchanger对象的exchange方法,当另一个线程也调用之后,方法会返回对方线程传入的参数。如果一直没有另一个线程调用exchanger方法,当前线程会进入阻塞状态。

Fork/Join框架

在JDK7时,出现了一个新的框架用于并行执行任务,它能够把大型任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果,并且这些小任务都是并行执行的。Fork就是拆分,Join就是合并。它通过工作窃取(Work-Stealing)算法利用多核CPU资源。

工作窃取算法:是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。

Fork/Join框架的核心类包括:

  1. ForkJoinPool:任务执行的线程池,负责管理线程和任务队列
  2. ForkJoinTask:表示一个任务,通常使用其子类:
    • RecursiveAction:用于没有返回值的任务
    • RecursiveTask:用于有返回值的任务

工作流程

  1. 任务拆分:将一个大任务拆分为多个小任务,直到任务足够小,可以直接执行
  2. 任务执行:将小任务提交到 ForkJoinPool 中执行
  3. 结果合并:将小任务的执行结果合并,得到最终结果

示例

计算数组中1~1000的总和:

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumExample {
public static void main(String[] args) {
int[] array = new int[1000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}

ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
int result = pool.invoke(task); // 提交任务并获取结果
System.out.println("数组元素和: " + result);
}
}

class SumTask extends RecursiveTask<Integer> { //继承RecursiveTask,这样才可以作为一个任务,泛型就是计算结果类型
private final int[] array;
private final int start; // 限定范围
private final int end;
private static final int THRESHOLD = 125; // 任务拆分的阈值,拆分成8组

SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 任务足够小,直接计算
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 任务较大,拆分为两个子任务,类似于递归
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
leftTask.fork(); // 将leftTask提交到ForkJoinPool中,由其他线程异步执行,如果范围仍然较大会继续拆分
int rightResult = rightTask.compute(); // 在当前线程中执行右子任务,如果范围仍然较大会继续拆分
int leftResult = leftTask.join(); // 获取左子任务的结果
return leftResult + rightResult; // 合并结果
}
}
}

Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现:

public static void parallelSort(byte[] a) {
int n = a.length, p, g;
if (n <= MIN_ARRAY_SORT_GRAN ||
(p = ForkJoinPool.getCommonPoolParallelism()) == 1)
DualPivotQuicksort.sort(a, 0, n - 1);
else
new ArraysParallelSortHelpers.FJByte.Sorter
(null, a, new byte[n], 0, n, 0,
((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
MIN_ARRAY_SORT_GRAN : g).invoke();
}