acquire()
方法阻塞,直到有一个许可证可以获得然后拿走一个许可证。release()
方法增加一个许可证,这可能会释放一个阻塞的 acquire()
方法。public class SemaphoreDemo {
private static Semaphore s = new Semaphore(2);
static class ParkTask implements Runnable {
private String name;
public ParkTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
s.acquire();
System.out.println("Thread " + this.name + " start...");
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
pool.submit(new ParkTask("1"));
pool.submit(new ParkTask("2"));
pool.submit(new ParkTask("3"));
pool.submit(new ParkTask("4"));
pool.submit(new ParkTask("5"));
pool.submit(new ParkTask("6"));
pool.shutdown();
}
}
/**
--- print ---
Thread 2 start...
Thread 6 start...
Thread 3 start...
Thread 1 start...
Thread 4 start...
Thread 5 start...
*/
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)
构造函数
//permits是允许同时运行的线程数目
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
NonfairSync(int permits) {
super(permits);
}
......
Sync(int permits) {
setState(permits);
}
响应中断获取资源
acquire()
方法每次获取一个信号量,也可以使用 acquire(int permits)
方法获取指定数量的信号量 。//从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到
//1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
//2.某些其他线程中断当前线程
//如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
tryAcquireShared()
方法。
// 非公平锁的获取方式
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//获取去中的信号量数
int remaining = available - acquires;//剩余信号量数
//1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数
//2.信号量数小于等于0,直接返回负数
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
// 公平锁获取
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> read.
* @return current state value
*/
protected final int getState() {
return state;
}
不响应中断获取资源
acquire()
方法一致。//从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到
//1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
//2.如果当前线程在等待许可时被中断,那么它会接着等待,但是与没有发生中断相比,为线程分配许可的时间可能改变
ublic void acquireUninterruptibly() {
sync.acquireShared(1);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}!
尝试获得信号量
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
释放资源
acquire(10)
最后释放的时候不能只写一个 release()
而是 release(10)
才对。// 尝试释放锁
public final boolean release(int arg) {
// 如果释放锁成功 唤醒同步队列中的后继节点
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 为了方便对比把两个代码放在一块 可以看到 release 中的结构完全一样
// 区别就在于 doReleaseShared 中有更多的判断操作
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); //在里面执行的 unparkSuccessor(h)
return true;
}
return false;
}
tryReleaseShared()
方法判断是否释放成功。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前许可数量
int current = getState();
//计算回收后的数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS改变许可数量成功,返回true
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared()
方法释放阻塞的线程。其他方法
获取当前剩余的信号量数量
public int availablePermits() {
return sync.getPermits();
}
// Sync
final int getPermits() {
return getState();
}
耗尽许可数量
drainPermits()
方法,获取 1 个信号量后将可用的信号量个数置为 0。
drainPermits()
方法后,可以获得一个信号量,剩余 4 个信号量就消失了,总共可用的信号量就变成 6 个了。public int drainPermits() {
return sync.drainPermits();
}
// Sync
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
缩减许可数量
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// Sync
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
判断 AQS 同步队列中是否还有 Node
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads() {
//头结点不等于尾节点就说明链表中还有元素
return head != tail;
}
release()
操作是统一的。https://www.meiwen.com.cn/subject/expkcxtx.html
https://blog.csdn.net/u014634338/article/details/78701445
原文:https://www.cnblogs.com/youngao/p/12573918.html