我来详细介绍一下 Java AQS(AbstractQueuedSynchronizer)。
什么是 AQS?
AQS 是 Java 并发包中的核心同步框架,位于 java.util.concurrent.locks 包中。它提供了一个实现阻塞锁和相关同步器的框架。
核心原理
AQS 使用一个 volatile int state 变量来表示同步状态,以及一个 FIFO 队列来管理获取资源失败的线程。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步状态
private volatile int state;
// 等待队列的头节点和尾节点
private transient volatile Node head;
private transient volatile Node tail;
}
同步状态 State
state 的不同值代表不同的含义:
· 0: 资源可用,无线程占用
· 1: 资源被占用
· >1: 可重入锁的重入次数
两种同步模式
一次只有一个线程能获取资源,如 ReentrantLock。
多个线程可以同时获取资源,如 Semaphore、CountDownLatch。
核心方法
需要子类实现的方法:
· tryAcquire(int) - 独占模式获取
· tryRelease(int) - 独占模式释放
· tryAcquireShared(int) - 共享模式获取
· tryReleaseShared(int) - 共享模式释放
· isHeldExclusively() - 是否被当前线程独占
实现示例
public class MyMutex implements Lock {
private final Sync sync = new Sync();
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁
@Override
protected boolean tryAcquire(int arg) {
// 使用 CAS 将 state 从 0 改为 1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0); // 不需要 CAS,因为只有持有锁的线程才能释放
return true;
}
// 是否被独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 创建 Condition
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 其他 Lock 接口方法...
}
public class MySemaphore {
private final Sync sync;
public MySemaphore(int permits) {
sync = new NonfairSync(permits);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 共享模式获取
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
// 共享模式释放
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) { // 溢出检查
throw new Error("Maximum permit count exceeded");
}
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
}
基于 AQS 的 JDK 类
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 临界区
} finally {
lock.unlock();
}
CountDownLatch latch = new CountDownLatch(3);
// 工作线程
new Thread(() -> {
// 执行任务
latch.countDown();
}).start();
// 等待所有线程完成
latch.await();
Semaphore semaphore = new Semaphore(3); // 允许3个线程同时访问
semaphore.acquire();
try {
// 访问共享资源
} finally {
semaphore.release();
}
AQS 的优势
使用注意事项
AQS 是理解 Java 并发编程的重要基础,掌握它有助于深入理解 Java 并发包的实现原理。