Semaphore实现原理
1 构造方法
从概念上讲,Semaphore维护一组许可,由一个可以递增或递减的计数器值表示,用来控制同时访问特定资源的线程数目。
Semaphore信号量来实现线程间通信,Semaphore支持公平锁和非公平锁,Semaphore底层是通过共享锁来实现的,其支持两种构造函数,如下所示:
// 默认使用非公平锁实现
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
2 Semaphore方法
//尝试获取一个信号量,如果信号量不为0,那么将信号量-1,返回
//如果信号量为0,WAITING直到信号量不为0
//可中断
public void acquire() throws InterruptedException
//尝试获取多个信号量,如果信号量足够,那么将信号量-permits,返回
//如果信号量不够,WAITING直到信号量不为0
//可中断
public void acquire(int permits) throws InterruptedException
//同acquire(),但不可中断
public void acquireUninterruptibly()
//同acquire(int permits),但不可中断
public void acquireUninterruptibly(int permits)
//释放一个信号量
public void release()
//释放permits个信号量
public void release(int permits)
3 Semaphore内部类及继承关系

Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。
3.1 类的内部类 - Sync类
// 内部类,继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本号
private static final long serialVersionUID = 1192457210091910933L;
// 构造函数
Sync(int permits) {
// 设置状态数
setState(permits);
}
// 获取许可
final int getPermits() {
return getState();
}
// 共享模式下非公平策略获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 无限循环
// 获取许可数
int available = getState();
// 剩余的许可
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 许可小于0或者比较并且设置状态成功
return remaining;
}
}
// 共享模式下进行释放
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 无限循环
// 获取许可
int current = getState();
// 可用的许可
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 比较并进行设置成功
return true;
}
}
// 根据指定的缩减量减小可用许可的数目
final void reducePermits(int reductions) {
for (;;) { // 无限循环
// 获取许可
int current = getState();
// 可用的许可
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next)) // 比较并进行设置成功
return;
}
}
// 获取并返回立即可用的所有许可
final int drainPermits() {
for (;;) { // 无限循环
// 获取许可
int current = getState();
if (current == 0 || compareAndSetState(current, 0)) // 许可为0或者比较并设置成功
return current;
}
}
}
3.2 类的内部类 - NonfairSync类
static final class NonfairSync extends Sync {
// 版本号
private static final long serialVersionUID = -2694183684443567898L;
// 构造函数
NonfairSync(int permits) {
super(permits);
}
// 共享模式下获取
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
从tryAcquireShared方法的源码可知,其会调用父类Sync的nonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取。
3.3 类的内部类 - FairSync类
protected int tryAcquireShared(int acquires) {
for (;;) { // 无限循环
if (hasQueuedPredecessors()) // 同步队列中存在其他节点
return -1;
// 获取许可
int available = getState();
// 剩余的许可
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 剩余的许可小于0或者比较设置成功
return remaining;
}
}
从tryAcquireShared方法的源码可知,它使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点。
3.4 类的属性
public class Semaphore implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = -3222578661600680210L;
// 属性
private final Sync sync;
}
Semaphore自身只有两个属性,最重要的是sync属性,基于Semaphore对象的操作绝大多数都转移到了对sync的操作。
4 Semaphore.acquire流程分析(以非公平锁为例)

从上图可以看出,针对阻塞线程的部分实现,和ReentrantLock基本一致,我们不做赘述,主要来看下前半部分的源码实现:
// Semaphore.java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享资源
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
从源码可以看出acquire主要依赖于tryAcquireShared和doAcquireSharedInterruptibly。

4.1 tryAcquireShared
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//判断是否还有令牌
int remaining = available - acquires;
//无论是否还有令牌,都要返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// AbstractQueuedSynchronizer.java
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
从代码可以看出这里主要是根据申请的许可证数量,比较时否有许可证数量,如果可用许可证数量小于0,则直接返回,如果大于0,则通过CAS将state设置为可用许可证数量。
4.2 doAcquireSharedInterruptibly
当tryAcquireShared中返回的可用许可证数量小于0时,执行doAcquireSharedInterruptibly流程,代码如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//自旋获取锁
for (;;) {
final Node p = node.predecessor();
//判断上一个节点是否是头节点
if (p == head) {
//如果是头节点则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取到锁,通知其他节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//判断是否需要阻塞线程,设置waitStatus并阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.java
// 在队尾新建Node对象并添加
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
执行setHeadAndPropagate的主要目的在于,这里能获取到说明在该线程自旋过程中有线程释放了许可证,释放的许可证数量有可能还有剩余,所以传递给其他节点的线程,唤醒其他阻塞状态的线程也尝试去获取许可证。
5 Semaphore.release流程分析(以非公平锁为例)

Semaphore.release流程相对而言,就比较简单,将release传递到AQS内部通过CAS更新许可证数量信息,更新完成后,遍历队列中Node节点,将Node waitStatus设置为0,并对对应线程执行unpark,相关代码如下:
@ReservedStackAccess
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过CAS更新许可证数量
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 许可证数量更新完成后,调用该方法唤醒线程
private void doReleaseShared() {
// 自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点线程抢占许可证
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
6 公平锁
我们分析了Smaphore非公平锁的实现,公平锁的实现其本质区别在于在tryAcquireShared中只有当等待队列为空时,才会去尝试更新剩余许可证数量。
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断是否是头节点
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
7 Semaphore和ReentrantLock的区别
- 可重入的性质
Semaphores在本质上是非可重入的 ,这意味着我们不能在同一个线程中第二次获得Semaphore。试图这样做会导致死锁(一个线程与自己死锁)。
另一方面, 可重入锁在本质上是可重入的,允许一个线程使用lock() 方法多次锁定一个特定的资源。 - 同步机制
Semaphores很适合信号传递(信号机制),线程使用acquire()&release() 方法来标记访问关键资源的开始和结束。
ReentrantLock 使用锁定机制,使用lock() 方法锁定一个特定的资源 ,在对该资源进行特定操作后,使用unlock() 方法释放该锁。 - 死锁恢复
Semaphores提供了一个强大的死锁恢复机制,因为它使用了一个非所有权的释放机制,因此任何线程都可以释放一个许可,以恢复一个卡住或等待的线程的死锁情况。
在 ReentrantLock的情况下,死锁恢复是有点困难的,因为它使用线程对资源的所有权,通过物理锁定它,只有所有者线程可以解锁该资源。如果所有者Thread进入无限等待或睡眠状态,就不可能释放该特定资源的锁,从而导致死锁情况。 - 抛出IllegalMonitorStateException
在Semaphores中,没有线程拥有获取或释放许可的所有权,所以任何线程都可以调用release() 方法来释放任何其他线程的许可,没有线程会引发 IllegalMonitorStateException。
在可重入锁中,一个Thread 通过调用lock() 方法成为一个关键共享资源的所有者,如果其他Thread在没有拥有锁的情况下调用unlock() 方法,那么 它将会产生 IllegalMonitorStateException。 - 修改
任何线程都可以使用Semaphore的acquire() 和release() 方法来修改它的可用许可。
只有通过lock()方法拥有资源的当前所有者线程可以修改ReentrantLock,而其他线程不允许这样做。
Semaphores可以用于非所有权-释放语义,即不止一个Thread 可以进入一个关键部分,并且不需要锁定机制来锁定一个共享资源。根据设计,Semaphore对哪个线程调用acquisition()和release()方法是盲目的,它所关心的是许可成为可用的。
如果我们需要可重入互斥或一个简单的互斥 ,那么 ReentrantLock是最好的选择。可重入锁 提供了对锁机制更好的控制,并且允许每次只有一个线程访问关键部分,从而提供了同步性,并消除了在多线程应用程序中工作时的数据不一致问题。
8 场景问题
semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
答案:拿不到令牌的线程阻塞,不会继续往下运行。
semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
答案:线程阻塞,不会继续往下运行。可能你会考虑类似于锁的重入的问题,很好,但是,令牌没有重入的概念。你只要调用一次acquire方法,就需要有一个令牌才能继续运行。
semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
答案:能,原因是release方法会添加令牌,并不会以初始化的大小为准。
semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?
答案:能,原因是release会添加令牌,并不会以初始化的大小为准。Semaphore中release方法的调用并没有限制要在acquire后调用。
一条小咸鱼