锁类
在JDK 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。
Lock和Condition接口
使用并发包中的锁和synchronized锁不太一样,这里的锁可以认为是一把真正意义上的锁,每个锁都是一个对应的锁对象,只需要向锁对象获取锁或是释放锁即可。Lock接口定义:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
|
使用Lock类来进行加锁和释放锁操作:
public class Main { private static int i = 0; public static void main(String[] args) throws InterruptedException { Lock testLock = new ReentrantLock(); Runnable action = () -> { for (int j = 0; j < 100000; j++) { testLock.lock(); i++; testLock.unlock(); } }; new Thread(action).start(); new Thread(action).start(); Thread.sleep(1000); System.out.println(i); } }
|
和之前使用synchronized
相比,我们这里是真正在操作一个”锁”对象,当我们需要加锁时,只需要调用lock()
方法,而需要释放锁时,只需要调用unlock()
方法。程序运行的最终结果和使用synchronized
锁是一样的
如何像传统的加锁那样,调用对象的wait()
和notify()
方法呢,juc提供了Condition接口:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
|
举例演示:
public static void main(String[] args) throws InterruptedException { Lock testLock = new ReentrantLock(); Condition condition = testLock.newCondition(); new Thread(() -> { testLock.lock(); System.out.println("线程1进入等待状态!"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1等待结束!"); testLock.unlock(); }).start(); Thread.sleep(100); new Thread(() -> { testLock.lock(); System.out.println("线程2开始唤醒其他等待线程"); condition.signal(); System.out.println("线程2结束"); testLock.unlock(); }).start(); }
|
可重入锁
前面用到了ReentrantLock
,它其实是锁的一种,叫做可重入锁,即同一个线程,可以反复进行加锁操作:
public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); lock.lock(); lock.lock(); new Thread(() -> { System.out.println("线程2想要获取锁"); lock.lock(); System.out.println("线程2成功获取到锁"); }).start(); lock.unlock(); System.out.println("线程1释放了一次锁"); TimeUnit.SECONDS.sleep(1); lock.unlock(); System.out.println("线程1再次释放了一次锁"); }
|
主线程连续进行了两次加锁操作(此操作是不会被阻塞的),在当前线程持有锁的情况下继续加锁不会被阻塞,并且,加锁几次,就必须要解锁几次,否则此线程依旧持有锁。
可以使用getHoldCount()
方法查看当前线程的加锁次数,当锁不再被任何线程持有时,值为0
,并且通过isLocked()
方法查询结果为false
如果存在线程持有当前的锁,那么其他线程在获取锁时,会暂时进入到等待队列的,可以通过getQueueLength()
方法获取等待中线程数量的预估值
可以通过hasQueuedThread(Thread thread)
方法来判断某个线程是否正在等待获取锁状态
Condition也可以进行判断,通过使用getWaitQueueLength()
方法能够查看同一个Condition目前有多少线程处于等待状态:
public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } lock.unlock(); }).start(); TimeUnit.SECONDS.sleep(1); lock.lock(); System.out.println("当前Condition的等待线程数:"+lock.getWaitQueueLength(condition)); condition.signal(); System.out.println("当前Condition的等待线程数:"+lock.getWaitQueueLength(condition)); lock.unlock(); }
|
公平锁与非公平锁
如果线程之间争抢同一把锁,会暂时进入到等待队列中,那么多个线程获得锁的顺序是不是一定是根据线程调用lock()
方法时间来定的?
锁分为公平锁和非公平锁,默认我们创建出来的ReentrantLock
是采用的非公平锁作为底层锁机制。
- 公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
- 非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
读写锁
除了可重入锁之外,还有一种类型的锁叫做读写锁,当然它并不是专门用作读写操作的锁,它和可重入锁不同的地方在于,可重入锁是一种排他锁,当一个线程得到锁之后,另一个线程必须等待其释放锁,否则一律不允许获取到锁。而读写锁在同一时间,是可以让多个线程获取到锁的,它其实就是针对于读写场景而出现的。读写锁维护了一个读锁和一个写锁:
- 读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。
- 写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。
读写锁也有一个专门的接口:
public interface ReadWriteLock { Lock readLock();
Lock writeLock(); }
|
此接口有一个实现类ReentrantReadWriteLock(实现的是ReadWriteLock接口,不是Lock接口,它本身并不是锁),注意我们操作ReentrantReadWriteLock时,不能直接上锁,而是需要获取读锁或是写锁,再进行锁操作:
public static void main(String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.readLock().lock(); new Thread(lock.readLock()::lock).start(); }
|
ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:
public static void main(String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.writeLock().lock(); lock.writeLock().lock(); new Thread(() -> { lock.writeLock().lock(); System.out.println("成功获取到写锁!"); }).start(); System.out.println("释放第一层锁!"); lock.writeLock().unlock(); TimeUnit.SECONDS.sleep(1); System.out.println("释放第二层锁!"); lock.writeLock().unlock(); }
|
锁降级和锁升级
锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的:
public static void main(String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.writeLock().lock(); lock.readLock().lock(); System.out.println("成功加读锁!"); }
|
如果我们在同时加了写锁和读锁的情况下,释放写锁,其他的线程就可以一起加读锁,这种操作可以称之为”锁降级”(持有写锁的情况下申请读锁再释放写锁)
在仅持有读锁的情况下去申请写锁,属于”锁升级”,ReentrantReadWriteLock是不支持的
队列同步器AbstractQueuedSynchronizer(AQS)
AQS是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列
一个锁的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的

AQS内部维护了两个关键变量:
- state(同步状态)
- 整数变量,表示资源的占用情况(例如:锁是否被占用,信号量剩余数量)
- 通过CAS操作进行更新,保证线程安全
- FIFO等待队列(CLH队列)
- 线程获取资源失败时,会被加入等待队列,并阻塞
- 每个节点存储线程信息,便于后续唤醒
AQS中有一个head
字段和一个tail
字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。AQS的内部结构:
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;
final boolean isShared() { return nextWaiter == SHARED; }
final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { }
Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
|
工作机制
- 获取锁(acquire)
独占锁(ReentrantLock)acquire:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
tryAcquire(arg)
:尝试获取锁(由子类实现,如ReentrantLock)
addWaiter(Node.EXCLUSIVE)
:将当前线程加入等待队列
acquireQueued()
:阻塞当前线程,直到锁被释放
- 释放锁(release)
- 修改
state
变量,表示锁已释放
- 唤醒等待队列中的后继线程
独占锁(ReentrantLock)release:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
tryRelease(arg)
:释放锁(由子类实现)
unparkSuccessor(h)
:唤醒等待队列中的下一个线程

AQS的应用
组件 |
模式 |
作用 |
ReentrantLock |
独占 |
可重入互斥锁 |
Semaphore |
共享 |
控制并发访问数量 |
CountDownLatch |
共享 |
线程同步(等待计数归零) |
ReentrantReadWriteLock |
共享/独占 |
读写分离锁 |
示例
基于AQS实现自定义独占锁:
class MyLock extends AbstractQueuedSynchronizer { protected boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
protected boolean tryRelease(int releases) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; }
public void lock() { acquire(1); }
public void unlock() { release(1); } }
|
执行流程
- 线程调用
lock()
,进入acquire(1)
- 尝试
tryAcquire()
,如果 state=0
,CAS设置为1
- 如果
tryAcquire()
失败,则进入等待队列,阻塞等待唤醒
- 调用
unlock()
时,执行 release(1)
,并唤醒等待队列中的线程
公平锁的逻辑
在并发的情况下,公平锁一定公平吗?回顾一下tryAcquire()
方法的实现:
@ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
所以hasQueuedPredecessors()
这个环节对于公平锁的公平与否至关重要,否则会直接破坏掉公平性,假如现在出现了这样的情况:
线程1已经持有锁了,这时线程2来争抢这把锁,走到hasQueuedPredecessors()
,判断出为false
,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中,而这个时候线程3也来抢锁了,按照正常流程走到了hasQueuedPredecessors()
方法,因为线程2已经在等待队列中了,所以head!=tail
,因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在等待队列中,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。

所以公平锁,只有在等待队列存在节点时,才是真正公平的。
Condition原理
Condition类实际上就是用于代替传统对象的wait/notify操作的,同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。
在AQS中,Condition有一个实现类ConditionObject,也是使用了链表实现条件队列:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter;
|
这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:

当一个线程调用await()
方法时,会进入等待状态,直到其他线程调用signal()
方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。
await()
方法的目标:
- 只有已经持有锁的线程才可以使用此方法
- 当调用此方法后,会直接释放锁,无论加了多少次锁
- 只有其他线程调用
signal()
或是被中断时才会唤醒等待中的线程
- 被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
signal()
方法的目标:
- 只有持有锁的线程才能唤醒锁所属的Condition等待的线程
- 优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
- 唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
- 拿到锁之后,线程才能恢复运行
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
LockSupport.unpark(node.thread)
为什么要提前来一次unpark呢?
其实是为了进行优化而编写,直接unpark会有两种情况:
- 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足
wc > 0
- 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足
tail.waitStatus == 0
,但在执行ws > 0
之后!compareAndSetWaitStatus(p, ws,Node.SIGNAL)
之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws,Node.SIGNAL) == false
如果这里被提前unpark,那么在await()
方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了

原子类
之前如果要保证某一个变量操作的原子性,那么我们的唯一选择就是加锁,现在除了加锁之外,JUC还为我们提供了原子类,底层采用CAS算法,它是一种用法简单、性能高效、线程安全地更新变量的方式。
所有的原子类都位于java.util.concurrent.atomic
包下。
介绍
常用基本数据类,有对应的原子类封装:
- AtomicInteger:原子更新int
- AtomicLong:原子更新long
- AtomicBoolean:原子更新boolean
示例:
public class Main { public static void main(String[] args) { AtomicInteger i = new AtomicInteger(1); System.out.println(i.getAndIncrement()); } }
|
可以将int数值封装到此类中(注意必须调用构造方法,它不像Integer那样有装箱机制),并且通过调用此类提供的方法来获取或是对封装的int值进行自增:
public class Main { private static AtomicInteger i = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Runnable r = () -> { for (int j = 0; j < 100000; j++) i.getAndIncrement(); System.out.println("自增完成!"); }; new Thread(r).start(); new Thread(r).start(); TimeUnit.SECONDS.sleep(1); System.out.println(i.get()); } }
|
除了基本类有原子类以外,基本类型的数组类型也有原子类:
- AtomicIntegerArray:原子更新int数组
- AtomicLongArray:原子更新long数组
- AtomicReferenceArray:原子更新引用数组
可以对数组内的元素进行原子操作:
public static void main(String[] args) throws InterruptedException { AtomicIntegerArray array = new AtomicIntegerArray(new int[]{0, 4, 1, 3, 5}); Runnable r = () -> { for (int i = 0; i < 100000; i++) array.getAndAdd(0, 1); }; new Thread(r).start(); new Thread(r).start(); TimeUnit.SECONDS.sleep(1); System.out.println(array.get(0)); }
|
在JDK8之后,新增了DoubleAdder
和LongAdder
,在高并发情况下,LongAdder
的性能比AtomicLong
的性能更好,主要体现在自增上,它的大致原理如下:在低并发情况下,和AtomicLong
是一样的,对value值进行CAS操作,但是出现高并发的情况时,AtomicLong
会进行大量的循环操作来保证同步,而LongAdder
会将对value值的CAS操作分散为对数组cells
中多个元素的CAS操作(内部维护一个Cell[] as数组,每个Cell里面有一个初始值为0的long型变量,在高并发时会进行分散CAS,就是不同的线程可以对数组中不同的元素进行CAS自增,这样就避免了所有线程都对同一个值进行CAS),只需要最后再将结果加起来即可。

使用示例:
public static void main(String[] args) throws InterruptedException { LongAdder adder = new LongAdder(); Runnable r = () -> { for (int i = 0; i < 100000; i++) adder.add(1); }; for (int i = 0; i < 100; i++) new Thread(r).start(); TimeUnit.SECONDS.sleep(1); System.out.println(adder.sum()); }
|
除了对基本数据类型支持原子操作外,对于引用类型也可以实现原子操作:
public static void main(String[] args) throws InterruptedException { String a = "Hello"; String b = "World"; AtomicReference<String> reference = new AtomicReference<>(a); reference.compareAndSet(a, b); System.out.println(reference.get()); }
|
JUC还提供了字段原子更新器,可以对类中的某个指定字段进行原子操作(字段必须添加volatile关键字):
public class Main { public static void main(String[] args) throws InterruptedException { Student student = new Student(); AtomicIntegerFieldUpdater<Student> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Student.class, "age"); System.out.println(fieldUpdater.incrementAndGet(student)); }
public static class Student{ volatile int age; } }
|
ABA问题
现在有这样一种场景:
线程1和线程2同时开始对a
的值进行CAS修改,但是线程1的速度比较快,将a的值修改为2之后紧接着又修改回1,这时线程2才开始进行判断,发现a的值是1,所以CAS操作成功。
很明显,这里的1已经不是一开始的那个1了,而是被重新赋值的1,这也是CAS操作存在的问题,它只会机械地比较当前值是不是预期值,但是并不会关心当前值是否被修改过,这种问题称之为ABA
问题。
对于ABA问题,JUC提供了带版本号的引用类型,只要每次操作都记录一下版本号,并且版本号不会重复:
public static void main(String[] args) throws InterruptedException { String a = "Hello"; String b = "World"; AtomicStampedReference<String> reference = new AtomicStampedReference<>(a, 1); reference.attemptStamp(a, 2); System.out.println(reference.compareAndSet(a, b, 2, 3)); }
|