class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading. */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing. */ Lock writeLock(); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //检测自己是否已经排到第一个了 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } //shouldParkAfterFailedAcquire的作用检测我是不是需要安心地等,如果是的话, //就调用parkAndCheckInterrupt进入等待,等待结束后会返回线程是否已经中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //SIGNAL的意思就是说前面那个家伙在释放锁以后会告诉我的,我安心等就是了 return true; if (ws > 0) { /* * ws大于0的意思就是这个Node已经被取消了,需要跳过,并且从队列中清除出去 * 这里会清除我前面所有这种类型的Node */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 这里ws是0或者PROPAGATE,表示他是傻乎乎的家伙,还不知道SIGNAL规则 * 0是独占锁,PROPAGATE是共享锁,compareAndSetWaitStatus会找人把他设成SIGNAL * 状态,(成功与否未知,所以返回false)参见上面关于SIGNAL的解释 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
public final void acquireInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
private void doAcquireInterruptibly(long arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
private boolean doAcquireNanos(long arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } if (nanosTimeout <= 0) { cancelAcquire(node); return false; } // 多次parkNanos,计算实际耗费的时间才是安全的做法 if (nanosTimeout > spinForTimeoutThreshold && shouldParkAfterFailedAcquire(p, node)) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; // 如果线程被中断,不好意思,要抛出异常的 if (Thread.interrupted()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
private void setHeadAndPropagate(Node node, long propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don‘t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
final boolean isFirst(Thread current) { Node h, s; return ((h = head) == null || ((s = h.next) != null && s.thread == current) || fullIsFirst(current)); }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
/* * Note that tryRelease and tryAcquire can be called by * Conditions. So it is possible that their arguments contain * both read and write holds that are all released during a * condition wait and re-established in tryAcquire. */ protected final boolean tryRelease(int releases) { int nextc = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 首先检查独占锁计数,如果是0表示独占锁已经被完全释放,则清除独占锁线程 // 更新状态 if (exclusiveCount(nextc) == 0) { setExclusiveOwnerThread(null); setState(nextc); return true; } else { setState(nextc); return false; } } protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. if read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // c != 0表示有共享锁或者独占锁存在,w == 0表示没有独占锁 // 那么两个条件同时成立表示有共享锁存在,就无法获得独占锁 // 或者有线程拥有独占锁但不是当前线程,那也无权获得独占锁 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 到了这一步,可能有以下几种情况: // 1) c == 0没有任何锁存在(这个时候 w == 0 也成立) // 2) 当前线程拥有独占锁,并且还没到锁的最大限制数 // w == 0是当前线程没有独占锁,属于新申请 // writerShouldBlock是抽象方法,对于FairSync和UnfairSync有不同实现 // 该发现检查当前线程申请独占锁应不应该被阻止 // 对于FairSync,writerShouldBlock会用isFirst检查, // 对于isFirst,如果如果没人排队,或者你是第一个排队的,或者fullIsFirst就返回true // 对于fullIsFirst,不是很理解 // 对于UnfairSync,writerShouldBlock永远返回false,因为没有排队的概念(体现Unfair) if ((w == 0 && writerShouldBlock(current)) || !compareAndSetState(c, c + acquires)) return false; // 获取独占锁成功,设置独占锁线程 setExclusiveOwnerThread(current); return true; } // 这里使用HoldCounter类型的ThreadLocal变量存储当前线程拥有的共享锁的计数 // cachedHoldCounter缓存最近一次成功获取共享锁的线程的ThreadLocal变量 protected final boolean tryReleaseShared(int unused) { HoldCounter rh = cachedHoldCounter; Thread current = Thread.currentThread(); if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); // tryDecrement()返回拥有的共享锁的计数,大于0则并且更新计数(减1)。 if (rh.tryDecrement() <= 0) throw new IllegalMonitorStateException(); // 更新共享锁的计数 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 高位的共享锁计数减一 if (compareAndSetState(c, nextc)) return nextc == 0; } } protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail * 2. If count saturated, throw error * 3. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 4. If step 3 fails either because thread * apparently not eligible or CAS fails, * chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); // 其他线程正在使用独占锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 共享锁计数到达最大限制 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 类似于writerShouldBlock,readerShouldBlock是抽象方法,有不同实现, // 检查是不是阻止当前线程共享锁的申请 // 对于UnfairSync,为了防止独占锁饿死的情况,如果发现队列中第一个排队的是独占锁申请, // 就是block当前共享锁的申请 // 对于FairSync,同样使用isFirst检查当前线程 if (!readerShouldBlock(current) && compareAndSetState(c, c + SHARED_UNIT)) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); rh.count++; return 1; } // 针对CAS失败或者一些不太常见的失败的情况 // 思想:实现常规版本和完整版本(包含所有情况),在常规版本失败的情况下调用完整版本, 提高效率 return fullTryAcquireShared(current); }
/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */ final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); for (;;) { int c = getState(); int w = exclusiveCount(c); // 红色部分表示没有占用共享锁,新申请共享锁 if ((w != 0 && getExclusiveOwnerThread() != current) || ((rh.count | w) == 0 && readerShouldBlock(current))) return -1; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { cachedHoldCounter = rh; // cache for release rh.count++; return 1; } } }
《java.util.concurrent 包源码阅读》03 锁,布布扣,bubuko.com
《java.util.concurrent 包源码阅读》03 锁
原文:http://www.cnblogs.com/wanly3643/p/3835839.html