首页 > 编程语言 > 详细

多线程(五)Lock

时间:2021-02-16 23:31:10      阅读:36      评论:0      收藏:0      [点我收藏+]

ReentrantLock VS ReentrantReadWriteLock VS StampedLock

特性 是否支持重入 是否支持锁升级 是否支持Condition 适合场景
ReentrantLock 独占可重入 纯写入
ReentrantReadWriteLock 非独占可重读,读写锁,悲观锁 读写均衡
StampedLock 非独占不可重入,多模式锁,乐观锁 读多写少

 

ReentrantLock 

  可重入锁,顾名思义,这个锁可以被线程多次重复进入进行获取操作。ReentantLock继承接口Lock并实现了接口中定义的方法,除了能完成synchronized所能完成的所有工作外,还提供了诸如可响应中断锁、可轮询锁请求、定时锁等避免多线程死锁的方法。
  Lock实现的机理依赖于特殊的CPU指定,可以认为不受JVM的约束,并可以通过其他语言平台来完成底层的实现。在并发量较小的多线程应用程序中,ReentrantLock与synchronized性能相差无几,但在高并发量的条件下,synchronized性能会迅速下降几十倍,而ReentrantLock的性能却能依然维持一个水准。
  ReentrantLock引入两个概念:公平锁与非公平锁。公平锁指的是锁的分配机制是公平的,通常先对锁提出获取请求的线程会先被分配到锁。反之,JVM按随机、就近原则分配锁的机制则称为不公平锁。
  ReentrantLock通过方法lock()与unlock()来进行加锁与解锁操作,与synchronized会被JVM自动解锁机制不同,ReentrantLock加锁后需要手动进行解锁。为了避免程序出现异常而无法正常解锁的情况,使用ReentrantLock必须在finally控制块中进行解锁操作。

  synchronized与wait()和nitofy()/notifyAll()方法相结合可以实现等待/通知模型,ReentrantLock同样可以,但是需要借助Condition,且Condition有更好的灵活性,具体体现在:

  • 一个Lock里面可以创建多个Condition实例,实现多路通知
  • notify()方法进行通知时,被通知的线程时Java虚拟机随机选择的,但是ReentrantLock结合Condition可以实现有选择性地通知

  ReentrantLock实例

技术分享图片
public class TestReentrantLock {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) throws InterruptedException {
        TestReentrantLock rl = new TestReentrantLock();

        new Thread() {
            @Override
            public void run() {
                for (Integer i = 0; i < 5; i++) {
                    rl.getTask();
                }
            }
        }.start();
        Thread.sleep(100);
        new Thread() {
            @Override
            public void run() {
                for (Integer i = 0; i < 5; i++) {
                    rl.addTask(Math.random() + "");
                }
            }
        }.start();
    }

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            System.out.println("addTask:" + s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            String result = queue.remove();
            System.out.println("getTask:" + result);
            return result;
        } finally {
            lock.unlock();
        }
    }
}
View Code

执行结果(虽然getTask的线程先执行,但是由于await方法阻塞线程,等待被唤醒):

addTask:0.1532530140339844
addTask:0.9855533133771119
getTask:0.1532530140339844
getTask:0.9855533133771119
addTask:0.6634186213426154
addTask:0.24053864333002573
getTask:0.6634186213426154
getTask:0.24053864333002573
addTask:0.2089403771816799
getTask:0.2089403771816799

  ReentrantLock源码分析

技术分享图片
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * 尝试获取非公平锁
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current); // 占用锁成功,设置独占线程为当前线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//当前线程已占用该锁
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);// 更新state值为新的重入次数
                return true;
            }
            return false;
        }

     //释放当前线程占用的锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
        //如果不是当前线程占用锁,则抛出异常 
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);//清空独占线程
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    /**
     * Sync object for non-fair locks 非公平锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))  //cas设置state状态,如果原值为0,置为1
                setExclusiveOwnerThread(Thread.currentThread());
            else  
                acquire(1); // 调用的是tryAcquire方法
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1); //调用的是tryAcquire
        }

        /**
         * Fair version of tryAcquire.  Don‘t grant access unless recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //相比非公平锁,增加了同步队列中当前节点是否有前驱节点的判断
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    /**
     * Creates an instance of {@code ReentrantLock}. This is equivalent to using {@code ReentrantLock(false)}.  默认是非公平锁
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the given fairness policy.  参数true是公平锁
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Acquires the lock.
     */
    public void lock() {
        sync.lock();
    }

    /**
     * Acquires the lock unless the current thread is {@linkplain Thread#interrupt interrupted}.
     * @throws InterruptedException if the current thread is interrupted
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * Acquires the lock only if it is not held by another thread at the time of invocation.
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been
     * {@linkplain Thread#interrupt interrupted}.
     */
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * Attempts to release this lock.
     * @throws IllegalMonitorStateException if the current thread does not hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * Returns a {@link Condition} instance for use with this {@link Lock} instance.
     * @return the Condition object
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

    /**
     * Queries the number of holds on this lock by the current thread.
     * @return the number of holds on this lock by the current thread,or zero if this lock is not held by the current thread
     */
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    /**
     * Queries if this lock is held by the current thread.
     * @return {@code true} if current thread holds this lock and {@code false} otherwise
     */
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    /**
     * Queries if this lock is held by any thread. This method is designed for use in monitoring of the system state,not for synchronization control.
     * @return {@code true} if any thread holds this lock and {@code false} otherwise
     */
    public boolean isLocked() {
        return sync.isLocked();
    }

    /**
     * Returns {@code true} if this lock has fairness set true.
     * @return {@code true} if this lock has fairness set true
     */
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * Returns the thread that currently owns this lock, or {@code null} if not owned. 
     * @return the owner, or {@code null} if not owned
     */
    protected Thread getOwner() {
        return sync.getOwner();
    }

    /**
     * Queries whether any threads are waiting to acquire this lock.查询是否有线程正在等待获取此锁
     * @return {@code true} if there may be other threads waiting to acquire the lock
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Queries whether the given thread is waiting to acquire this lock. 查询是否有指定线程正在获取此锁
     * @param thread the thread
     * @return {@code true} if the given thread is queued waiting for this lock
     * @throws NullPointerException if the thread is null
     */
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    /**
     * Returns an estimate of the number of threads waiting to acquire this lock. 
     * @return the estimated number of threads waiting for this lock
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * Returns a collection containing threads that may be waiting to acquire this lock. 
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /**
     * Queries whether any threads are waiting on the given condition associated with this lock. 
     * @param condition the condition
     */
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * Returns an estimate of the number of threads waiting on the given condition associated with this lock.
     */
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * Returns a collection containing those threads that may be waiting on the given condition associated with this lock.
     */
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * Returns a string identifying this lock, as well as its lock state.*/
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]");
    }
}
View Code

总结:

  • 每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;
  • 通过大量CAS保证多个线程竞争锁的时候的并发安全;
  • 可重入的功能是通过维护state变量来记录重入次数实现的。
  • 公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;
  • 非公平锁可以直接抢占,所以效率更高;
  • AbstractQueuedSynchronizer简称AQS,是一个用于构建锁和同步容器的框架。AQS解决了在实现同步容器时设计的大量细节问题。AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。AQS中还有一个表示状态的字段state,例如ReentrantLocky用它表示线程重入锁的次数,Semaphore用它表示剩余的许可数量,FutureTask用它表示任务的状态。对state变量值的更新都采用CAS操作保证更新操作的原子性。

技术分享图片

 

ReadWriteLock

  ReentrantLock保证了只有一个线程可以执行临界区代码,但是有些时候,这种保护有点过头。任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。使用ReadWriteLock可以解决这个问题。使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改

  ReadWriteLock实例

技术分享图片
public class TestReadWriteLock {
    public static void main(String[] args) {
        PricesInfo pricesInfo = new PricesInfo();
        for (Integer i = 0; i < 3; i++) {
            new Thread(new Reader(pricesInfo), "ReadLock" + i).start();
        }
        new Thread(new Writer(pricesInfo), "WriterLock").start();
    }

}

class PricesInfo {
    private double price;

    private ReadWriteLock lock;

    public PricesInfo() {
        this.price = 1.0;
        this.lock = new ReentrantReadWriteLock();
    }

    public double getPrice() {
        lock.readLock().lock();
        System.out.printf("%s : Price 开始读了!\n", Thread.currentThread().getName());
        double value = price;
        System.out.printf("%s : Price 读取完毕 : %f\n", Thread.currentThread().getName(), value);
        lock.readLock().unlock();
        return value;
    }

    public void setPrices(double price) {
        lock.writeLock().lock();
        System.out.printf("Writer:Attempt to modify the price.\n");
        this.price = price;
        for (Integer i = 0; i < Integer.MAX_VALUE; i++) {

        }
        System.out.printf("Writer:Prices have been modified.%s \n", price);
        lock.writeLock().unlock();

    }

}

class Reader implements Runnable {
    private PricesInfo priceInfo;

    public Reader(PricesInfo priceInfo) {
        this.priceInfo = priceInfo;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            priceInfo.getPrice();
            try {
                Thread.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Writer implements Runnable {
    private PricesInfo pricesInfo;

    public Writer(PricesInfo pricesInfo) {
        this.pricesInfo = pricesInfo;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            pricesInfo.setPrices((i + 1) * 10);
            try {
                Thread.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
View Code

执行结果如下:

ReadLock1 : Price 开始读了!
ReadLock1 : Price 读取完毕 : 1.000000
ReadLock2 : Price 开始读了!
ReadLock0 : Price 开始读了!
ReadLock0 : Price 读取完毕 : 1.000000
ReadLock2 : Price 读取完毕 : 1.000000
Writer:Attempt to modify the price.
Writer:Prices have been modified.10.0 
ReadLock1 : Price 开始读了!
ReadLock1 : Price 读取完毕 : 10.000000
ReadLock0 : Price 开始读了!
ReadLock0 : Price 读取完毕 : 10.000000
ReadLock2 : Price 开始读了!
ReadLock2 : Price 读取完毕 : 10.000000
Writer:Attempt to modify the price.
Writer:Prices have been modified.20.0 
ReadLock2 : Price 开始读了!
ReadLock1 : Price 开始读了!
ReadLock1 : Price 读取完毕 : 20.000000
ReadLock0 : Price 开始读了!
ReadLock0 : Price 读取完毕 : 20.000000
ReadLock2 : Price 读取完毕 : 20.000000
ReadLock2 : Price 开始读了!
ReadLock2 : Price 读取完毕 : 20.000000
Writer:Attempt to modify the price.
Writer:Prices have been modified.30.0 
ReadLock0 : Price 开始读了!
ReadLock1 : Price 开始读了!
ReadLock1 : Price 读取完毕 : 30.000000
ReadLock2 : Price 开始读了!
ReadLock2 : Price 读取完毕 : 30.000000
ReadLock0 : Price 读取完毕 : 30.000000
ReadLock1 : Price 开始读了!
ReadLock1 : Price 读取完毕 : 30.000000
ReadLock0 : Price 开始读了!
ReadLock0 : Price 读取完毕 : 30.000000

总结:

  • Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性
  • ReetrantReadWriteLock读写锁的效率明显高于synchronized关键字
  • ReetrantReadWriteLock读写锁的实现中,读锁使用共享模式;写锁使用独占模式,换句话说,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的
  • ReetrantReadWriteLock读写锁的实现中,需要注意的,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁

 

StampedLock

  ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。但是它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLockStampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。StampedLock是不可重入锁。

  StampedLock实例

技术分享图片
public class TestStampedLock {
    private double x, y;
    
    private final StampedLock stampedLock = new StampedLock();
    
    //写锁的使用
    void move(double deltaX, double deltaY){
        long stamp = stampedLock.writeLock(); //获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); //释放写锁
        }
    }
    
    //乐观读锁的使用。乐观读锁获取失败则获取悲观读锁
    double distanceFromOrigin() {
        
        long stamp = stampedLock.tryOptimisticRead(); //获得一个乐观读锁
        double currentX = x;
        double currentY = y;
        if (!stampedLock.validate(stamp)) { //检查乐观读锁后是否有其他写锁发生,有则返回false
            
            stamp = stampedLock.readLock(); //获取一个悲观读锁
            
            try {
                currentX = x;
            } finally {
                stampedLock.unlockRead(stamp); //释放悲观读锁
            }
        } 
        return Math.sqrt(currentX*currentX + currentY*currentY);
    }
    
    //悲观读锁以及读锁升级写锁的使用
    void moveIfAtOrigin(double newX,double newY) {
        
        long stamp = stampedLock.readLock(); //悲观读锁
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = stampedLock.tryConvertToWriteLock(stamp); //读锁转换为写锁
                if (ws != 0L) { //转换成功
                    
                    stamp = ws; //票据更新
                    x = newX;
                    y = newY;
                    break;
                } else {
                    stampedLock.unlockRead(stamp); //转换失败释放读锁
                    stamp = stampedLock.writeLock(); //强制获取写锁
                }
            }
        } finally {
            stampedLock.unlock(stamp); //释放所有锁
        }
    }
}
View Code

总结:

  1. 所有获取锁的方法,都返回一个邮戳(stamp),stamp为0表示获取失败,其余都表示成功;
  2. 所有释放锁的方法,都需要一个邮戳(stamp),这个stamp必须是和成功获取锁时得到的stamp一致;
  3. StampedLock是不可重入的;(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
  4. StampedLock有三种访问模式:
    • Reading(读模式):功能和ReentrantReadWriteLock的读锁类似
    • Writing(写模式):功能和ReentrantReadWriteLock的写锁类似
    • Optimistic reading(乐观读模式):这是一种优化的读模式。
  5. StampedLock支持读锁和写锁的相互转换
    我们知道RRW中,当线程获取到写锁后,可以降级为读锁,但是读锁是不能直接升级为写锁的。
    StampedLock提供了读锁和写锁相互转换的功能,使得该类支持更多的应用场景。
  6. 无论写锁还是读锁,都不支持Conditon等待


多线程(五)Lock

原文:https://www.cnblogs.com/ryjJava/p/14398794.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!