刚刚看完了并发实践这本书,算是理论具备了,看到了AQS的介绍,再看看源码,发现要想把并发理解透还是很难得,花了几个小时细分析了一下把可能出现的场景尽可能的往代码中去套,还是有些收获,但是真的很费脑,还是对多线程的理解太浅了,不多说了,直接上代码吧。
这段代码不是为跑通,只是把AQS,ReentrantLock中的部分源码合并到了一起,便于理解。
1 package com.yb.interview.concurrent; 2 3 4 import java.util.concurrent.locks.LockSupport; 5 6 public class AQSSourceStudy { 7 8 abstract static class AQS { 9 /** 10 * 这个状态是有子类来维护的,AQS不会用这个状态做什么 11 */ 12 private volatile int state; 13 /** 14 * 队尾节点 15 */ 16 private volatile Node tail; 17 /** 18 * 可能情况 19 */ 20 private volatile Node head; 21 /** 22 * 独占线程 23 */ 24 private Thread exclusiveOwnerThread; 25 26 27 /** 28 * 由子类实现 29 * 判断当前线程是否需要排队 30 */ 31 abstract boolean tryAcquire(int i); 32 33 public int getState() { 34 return state; 35 } 36 37 public void setState(int state) { 38 this.state = state; 39 } 40 41 /** 42 * 主方法 43 * 可能的情况 44 * 当前状态可以直接运行 45 * 当前状态要放入队列里等待 46 * 状态->子类获取 47 * 过程,尽可能的不要去阻塞,循环多次,竞争多次 48 * 创建节点 49 * 节点入队,队尾 50 * 判断新节点的前一个节点的状态,更新,前一个节点,因为在入队的过程中每个节点的状态是动的 51 * 最后,阻塞当前线程 52 */ 53 public final void acquire(int arg) { 54 if (!tryAcquire(arg) && 55 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 56 // 中断状态传播 57 // 实时或者将来阻塞,抛中断异常 58 selfInterrupt(); 59 } 60 61 /** 62 * 当有新节点入队时,循环的把新节点关联到一个有效节点的后面 63 * 然后,阻塞这个节点的线程(当前线程) 64 */ 65 private boolean acquireQueued(Node node, int arg) { 66 boolean failed = true; 67 try { 68 boolean interrupted = false; 69 for (; ; ) { 70 final Node p = node.predecessor(); 71 // 新节点的前个节点是头结点,如果头结点的线程释放,新节点接可以直接执行 72 // 所有不要着急阻塞,在判断一次,头结点释放没有,如果头结点释放,新节点不阻塞,把新节点设为头结点 73 // 当新节点没有排队直接运行了,之后要将节点标记为无效 cancelAcquire 74 if (p == head && tryAcquire(arg)) { 75 // 想了很久这段代码发生的情况 76 // 这段代码发生的情况 77 // 1.node在入队列时,有不同的线程在获得了锁,且队列中没有节点 78 // 2.当执行到这里再次tryAcquire之前,之前释放了锁 79 // 3.这时hasQueuedPredecessors中的判断,头结点的后一个节点,是新建的这个节点,满足s.thread==Thread.currentThread(不考虑这时有其他线程进入,或者进入无效) 80 // 满足了tryAcquire返回true的情况 81 // 将头结点改为新节点 82 /**** 83 * head tail 84 * | | 85 * | | 86 * ---------- --------- 87 * nullNode newNode 88 * --------- ---------- 89 * next=newNode prev=nullNode 90 * prev=null next=null 91 * ------- ---------- 92 * 93 * 改完后 94 * 95 * head tail 96 * | | 97 * | | 98 * --------- --------- 99 * nullNode newNode 100 * --------- --------- 101 * next=newNode prev=nullNode 102 * prev=null next=null 103 * --------- ---------- 104 * */ 105 106 setHead(node); 107 p.next = null; 108 failed = false; 109 return interrupted; 110 } 111 // 之前的节点不是正在执行线程的节点,调整位置和状态再阻塞 112 // 在线程解除阻塞后,使者节点失效 113 if (shouldParkAfterFailedAcquire(p, node) && 114 parkAndCheckInterrupt()) 115 interrupted = true; 116 } 117 } finally { 118 if (failed) 119 // 节点解除阻塞后,可能是中断或者超时 120 // 非unlock的解锁 121 cancelAcquire(node); 122 } 123 } 124 125 private void cancelAcquire(Node node) { 126 if (node == null) 127 return; 128 node.thread = null; 129 Node pred = node.prev; 130 // 那个空的节点会保证终止 131 while (pred.waitStatus > 0) 132 // 将节点的prev关联到最近的有效节点 133 node.prev = pred = pred.prev; 134 Node predNext = pred.next; 135 // 任何情况都执行的 136 node.waitStatus = Node.CANCELLED; 137 138 // 如果取消的节点是队尾节点,并且将前节点设为队尾节点 139 if (node == tail && compareAndSetTail(node, pred)) { 140 // cancel的节点和cancel之前的无效节点会移出队列 141 compareAndSetNext(pred, predNext, null); 142 } else { 143 // 如果不是队尾节点 144 int ws; 145 if (pred != head && 146 ((ws = pred.waitStatus) == Node.SIGNAL || 147 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 148 pred.thread != null) { 149 Node next = node.next; 150 if (next != null && next.waitStatus <= 0) 151 // prev->node->next 改为 prev->next 152 compareAndSetNext(pred, predNext, next); 153 } else { 154 // 判断锁定的状态 155 // 如果前节点是头结点,或者不是SIGNAL状态并且无法设置为SIGNAL状态 156 // 总结,取消一个节点是,要保证这个节点能被释放,要不通过前节点通知,在锁锁,对应release 157 unparkSuccessor(node); 158 } 159 160 node.next = node; // help GC 161 } 162 } 163 164 private void unparkSuccessor(Node node) { 165 // 解锁节点的线程 166 // 当node时头节点时,是当前获取线程释放的炒作 167 // 不是偷节点 168 int ws = node.waitStatus; 169 if (ws < 0) 170 // 不用再去通知下个节点了,即将释放node了 171 compareAndSetWaitStatus(node, ws, 0); 172 Node s = node.next; 173 if (s == null || s.waitStatus > 0) { 174 s = null; 175 // 从队尾向前找到最前有效的节点 176 for (Node t = tail; t != null && t != node; t = t.prev) 177 if (t.waitStatus <= 0) 178 s = t; 179 } 180 if (s != null) 181 LockSupport.unpark(s.thread); 182 183 } 184 185 private void compareAndSetNext(Node pred, Node predNext, Object o) { 186 187 } 188 189 private boolean parkAndCheckInterrupt() { 190 // 阻塞 191 LockSupport.park(this); 192 // 当前前程标记中断 193 return Thread.interrupted(); 194 } 195 196 private boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 197 int ws = pred.waitStatus; 198 // 如果前节点是需要被通知的,前节点正在被阻塞,阻塞当先线程 199 if (ws == Node.SIGNAL) 200 return true; 201 // 如果前节点是无效的,找到最近的一个有效节点,并关联,返回,在外部调用方法中会再次调用这个方法 202 if (ws > 0) { 203 do { 204 node.prev = pred = pred.prev; 205 } while (pred.waitStatus > 0); 206 // 这是个切断调用链的过程 207 pred.next = node; 208 } else { 209 // 更新前节点的状态,释放时通知新节点 210 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 211 } 212 return false; 213 } 214 215 /** 216 * 创建节点 217 * 节点入队 218 * 219 * @return 新节点 220 */ 221 private Node addWaiter(Node mode) { 222 Node node = new Node(Thread.currentThread(), mode); 223 Node pred = tail; 224 // 之前有节点在队列中 225 if (pred != null) { 226 node.prev = pred; 227 // 直接修改队尾,不成功要进入接下类的循环,循环中也有类型的判断,这里添加会减少一些逻辑(这样说可能是理解的有偏差) 228 if (compareAndSetTail(pred, node)) { 229 pred.next = node; 230 return node; 231 } 232 } 233 enq(node); 234 return node; 235 } 236 237 /** 238 * 节点入队 239 * 循环,直到把新节点放到队尾,在多线程中这个过程是不确定的 240 */ 241 private Node enq(Node node) { 242 for (; ; ) { 243 Node t = tail; 244 // Must initialize 245 // 队尾没值,新节点是第一个入队的节点,创建一个空的节点,头尾都指向这个空节点 246 if (t == null) { 247 if (compareAndSetHead(new Node())) 248 tail = head; 249 } else { 250 node.prev = t; 251 if (compareAndSetTail(t, node)) { 252 t.next = node; 253 return t; 254 } 255 } 256 } 257 } 258 259 /** 260 * 字面理解,是否有已经排队的线程 261 * 实际意义,有重入锁的情况,在这里要考虑到 262 * 没有节点在排队的情况,头结点与未节点是相同的 263 * 判断重入,当前线程是头结点的线程. 264 */ 265 protected boolean hasQueuedPredecessors() { 266 Node t = tail; 267 Node h = head; 268 Node s; 269 //为什么是头结点的线程,而不是exclusiveOwnerThread,因为只有在 270 // 当前队列里没有值得时候才回设置独占线程,如果是通过节点释放的线 271 // 程还会和节点绑定,不会映射到exclusiveOwnerThread 272 return h != t && 273 ((s = h.next) == null || s.thread != Thread.currentThread()); 274 } 275 276 public final boolean release(int arg) { 277 if (tryRelease(arg)) { 278 Node h = head; 279 // 在独占锁的时候,waitStatus只能为0 -1 -2 -3 280 // 这个里不为0代表头节点是空节点 281 // 空节点不需要释放 282 // 头节点是释放锁的时候,最先被考虑的 283 if (h != null && h.waitStatus != 0) 284 unparkSuccessor(h); 285 return true; 286 } 287 return false; 288 } 289 290 protected abstract boolean tryRelease(int arg); 291 292 293 public void setHead(Node head) { 294 this.head = head; 295 } 296 297 private boolean compareAndSetHead(Node node) { 298 return (true || false); 299 } 300 301 private boolean compareAndSetTail(Node pred, Node node) { 302 return (true || false); 303 } 304 305 protected void selfInterrupt() { 306 Thread.currentThread().interrupt(); 307 } 308 309 310 /** 311 * CAS更新队列状态,CAS的问题在其他的机会介绍 312 */ 313 boolean compareAndSetState(int o, int n) { 314 return (false || true); 315 } 316 317 /** 318 * 独占线程标记改为指定线程 319 */ 320 void setExclusiveOwnerThread(Thread t) { 321 exclusiveOwnerThread = t; 322 } 323 324 /** 325 * 返回独占线程 326 */ 327 Thread getExclusiveOwnerThread() { 328 return exclusiveOwnerThread; 329 } 330 331 // 修改节点的状态 332 private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) { 333 return (true || false); 334 } 335 336 static class Node { 337 338 public int waitStatus; 339 340 Node() { 341 } 342 343 /** 344 * @param thread 345 * @param mode SHARED or EXCLUSIVE 346 */ 347 Node(Thread thread, Node mode) { 348 this.thread = Thread.currentThread(); 349 this.mode = mode; 350 } 351 352 // 共享模式标记 353 static final Node SHARED = new Node(); 354 // 独占模式标记 355 static final Node EXCLUSIVE = null; 356 357 // 节点被取消,因为超时或者中断 358 static final int CANCELLED = 1; 359 // next被阻塞,当节点释放时,notice next 360 static final int SIGNAL = -1; 361 // 在条件队列中,等待某个条件被阻塞 362 static final int CONDITION = -2; 363 // 节点在共享模式下,可以传播锁 364 static final int PROPAGATE = -3; 365 366 volatile Node next; 367 volatile Node prev; 368 Node mode; 369 370 public Thread thread; 371 372 public Node predecessor() { 373 Node p = prev; 374 if (p == null) 375 throw new NullPointerException(); 376 else 377 return p; 378 } 379 } 380 381 382 } 383 384 /** 385 * 这是一个独占锁的实现,从ReentrantLock中粘贴出来的部分代码 386 */ 387 class SYC extends AQS { 388 389 public void lock() { 390 acquire(1); 391 } 392 393 public void unlock() { 394 release(1); 395 } 396 397 protected final boolean tryAcquire(int acquires) { 398 final Thread current = Thread.currentThread(); 399 int c = getState(); 400 // 如果当前的状态 401 if (c == 0) { 402 if (!hasQueuedPredecessors() && 403 compareAndSetState(0, acquires)) { 404 setExclusiveOwnerThread(current); 405 return true; 406 } 407 } else if (current == getExclusiveOwnerThread()) { 408 int nextc = c + acquires; 409 if (nextc < 0) 410 throw new Error("Maximum lock count exceeded"); 411 setState(nextc); 412 return true; 413 } 414 return false; 415 } 416 417 protected final boolean tryRelease(int releases) { 418 int c = getState() - releases; 419 if (Thread.currentThread() != getExclusiveOwnerThread()) 420 throw new IllegalMonitorStateException(); 421 boolean free = false; 422 if (c == 0) { 423 free = true; 424 setExclusiveOwnerThread(null); 425 } 426 setState(c); 427 return free; 428 } 429 430 431 } 432 }
基于ReentrantLock的AQS的源码分析(独占、非中断、不超时部分)
原文:http://www.cnblogs.com/yanbo2016/p/5266825.html