AQS

为什么需要AQS

  • 锁和协作类有共同点,都是像闸门一样,如ReentrantLock和Semaphore,lock和acquire都是拿到锁或拿到许可证以后才能继续,unlock和release都是释放已拿到的资源。tryLock()和tryAcquire()都是尝试获取资源,或者一段时间内尝试获取资源,获取成功就返回true.
  • 事实上,不仅是ReentrantLock和Semaphore,包括CountDownLatch\ReentrantReadWriteLock都有类似的"协作"功能,其实,他们底层都用了一个共同的基类,这就是AQS
  • 因为上面的哪些协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLock和Semaphore而言,就可以屏蔽很多细节,只关注它们自己的"业务逻辑"就可以了

AQS的比喻

可以类比成工作中的HR,有面试者来了,安排就坐,叫号,先来后到的事情都是HR的工作,就是AQS的工作,面试官不用关心两个面试官是不是号码冲突,也不管面试官需要一个地方休息,这些都由HR来做。
Semaphore: 一个人面试结束,后面的人才能进来继续面试
CountDownLatch: 群面,等待10人到齐
Semaphore,CountDownLatch这些同步工具类,要做的就是写下自己“要人”的规则,比如是“出一个、进一个”,或者说“凑齐10人,一起面试”,剩下的排队,招呼人都有AQS来做

AQS的作用

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便的被写出来。有了AQS,构建线程协作类就容易多了

AQS内部原理解析

AQS三要素

state

查看在AbstractQueuedSynchronizer中的源码

  • 一个被volatile修饰的普通的int,会根据具体实现类的不同而不同,比如在Semaphore里,他表示“剩余许可证的数量”,而在CountDownLatch里,他表示“还需要倒数的数量”

  • state是被volatile修饰的,会被并发的修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。这些方法都依赖于j.u.c.atomic包的支持


通过底层指令的原子性,保证了方法的原子性

  • 在ReentrantLock中,state用来表示锁的占有情况,包括可重入计数。当state的值为0的时候,表示该lock不被任何线程所占有
控制线程抢锁和配合的FIFO队列

这个队列用来存放“等待的线程”,AQS就是排队管理器,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁

AQS会维护一个等待的线程队列,把线程都放在这个队列里,这是一个双向形式的队列

期望协作工具类去实现的获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同。

获取方法
获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)

释放方法
释放操作不会阻塞

应用实例、源码解析

AQS用法

第一步:写一个类,想好协作的逻辑,实现获取/释放方法

第二步:内部写一个Sync类继承AbstractQueuedSynchronizer

第三步:如果独占则重写tryAcquire和tryRelease,共享则重写tryAcquireShared和tryReleaseShared等,在之前写的获取和释放方法中调用AQS的acquire和release或者Shared方法

AQS在CountDownLatch的应用

构造函数


在创建的时候,传入一个值,即倒数的数量,紧接着调用Sync的构造方法

Sync将AQS的state赋值为传入的值大小

getCount


同样是调用了Sync的getConut方法

Sync的getConut方法返回出AQS的state值

countDown


调用sync的releaseShared方法,默认传1


接着,首先调用tryReleaseShared方法,此方法在sync中实现


首先,判断当前state值是否等于0,等于0代表当前已经完全释放,本次无需释放,返回false,不是则利用cas将state减1,然后,返回state是否等于0,等于在代表当前需要释放。


当返回结果为true的时候,将会调用doReleaseShared,释放之前陷入等待的线程。

await


同样是调用了Sync的acquireSharedInterruptibly方法


先判断中断,然后调用tryAcquireShared(arg)方法判断是否小于0,是的话就调用doAcquireSharedInterruptibly(arg),将当前方法放入等待队列,如果不小于0就代表正常获取到了这把锁。接着进入CountDownLatch的Sync实现的tryAcquireShared

tryAcquireShared获取当前的状态是否为0,是的话就返回1,否则返回-1.

而放入等待队列里,首先使用addWaiter将其包装成Node节点,一个Node节点里包含一个线程,而接下来具体的阻塞方法是在parkAndCheckInterrupt()方法中实现,parkAndCheckInterrupt()方法调用了LockSupport.park()

LockSupport.park()将UNSAFE.park进行包装,而UNSAFE.park(false, 0L)方法是一个native方法,所作的操作就是将当前线程挂起,进入阻塞状态

总结
  • 调用CountDownLatch的await方法时,便会尝试获取共享锁,不过一开始是获取不到该锁的,于是线程被阻塞。
  • 而共享锁可获取道的条件,就是锁计数器的值为0
  • 锁计数器的初始为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将锁计数器 -1
  • count个线程调用countDown()之后,"锁计数器"才为0么,而前面提到的等待获取共享锁的线程才能继续运行

AQS在Semaphore的应用

构造方法


传入一个值,然后调用NonfairSync方法


NonfairSync中调用super方法


然后再Sync中设置state为传进来的值

acquire 获取许可证


首先判断获取的数量是否小于1,小于1则抛出异常,否则就调用sync.acquireSharedInterruptibly(permits)


可以看到最终还是要调用tryAcquireShared,而tryAcquireShared又分为公平和非公平的方法。以非公平为例


非公平的会调用nonfairTryAcquireShared方法,


这个方法是获取我们的许可证数量,首先判断许可证数量是否大于需要获取的的数量,如果小于则直接返回负数代表获取失败,进入等待队列,如果大于就用自旋加CAS来改变state状态,直到改变成功返回正数获取成功,不必进入等待队列。如果自旋期间其他人改变了导致剩余数量不够,也会返回负数代表获取失败。

AQS在ReentrantLock的应用

unlock


直接调用sync.release的方法


release方法首先调用了tryRelease方法,如果tryRelease返回false,则代表锁还不能释放,如果返回true,则代表锁已经可以完全释放,那么就会从后面的节点中选择一个进行唤醒。


tryRelease方法中首先判断当前线程是否是持有锁的线程,因为只有持有锁才可以解锁,如果持有锁,同时也会判断两种情况,第一种就是重入很多次,那么本次只是减少一次重入次数。另一种就是没有重入,或重入次数已经为0,才会去释放锁。

lock


lock是个接口,因为分为公平和非公平,接下来以非公平为例


在非公平的情况下,会先进行cas操作,其中expect为0,也就是说只有在state为0的时候操作才会成功,也就是在没有任何人持有这把锁的时候,会执行成功,并把当前的线程设置为持有锁的线程。如果失败则会进入acquire方法


acquire方法中最主要的就是tryAcquire方法,继续进入tryAcquire,如果此方法返回了false,则会执行addWaiter(Node.EXCLUSIVE)方法,将当前线程添加到队列中,然后执行acquireQueued,这个方法的核心就是将当前线程进行等待,如果有机会获取锁,则会进行获取锁操作。


在非公平的方法中,tryAcquire会执行nonfairTryAcquire方法


首先,获取当前的状态是否为0,如果为0则代表当前锁没有被任何人持有,就会通过cas和线程获取的方式,把当前线程给获取到。如果不为0,则判断当前持有锁的线程是不是当前线程,如果是,就是说明当前操作时进行了重入操作,将当前state加1。如果不是当前线程,则代表锁被其他线程持有,那么就返回不成功。返回false.

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注