No Regrets.

AQS详讲

Posted on By Marin



AQS

AQS,AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),它是JUC并发包中的核心基础组件。AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。

基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。 AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQS主要提供了如下一些方法:

getState():返回同步状态的当前值;

setState(int newState):设置当前同步状态;

compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;

tryRelease(int arg):独占式释放同步状态;

tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

tryReleaseShared(int arg):共享式释放同步状态;

isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

releaseShared(int arg):共享式释放同步状态;

Unsafe & LockSupport

Unsafe

sun.misc.Unsafe类型从名字看,这个类应该是封装了一些不安全的操作。

1、可以用来在任意内存地址位置处读写数据,可见,对于普通用户来说,使用起来还是比较危险的;

2、还支持一些CAS原子操作;

Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门,JDK中有一个类Unsafe,它提供了硬件级别的原子操作。

Unsafe的构造函数:

	private Unsafe()
    {
    }

    public static Unsafe getUnsafe()
    {
        Class class1 = Reflection.getCallerClass();
        if(!VM.isSystemDomainLoader(class1.getClassLoader()))
            throw new SecurityException("Unsafe");
        else
            return theUnsafe;
    }



从构造函数我们可以知道,不能直接new Unsafe(),原因是Unsafe被设计成单例模式,构造方法是私有的;

不能通过调用Unsafe.getUnsafe()获取,因为getUnsafe被设计成只能从引导类加载器(bootstrap class loader)加载,从getUnsafe的源码中也可以看出来;

JDK的开发人员并不希望大家使用这个类。获得Unsafe实例的方法是只能调用其工厂方法getUnsafe()。上面其中的if(!VM.isSystemDomainLoader(class1.getClassLoader()))的判断是为了只有JDK内部类调用。其它应用层调用它都报错。

所以我们在测试和分析Unsafe类的时候只能通过反射来获取其实例,从而使用其内部方法;



Unsafe类提供了硬件级别的原子操作,主要提供了以下功能:

1、通过Unsafe类可以分配内存,可以释放内存;

类中提供的3个本地方法allocateMemory、reallocateMemory、freeMemory分别用于分配内存,扩充内存和释放内存,与C语言中的3个方法对应。

//分配内存
public native long allocateMemory(long l);
//扩充内存
public native long reallocateMemory(long l, long l1);
//释放内存
public native void freeMemory(long l);



2、可以定位对象某字段的内存位置,也可以修改对象的字段值,即使它是私有的;

字段的定位:

JAVA中对象的字段的定位可能通过native staticFieldOffset方法实现,该方法返回给定field的内存地址偏移量,这个值对于给定的filed是唯一的且是固定不变的。

native getIntVolatile方法获取对象中offset偏移地址对应的整型field的值,支持volatile load语义。

native getLong方法获取对象中offset偏移地址对应的long型field的值。

数组元素定位:

Unsafe类中有很多以BASE_OFFSET结尾的常量,比如ARRAY_INT_BASE_OFFSET,ARRAY_BYTE_BASE_OFFSET等,这些常量值是通过arrayBaseOffset方法得到的。arrayBaseOffset方法是一个本地方法,可以获取数组第一个元素的偏移地址。Unsafe类中还有很多以INDEX_SCALE结尾的常量,比如 ARRAY_INT_INDEX_SCALE , ARRAY_BYTE_INDEX_SCALE等,这些常量值是通过arrayIndexScale方法得到的。arrayIndexScale方法也是一个本地方法,可以获取数组的转换因子,也就是数组中元素的增量地址。将arrayBaseOffset与arrayIndexScale配合使用,可以定位数组中每个元素在内存中的位置。 3、挂起与恢复

将一个线程进行挂起是通过park方法实现的,调用park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。整个并发框架中对线程的挂起操作被封装在LockSupport类中,LockSupport类中有各种版本pack方法,但最终都调用了Unsafe.park()方法。

 public native void unpark(Object obj);

    public native void park(boolean flag, long l);



4、CAS操作

CAS操作是通过compareAndSwapXXX方法实现的

CAS操作有3个操作数,内存值M,预期值E,新值U,如果M==E,则将内存值修改为B,否则啥都不做。

看下natUnsafe.cc中的c++实现吧,加深理解,其实就是将内存值与预期值作比较,判断是否相等,相等的话,写入数据,不相等不做操作,返回旧数据;

static inline bool
compareAndSwap (volatile jint *addr, jint old, jint new_val)
{
  jboolean result = false;
  spinlock lock;
  if ((result = (*addr == old)))
    *addr = new_val;
  return result;
}



5、putLong,putInt,putDouble,putChar,putObject等方法,直接修改内存数据(可以越过访问权限) 这里,还有put对应的get方法,很简单就是直接读取内存地址处的数据;

我们可以举个putLong(Object, long, long)方法详细看下其具体实现,其它的类似,先看Java的源码,没啥好看的,就声明了一个native本地方法:

public native void    putLong(Object o, long offset, long x);



还是看下natUnsafe.cc中的c++实现吧,很简单,就是计算要写入数据的内存地址,然后写入数据,如下:

void
sun::misc::Unsafe::putLong (jobject obj, jlong offset, jlong value)
{
  jlong *addr = (jlong *) ((char *) obj + offset);//计算要修改的数据的内存地址=对象地址+成员属性地址偏移量
  spinlock lock;//自旋锁,通过循环来获取锁, i386处理器需要加锁访问64位数据,如果是int,则不需要改行代码
  *addr = value;//往该内存地址位置直接写入数据
}



6、getLongVolatile/putLongVolatile等等方法

这类方法使用volatile语义去存取数据,我的理解就是各个线程不缓存数据,直接在内存中读取数据;

Unsafe实例方法演示代码

1、User类的成员属性是私有的且没有提供对外的public方法,我们还是可以直接在它们的内存地址位置处写入数据

public class AQSTest {
    public static void main(String[] args) throws IllegalAccessException, NoSuchFieldException {
        Field field= Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        Unsafe unsafe= (Unsafe) field.get(null);

        User user=new User();
        System.out.println("修改前:");
        System.out.println(user.toString());

        Class userClass=user.getClass();
        Field name=userClass.getDeclaredField("name");
        Field id=userClass.getDeclaredField("id");
        Field age=userClass.getDeclaredField("age");
        Field height=userClass.getDeclaredField("height");

        unsafe.putObject(user,unsafe.objectFieldOffset(name),"mashuai");
        unsafe.putObject(user,unsafe.objectFieldOffset(id),007);
        unsafe.putObject(user,unsafe.objectFieldOffset(age),18);
        unsafe.putObject(user,unsafe.objectFieldOffset(height),185.5);

        System.out.println("修改后:");
        System.out.println(user.toString());
    }
}
class User {
    private String name = "test";
    private long id = 1;
    private int age = 2;
    private double height = 1.72;

    @Override
    public String toString() {
        return name + "," + id + "," + age + "," + height;
    }
}



运行结果:


2、简单的修改一个byte[]的数据的演示

public class AQSTest {
    private static int byteArrayBaseOffset;

    public static void main(String[] args)
            throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
        // 通过反射得到theUnsafe对应的Field对象
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        // 设置该Field为可访问
        theUnsafe.setAccessible(true);
        // 通过Field得到该Field对应的具体对象,传入null是因为该Field为static的
        Unsafe UNSAFE = (Unsafe) theUnsafe.get(null);
        System.out.println("对象:"+UNSAFE);

        byte[] data = new byte[10];
        System.out.println("修改前:");
        System.out.println(Arrays.toString(data));
        //arrayBaseOffset方法是一个本地方法,可以获取数组第一个元素的偏移地址
        byteArrayBaseOffset = UNSAFE.arrayBaseOffset(byte[].class);

        System.out.println("偏移地址为:"+byteArrayBaseOffset);
        //putByte直接修改内存数据
        UNSAFE.putByte(data, byteArrayBaseOffset, (byte) 1);
        UNSAFE.putByte(data, byteArrayBaseOffset + 5, (byte) 5);
        System.out.println("修改后:");
        System.out.println(Arrays.toString(data));
    }
}



运行结果:


LockSupport

LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和解除阻塞的。LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。

LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

park:阻塞当前线程(Block current thread),字面理解park,就算占住,停车的时候不就把这个车位给占住了么?起这个名字还是很形象的。

unpark: 使给定的线程停止阻塞(Unblock the given thread blocked )。

 public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    public static void park() {
        UNSAFE.park(false, 0L);
    }



两个函数声明清楚地说明了操作对象:park函数是将当前Thread阻塞,而unpark函数则是将指定线程Thread唤醒。



与Object类的wait/notify机制相比,park/unpark有两个优点:

  1. 以thread为操作对象更符合阻塞线程的直观定义;

  2. 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

  3. park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

  4. wait让线程阻塞前,必须通过synchronized获取同步锁,而park不需要。



    关于许可

    在上面的文字中,我使用了阻塞和唤醒,是为了和wait/notify做对比。

    其实park/unpark的设计原理核心是“许可”。park是等待一个许可。unpark是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

    有一点比较难理解的,是unpark操作可以再park操作之前。也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。

    但是这个“许可”是不能叠加的,“许可”是一次性的。比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。



    Unsafe.park和Unsafe.unpark的底层实现原理

    在Linux系统下,是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。 mutex和condition保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

    park 过程

    当调用park时,先尝试能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回:

    ``` void Parker::park(bool isAbsolute, jlong time) {

// Ideally we’d do something useful while spinning, such
// as calling unpackTime().

// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.

if (Atomic::xchg(0, &_counter) > 0) return;

<br><br>
如果不成功,则构造一个ThreadBlockInVM,然后检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回:<br><br>

ThreadBlockInVM tbivm(jt);
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);

<br><br>
否则,再判断等待的时间,然后再调用pthread_cond_wait函数等待,如果等待返回,则把_counter设置为0,unlock mutex并返回:<br><br>

if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, “invariant”) ;
OrderAccess::fence();

<br><br>
unpark 过程<br><br>
当unpark时,则简单多了,直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程:<br><br>

void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, “invariant”) ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, “invariant”) ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, “invariant”) ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, “invariant”) ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, “invariant”) ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, “invariant”) ;
}
}

<br><br>
## LockSupport实例代码演示
展示Locksupport的用法:<br><br>

public class LockSupportTest { private static Thread mainThread;

public static void main(String[] args) {

    ThreadA ta = new ThreadA("ta");
    // 获取主线程
    mainThread = Thread.currentThread();

    System.out.println(Thread.currentThread().getName() + " start ta");
    ta.start();
    LockSupport.unpark(mainThread);
    System.out.println(Thread.currentThread().getName() + " block");
    // 主线程阻塞
    LockSupport.park(mainThread);
    System.out.println(Thread.currentThread().getName() + " wakeup by self");

    System.out.println(Thread.currentThread().getName() + " block again");
    LockSupport.park(mainThread);

    System.out.println(Thread.currentThread().getName() + " continue");
}

static class ThreadA extends Thread {

    public ThreadA(String name) {
        super(name);
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " wakeup others");
        // 唤醒“主线程”
        LockSupport.unpark(mainThread);
    }
} } ``` <br><br> 运行结果:<br> ![](https://itmanmzt.github.io/styles/images/aqs/003.jpg){:align="center"}<br><br>

LockSupport对应中断的响应性

public class LockSupportTest2 {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Runnable()
        {
            private int count = 0;

            @Override
            public void run()
            {
                long start = System.currentTimeMillis();
                long end = 0;

                while ((end - start) <= 1000)
                {
                    count++;
                    end = System.currentTimeMillis();
                }

                System.out.println("after 1 second.count=" + count);

                //等待或许许可
                LockSupport.park();
                System.out.println("thread over." + Thread.currentThread().isInterrupted());

            }
        });

        t.start();

        Thread.sleep(2000);

        // 中断线程
        t.interrupt();


        System.out.println("main over");
    }
}



运行结果:


PS:线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException 。

CLH同步队列

CLH(Craig, Landin, andHagersten locks)同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,内部使用一个int成员变量表示同步状态,代表共享资源,通过内置的FIFO队列来完成资源获取线程的排队工作,其中内部状态state,等待队列的头节点head和尾节点head,都是通过volatile修饰,保证了多线程之间的可见。当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

CLH是一钟自旋锁,能确保无饥饿性,提供先来先服务的公平性。它是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 独占 */
    static final Node EXCLUSIVE = null;

    /**
     * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
     */
    static final int CANCELLED =  1;

    /**
     * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步状态获取将会无条件地传播下去
     */
    static final int PROPAGATE = -3;

    /** 等待状态 */
    volatile int waitStatus;

    /** 前驱节点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 获取同步状态的线程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

CLH同步队列结构图如下:

同步器是默认head节点,其实是一个空节点,我觉得可以理解成代表当前持有锁的线程,每当有线程竞争失败,都是插入到队列的尾节点,tail节点始终指向队列中的最后一个元素。

每个节点中, 除了存储了当前线程,前后节点的引用以外,还有一个waitStatus变量,用于描述节点当前的状态。多线程并发执行时,队列中会有多个节点存在,这个waitStatus其实代表对应线程的状态:有的线程可能获取锁因为某些原因放弃竞争;有的线程在等待满足条件,满足之后才能执行等等。一共有4中状态:

CANCELLED 取消状态 SIGNAL 等待触发状态 CONDITION 等待条件状态 PROPAGATE 状态需要向后传播

等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。

入列 学了数据结构的我们,CLH队列入列是再简单不过了,无非就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。代码我们可以看看addWaiter(Node node)方法:

private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速尝试添加尾节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS设置尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次尝试
        enq(node);
        return node;
    

addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点

private Node enq(final Node node) {
        //多次尝试,直到成功为止
        for (;;) {
            Node t = tail;
            //tail不存在,设置为首节点
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //设置为尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在上面代码中,两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

过程图如下:

出列 CLH同步队列遵循FIFO,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。 过程图如下:





AQS是构建Java同步组件的基础,我们期待它能够成为实现大部分同步需求的基础。AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实现同步,主要是分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。自定义子类使用AQS提供的模板方法就可以实现自己的同步语义。

独占式同步状态的获取和释放

独占式,同一时刻仅有一个线程持有同步状态。

独占式同步状态获取

acquire(int arg)方法为AQS提供的模板方法,该方法为独占式获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。代码如下:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }



各个方法定义如下:

tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。在AbstractQueuedSynchronizer中实现了公平锁和非公平锁的两种方式。

addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。

acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。

selfInterrupt:产生一个中断。

下列步骤中线程A和B进行竞争。

1、线程A执行CAS执行成功,state值被修改并返回true,线程A继续执行。

2、线程A执行CAS指令失败,说明线程B也在执行CAS指令且成功,这种情况下线程A会执行步骤3。

3、生成新Node节点node,并通过CAS指令插入到等待队列的队尾(同一时刻可能会有多个Node节点插入到等待队列中),如果tail节点为空,则将head节点指向一个空节点(代表线程B),具体实现如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}



4、node插入到队尾后,该线程不会立马挂起,会进行自旋操作。因为在node的插入过程,线程B(即之前没有阻塞的线程)可能已经执行完成,所以要判断该node的前一个节点pred是否为head节点(代表线程B),如果pred == head,表明当前节点是队列中第一个“有效的”节点,因此再次尝试tryAcquire获取锁:

1)、如果成功获取到锁,表明线程B已经执行完成,线程A不需要挂起。

2)、如果获取失败,表示线程B还未完成,至少还未修改state值。进行步骤5。

实现过程如下:

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中断标志
            boolean interrupted = false;
            /*
             * 自旋过程,其实就是一个死循环而已
             */
            for (;;) {
                //当前线程的前驱节点
                final Node p = node.predecessor();
                //当前线程的前驱节点是头结点,且同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取失败,线程等待--具体后面介绍
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



5、前面我们已经说过只有前一个节点pred的线程状态为SIGNAL时,当前节点的线程才能被挂起。

1)、如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。

2)、如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除。

3)、如果pred的waitStatus为Node.SIGNAL,则通过LockSupport.park()方法把线程A挂起,并等待被唤醒,被唤醒后进入步骤6。

具体实现如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}



6、线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。从无限循环的代码可以看出,并不是被唤醒的线程一定能获得锁,必须调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。

从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:

1)、保持FIFO同步队列原则。

2)、头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

acquire(int arg)方法流程图如下:



独占式获取响应中断

AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态(这一点在前面的LockSupport的实例演示代码中有效果演示)。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。

  public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }



首先校验该线程是否已经中断了,如果是则抛出InterruptedException,否则执行tryAcquire(int arg)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定义如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别。1.方法声明抛出InterruptedException异常,2.在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。

独占式超时获取

AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }



tryAcquireNanos(int arg, long nanosTimeout)方法超时获取最终是在doAcquireNanos(int arg, long nanosTimeout)中实现的,如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //nanosTimeout <= 0
        if (nanosTimeout <= 0L)
            return false;
        //超时时间
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增Node节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                //获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                /*
                 * 获取失败,做超时、中断判断
                 */
                //重新计算需要休眠的时间
                nanosTimeout = deadline - System.nanoTime();
                //已经超时,返回false
                if (nanosTimeout <= 0L)
                    return false;
                //如果没有超时,则等待nanosTimeout纳秒
                //注:该线程会直接从LockSupport.parkNanos中返回,
                //LockSupport为JUC提供的一个阻塞和唤醒的工具类,后面做详细介绍
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //线程是否已经中断了
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



针对超时控制,程序首先记录唤醒时间deadline ,deadline = System.nanoTime() + nanosTimeout(时间间隔)。如果获取同步状态失败,则需要计算出需要休眠的时间间隔nanosTimeout(= deadline - System.nanoTime()),如果nanosTimeout <= 0 表示已经超时了,返回false,如果大于spinForTimeoutThreshold(1000L)则需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接进入快速自旋的过程。原因在于 spinForTimeoutThreshold 已经非常小了,非常短的时间等待无法做到十分精确,如果这时再次进行超时等待,相反会让nanosTimeout 的超时从整体上面表现得不是那么精确,所以在超时非常短的场景中,AQS会进行无条件的快速自旋。

整个流程如下:



独占式同步状态释放

当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS提供了release(int arg)方法释放同步状态:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }



该方法同样是先调用自定义同步器自定义的tryRelease(int arg)方法来释放同步状态,释放成功后,会调用unparkSuccessor(Node node)方法唤醒后继节点

 private void unparkSuccessor(Node node) {
        /*
         * 如果状态为负(可能需要信号),请尝试清除预期信令。 如果失败或者等待
         * 线程改变了状态,则可以。
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 线程到unpark是在后继节点,通常只是下一个节点。 但如果取消或显然为空,
         * 则从尾部向后移动以找到实际未取消的后继者。
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }



1、如果头结点head的waitStatus值为-1,则用CAS指令重置为0;

2、找到waitStatus值小于0的节点s,通过LockSupport.unpark(s.thread)唤醒线程。

小结



在AQS中维护着一个FIFO的同步队列,当线程获取同步状态失败后,则会加入到这个CLH同步队列的对尾并一直保持着自旋。在CLH同步队列中的线程在自旋时会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出CLH同步队列。当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。

共享式同步状态的获取和释放

共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

共享式同步状态获取

QS提供acquireShared(int arg)方法共享式获取同步状态:

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //获取失败,自旋获取同步状态
            doAcquireShared(arg);
    }



从上面程序可以看出,方法首先是调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功。自选式获取同步状态如下:

private void doAcquireShared(int arg) {
        /共享式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驱节点
                final Node p = node.predecessor();
                //如果其前驱节点,获取同步状态
                if (p == head) {
                    //尝试获取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }



tryAcquireShared(int arg)方法尝试获取同步状态,返回值为int,当其 >= 0 时,表示能够获取到同步状态,这个时候就可以从自旋过程中退出。

acquireShared(int arg)方法不响应中断,与独占式相似,AQS也提供了响应中断、超时的方法,分别是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),这里就不做解释了。

共享式同步状态释放

获取同步状态后,需要调用release(int arg)方法释放同步状态,方法如下:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }



因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。

阻塞和唤醒线程

阻塞线程

在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;



通过这段代码我们可以看到,在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点
        int ws = pred.waitStatus;
        //状态为signal,表示当前线程处于等待状态,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驱节点状态为Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }



这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

1、如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞;

2、如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false;

3、如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false;

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }



parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

唤醒线程

当线程释放同步状态后,则需要唤醒该线程的后继节点:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }



调用unparkSuccessor(Node node)唤醒后继节点:

private void unparkSuccessor(Node node) {
        //当前节点状态
        int ws = node.waitStatus;
        //当前状态 < 0 则设置为 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //当前节点的后继节点
        Node s = node.next;
        //后继节点为null或者其状态 > 0 (超时或者被中断了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从tail节点来找可用节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }



可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。



有Marin的地方就有你的收获