/** * @description 队列同步器,利用原子整形模仿AQS,非公平锁(简单自适应自旋) * @since 2020/2/4 */ public class QueueSynchronizer { private AtomicInteger state=new AtomicInteger(0);//0为可用,1为被用,2为已经重入一次,依此 private Thread onwThread; private ConcurrentLinkedQueue<Thread> concurrentLinkedQueue=new ConcurrentLinkedQueue();//阻塞队列 //AQS队列中的头的thread永远为null,这里与AQS不一致,ConcurrentLinkedQueue不允许空值 //线程私有变量 private ThreadLocal<Integer> spinCount=new ThreadLocal<Integer>();//线程竞争失败后的自旋次数 private ThreadLocal<Integer> spinIncrement=new ThreadLocal<Integer>();//自旋成功后的自旋增量 private ThreadLocal<Integer> spinDecrement=new ThreadLocal<Integer>();//自旋成功后的自旋减量 private int sc; private int si; private int sd; //指定默认的自旋次数,成功增量,失败减量 public QueueSynchronizer(){ sc=4000; si=900; sd=700; } //指定初始的自旋次数,成功增量,失败减量 public QueueSynchronizer(int spinCount, int spinIncrement, int spinDecrement){ sc=spinCount; si=spinIncrement; sd=spinDecrement; } /** * @description: 获取锁,有以下情况:(非公共平模式,不判断队伍存在与否) * 1.当前锁为自由状态,如果cas成功,那么线程成功持有,否则失败 * 2.当前锁被人持有,如果持有人正是当前线程,那么获取成功,线程重入,否则失败 * @param * @return: boolean 获取成功为true,失败为false * @date: 2020/2/5 */ public boolean aquire(){ if(state.get()==0){//这里不直接使用cas,是为了减少cas的调用,因为这是一条cpu指令,耗费资源较大(能省则省) if(state.compareAndSet(0,1)) { onwThread=Thread.currentThread(); return true; }else return false; }else if(onwThread==Thread.currentThread()) { state.incrementAndGet();//重入 return true; }else return false; } /** * @description: 自旋,如果成功,那么设置当前线程,增加下次自旋次数,否则减少下次自旋次数 * @param * @return: boolean 自旋成功为true,失败为false * @date: 2020/2/5 */ public boolean spin(){ int temp=spinCount.get();//缓存本次自旋次数 while(temp>0) { temp--; if(state.compareAndSet(0,1)) {//自旋成功,设置当前线程,增加下次自旋次数 onwThread=Thread.currentThread(); spinCount.set(spinCount.get()+spinIncrement.get()); return true; } } spinCount.set(Math.max(spinCount.get()-spinDecrement.get(),0));//自旋失败,减少下次自旋次数。 return false; } /** * @description: 并发入队,在入队前做最后一次询问,若成功入队,线程设置中断后,park, * 醒来之后中断取反,再去尝试获取锁 * @param * @return: void * @date: 2020/2/5 */ public void inQueue(){ if(!aquire()){//入队前的最后一次询问 concurrentLinkedQueue.add(Thread.currentThread()); System.out.println(Thread.currentThread().getName()+"入队等候"); Thread.currentThread().interrupt();//设置中断 LockSupport.park(); System.out.println(Thread.currentThread().getName()+"被唤醒"); Thread.currentThread().interrupt();//再次中断取反 tryAcquire();//醒来之后再去尝试 }else System.out.println(Thread.currentThread().getName()+"入队前最后一次获取成功"); } /** * @description: state-1,如果此时state=0而且队列不为空,则唤醒队头,让其重新竞争 * @param * @return: void * @date: 2020/2/5 */ public void release() throws RuntimeException{ if(onwThread==Thread.currentThread()) { //这里先减-1再去唤醒,此时若有新线程进来则刚好非公平地获取到 if(state.decrementAndGet()==0&&!concurrentLinkedQueue.isEmpty()) { Thread thread=concurrentLinkedQueue.poll(); LockSupport.unpark(thread); } if(state.get()==0) System.out.println(Thread.currentThread().getName()+"释放了锁"); else System.out.println(Thread.currentThread().getName()+"还要释放了"+state.get()+"次"); } else throw new RuntimeException("非法释放,当前线程并非锁的拥有者!"); } /** * @description: 线程首次Lock前调用,初始化个线程的自旋次数,增量,减量 * @param * @return: void * @date: 2020/2/5 */ public void init(){ this.spinCount.set(sc); this.spinIncrement.set(si); this.spinDecrement.set(sd); } /** * @description: 尝试获取锁,失败后尝试自旋获取,若仍然失败则进入阻塞队列,park,等待别人用完后来unpark, * 然后再次尝试获取锁。 * @param * @return: void * @date: 2020/2/5 */ public void tryAcquire(){ //if(!aquire()&&!spin()) inQueue(); if(aquire()) { if(state.get()==1) System.out.println(Thread.currentThread().getName()+"获取成功"); else System.out.println(Thread.currentThread().getName()+"获取成功,重入了"+(state.get()-1)+"次"); } else{ System.out.println(Thread.currentThread().getName()+"获取失败,自旋"+spinCount.get()+"次"); if(spinCount.get()>0&&spin()) System.out.println(Thread.currentThread().getName()+"自旋成功"); else{ System.out.println(Thread.currentThread().getName()+"自旋失败,尝试进入队列"); inQueue(); } } } }
public class MyLock implements Lock{ private QueueSynchronizer queueSynchronizer =new QueueSynchronizer(); public void init() { queueSynchronizer.init();} @Override public void lock() { queueSynchronizer.tryAcquire(); } @Override public void unlock() { queueSynchronizer.release(); } }
public interface Lock { void lock(); void unlock(); }
public class MyLockTest { private static int sum=100; static MyLock myLock = new MyLock(); public static void main(String[] args) { for (int j = 0; j < 10; j++) { new Thread(()->{ myLock.init();//各线程初次使用时先初始化 myLock.lock(); myLock.lock(); for (int i = 0; i < 10; i++) { System.out.println(--sum+" "+Thread.currentThread().getName()+"one"); } myLock.unlock(); for (int i = 0; i < 10; i++) { System.out.println(--sum+" "+Thread.currentThread().getName()+"two"); } myLock.unlock(); }).start(); } } }
利用Atomic, ThreadLocal, 模仿AQS, ReentrantLock
原文:https://www.cnblogs.com/AllenDuke/p/12272605.html