J.U.C
一,AbstractQueuedSynchronizer -AQS
1.设计
使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
利用一个int类型表示状态
在AQS类中有一个叫waitStatus的成员变量,基于AQS有一个同步组件叫ReentrantLock,在这个组件中status表示
获取锁的线程数。比如:status=0,表示还没有线程获取锁。status=0表示已经有线程获取了锁。status>1表示
重入锁的数量
使用方法是继承(是基于模板方法设计的,使用者必须继承这个AQS里的方法)
子类通过继承并通过实现他的方法管理其状态的方法操纵状态(acquire方法和release()方法)
可以同时实现排他锁和共享锁模式(站在使用者角度有两种功能:独占功能,共享功能)
2.实现思路
AQS内部维护了一个clh队列来管理锁,线程会首先尝试获取锁。如果失败,就将当前线程,以及等待状态等信息包成一个Node节点
加入到同步队列SyncQueue中。之后,会不断的循环尝试获取锁。条件是但前节点为head节点的后继才会尝试。如果失败,就会
阻塞自己。直到自己被唤醒。当持有锁的线程释放锁的时候,会唤醒队列中的后继线程
二,同步组件
CountDownLatch:闭锁,通过一个计数来保证线程是否需要一直阻塞
Semaphore:控制同一时间并发线程的数目
CyclicBarrier:可以阻塞线程
ReentrantLock:
Condition
FutureTask
/** * Created by yaming . * CountDownLatch使用场景1: * 等待一个线程全部执行完毕后,才开始执行另外一个线程 */ public class CountDownLatchExample1 { //请求总数 public static int clientTotal = 200; public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //计数器, final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int threadNum = i; executorService.execute(()->{ try { test(threadNum); }catch (Exception e){ System.out.println("exception"); e.printStackTrace(); }finally { //闭锁,每执行一次test()操作,请求数就减一 countDownLatch.countDown(); } }); } //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("finish"); //关闭线程池 executorService.shutdown(); } private static void test(int threadNum) throws Exception{ Thread.sleep(100); System.out.println(""+threadNum); Thread.sleep(100); } }
/** * Created by yaming . * CountDownLatch使用场景2: * 等待一个线程指定时间,超时就开始执行另外一个线程 */ public class CountDownLatchExample2 { //请求总数 public static int clientTotal = 200; public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //计数器, final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int threadNum = i; executorService.execute(()->{ try { test(threadNum); }catch (Exception e){ e.printStackTrace(); }finally { //闭锁,每执行一次test()操作,请求数就减一 //放到finally代码块里 countDownLatch.countDown(); } }); } //只等待上面的线程指定时间,超时后直接向下执行主线程 try { countDownLatch.await(1, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("finish"); //关闭线程池 executorService.shutdown(); } private static void test(int threadNum) throws Exception{ Thread.sleep(20); System.out.println(""+threadNum); Thread.sleep(20); } }
/** * Created by yaming * 控制并发执行的线程个数 */ public class SemaphoreExample1 { //请求总数 public static int clientTotal = 20; public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //计数器, final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < clientTotal; i++) { final int threadNum = i; executorService.execute(()->{ try { if(semaphore.tryAcquire()){ //尝试获取一个许可,如果获取不到,该线程就会被丢弃 test(threadNum); semaphore.release();//释放一个许可 } // semaphore.acquire();//获取一个许可 // test(threadNum); // semaphore.release();//释放一个许可 }catch (Exception e){ e.printStackTrace(); } }); } //关闭线程池 executorService.shutdown(); } private static void test(int threadNum) throws Exception{ System.out.println(""+threadNum); Thread.sleep(1000); } }
三,区别
CountDownLatch:描述的是一个或n个线程等待其他线程的关系。CountDownLatch的计数器只能使用一次
CyclicBarrier:描述的是多个线程相互等待的,只有所有的线程都满足条件后,才继续执行下一个操作。CyclicBarrier计数器可以
重复使用(使用reset()方法重置)
四,ReentrantLock与锁
1.锁的分类:
synchronized关键字修饰的锁
J.U.C里提供的锁,(ReentrantLock)
2.区别:
都具有可重入性。(同一个线程进入一次,锁的计数器就自增1,当锁的计数器下降为0时,才释放锁)
锁的实现。synchronized是基于JVM实现的(操作系统来实现),ReentrantLock是jdk实现的(用户自己实现)
性能的区别。优化的的synchronized的性能和ReentrantLock差不多
功能的区别。synchronized使用比较方便简洁,是由编译器保证加锁的锁的释放。ReentrantLock需要手动声明加锁和释放锁
在锁的细粒度和灵活度上,ReentrantLock具有优势
3.ReentrantLock独有的功能
可以指定是公平锁还是非公平锁(是可以选择的)。synchronized是非公平锁(不能指定)。(公平锁:先等待的线程先获得锁)
提供了一个Condition类,可以分组唤醒需要唤醒的线程。而synchronized要么随机唤醒一个线程,要么唤醒所有线程
提供了能够中断等待锁的线程的机制,lock.lockInterruptibly();
4.总结:
ReentrantLock实现的是一种自旋锁,循环调用CAS操作来加锁,他的性能比较好也是因为避免了使线程进入内核态的阻塞状态。
想尽办法避免进入线程内核的阻塞状态,是我们分析和理解锁设计的关键
5.什么情况下使用ReentrantLock?
当我们需要使用ReentrantLock独有的功能的时候,就用ReentrantLock
6.源码:
private final static Lock lock = new ReentrantLock();
如何实现:
默认是不公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
我们还可以指定使用公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
tryLock()方法:本质作用是仅在调用时锁定未被另一个线程保持的情况下,才获取锁定
//synchronized实现锁 public class LockExample1 { //请求总数 public static int clientTotal = 5000; //同时并发执行的线程数 public static int threadTotal = 200; //全局变量 public static int count = 0; public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //信号灯,同时允许执行的线程数 final Semaphore semaphore = new Semaphore(threadTotal); //计数器, final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(()->{ try { //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行 semaphore.acquire(); add(); //释放信号灯 semaphore.release(); }catch (InterruptedException e){ System.out.println("exception"); e.printStackTrace(); } //闭锁,每执行一次add()操作,请求数就减一 countDownLatch.countDown(); }); } //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //打印count的值 System.out.println("count:"+count); //关闭线程池 executorService.shutdown(); } private synchronized static void add(){ count++; } }
//ReentrantLock来加锁 public class LockExample2 { //请求总数 public static int clientTotal = 5000; //同时并发执行的线程数 public static int threadTotal = 200; //全局变量 public static int count = 0; private final static Lock lock = new ReentrantLock(); public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //信号灯,同时允许执行的线程数 final Semaphore semaphore = new Semaphore(threadTotal); //计数器, final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(()->{ try { //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行 semaphore.acquire(); add(); //释放信号灯 semaphore.release(); }catch (InterruptedException e){ System.out.println("exception"); e.printStackTrace(); } //闭锁,每执行一次add()操作,请求数就减一 countDownLatch.countDown(); }); } //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //打印count的值 System.out.println("count:"+count); //关闭线程池 executorService.shutdown(); } private static void add(){ try { lock.lock(); count++; }finally { //放在finally里保证锁一定能释放 lock.unlock(); } } }
五,ReentrantReadWriteLock
在没有任何读读写锁的时候,才可以取得写入锁.
该类里有两个锁:读锁和写锁
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
可以并发读,只能同步写。当读的操作远大于写的操作数量时,会使写线程遭遇饥饿,写操作无法获取锁
//ReentrantReadWriteLock来加锁 public class LockExample3 { private final Map<String,Data> map = new TreeMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data get(String key){ readLock.lock(); try { return map.get(key); }finally { readLock.unlock(); } } public Data put(String key,Data data){ writeLock.lock(); try { return map.put(key,data); }finally { writeLock.unlock(); } } public Set<String> getAllKeys(){ readLock.lock(); try { return map.keySet(); }finally { readLock.unlock(); } } class Data{} }
六,StampedLock
StampedLock控制锁有三种模式。分别是写,读,乐观读。重点是在乐观读上
一个StampedLock的状态是由版本和模式两个部分组成
//StampedLock的官方例子 public class LockExample4 { class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看乐观读锁案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁 double currentX = x, currentY = y; //将两个字段读入本地局部变量 if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生? stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁 try { currentX = x; // 将两个字段读入本地局部变量 currentY = y; // 将两个字段读入本地局部变量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲观读锁案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合 long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁 if (ws != 0L) { //这是确认转为写锁是否成功 stamp = ws; //如果成功 替换票据 x = newX; //进行状态改变 y = newY; //进行状态改变 break; } else { //如果不能成功转换为写锁 sl.unlockRead(stamp); //我们显式释放读锁 stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试 } } } finally { sl.unlock(stamp); //释放读锁或写锁 } } } }
//StampedLock的例子 public class LockExample5 { //请求总数 public static int clientTotal = 5000; //同时并发执行的线程数 public static int threadTotal = 200; //全局变量 public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); //信号灯,同时允许执行的线程数 final Semaphore semaphore = new Semaphore(threadTotal); //计数器, final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(()->{ try { //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行 semaphore.acquire(); add(); //释放信号灯 semaphore.release(); }catch (InterruptedException e){ System.out.println("exception"); e.printStackTrace(); } //闭锁,每执行一次add()操作,请求数就减一 countDownLatch.countDown(); }); } //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //打印count的值 System.out.println("count:"+count); //关闭线程池 executorService.shutdown(); } private static void add(){ long stamp = lock.writeLock(); try { count++; }finally { //放在finally里保证锁一定能释放 lock.unlock(stamp); } } }
七,总结:
当只有少量的竞争者时,synchronized是一个不错的选择,(synchronized不会引发死锁,jvm会自动解锁)
竞争者不少,但是线程增长的趋势我们是能够预估的,这时可以选择ReentrantLock
八,Condition
//Condition的使用 public class LockExample6 { public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { reentrantLock.lock(); System.out.println("wait signal"); // 1 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("get signal"); // 4 reentrantLock.unlock(); }).start(); new Thread(() -> { reentrantLock.lock(); System.out.println("get lock"); // 2 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signalAll(); System.out.println("send signal ~ "); // 3 reentrantLock.unlock(); }).start(); } }
原文:https://www.cnblogs.com/inspred/p/9520973.html