首页 > 其他 > 详细

redis实现分布式锁需要考虑的因素

时间:2019-12-18 15:55:38      阅读:131      评论:0      收藏:0      [点我收藏+]

死锁

错误例子

技术分享图片

 

 

 

解决方式

 防止死锁 通过设置超时时间
 不要使用setnx key   expire 20  不能保证原子性 如果setnx程序就挂了 没有执行expire就死锁了
 reidis2.8版本提供 set lock:key1 true ex 5 nx 方式 保证了  setnx+expire原子性方式执行(秒为单位)

锁超时

错误例子

        String lockKey="stock:product:1";
        boolean  isGetLock=false;
        try{
            //假设是原子性的 获取锁并设置锁10秒
            isGetLock==setnx(lockKey,10);
            if(!isGetLock){
                throw  new Exception("系统繁忙!请稍后再试");
            }
//模拟需要执行12秒 Thread.sleep(
12); }finally { if(isGetLock){ del(lockKey); } }

假设有线程A线程B 2个线程

线程A率先拿到锁因为我们设置的锁10秒自动释放(redis过期时间10秒) 而我们程序需要执行10秒以上

10.1ms秒的时候线程B进来 因为redis锁key已经过期成功拿到锁 并阻塞在12秒处

12秒后线程A 执行完 执行del操作 导致释放了线程B的锁

解决方式1

 String lockKey="stock:product:1";
        boolean  isGetLock=false;
        //用来标识当前身份
        String currentIndex=UUID.randomUUID().toString();
        try{
            //假设是原子性的 获取锁并设置锁10秒 同时设置一个值为currentIndex
            isGetLock==setnx(lockKey,currentIndex,10);
            if(!isGetLock){
                throw  new Exception("系统繁忙!请稍后再试");
            }
            //模拟需要执行12秒
            Thread.sleep(12);
        }finally {
            if(isGetLock){
                String lockValue=get(lockKey);
                //表示是当前线程的锁 释放
                if(lockValue!=null&&lockValue.equals(currentIndex)) {
                    del(lockKey);
                }
            }
        }

方式1优化方案

简单一看 好像并没有什么问题 但是需要注意 get 比较 和del并不是原子性的

比如 线程A get完之后 lockkey因为超时释放   线程B 成功获得锁    线程A再执行if判断 会删除调线程B的锁

改为lua脚本 

if redis.call("get",KEYS[1]==ARGV[1])  then
return redis.call("del","KEYS1")
else
return 0
end

主从切换

线程A从主节点加锁成功  这个时候主节点挂掉,从节点替换主节点 锁数据并没有同步过来 导致2个线程会获得锁  只会在 挂掉时 从节点还未同步时导致这样的情况 极少情况发生 不过一般业务场景都能接受
 

可重入锁实现

/**
 * @Auther: liqiang
 * @Date: 2019/7/14 14:59
 * @Description:
 */
public class RedisWithReentrantLock {
    private ThreadLocal<Map<String,Integer>> lockers=new ThreadLocal<>();
    private Jedis jedis;
    public  RedisWithReentrantLock(Jedis jedis){
        this.jedis=jedis;
    }
    /**
     * 加锁
     */
    private boolean _lock(String key){
        String value=String.valueOf(System.currentTimeMillis());;
        return jedis.set(key,value,"nx","ex",5L)!=null;
    }
    /**
     * 释放锁
     * @param key
     */
    private void  _unlock(String key){
        jedis.del(key);
    }

    /**
     * 从线程缓存获取map 没有就初始化一个
     * @return
     */
    private  Map<String,Integer> currentLockers(){
        Map<String,Integer> refs=lockers.get();
        if(refs==null){
            refs=new HashMap<String,Integer>();
            lockers.set(refs);
        }
        return lockers.get();
    }

    /**
     * 可重入锁
     * @param key
     * @return
     */
    public boolean lock(String key){
        /**
         * 选择map的原因是 一个线程里面可能有很多加锁的地方
         */
        Map<String,Integer> lockers=currentLockers();
        /**
         *如果存在 表示是重入加锁
         */
        if(lockers.containsKey(key)){
            lockers.put(key,lockers.get(key)+1);
            //延长过期时间
            jedis.expire(key,5000);
            return true;
        }
        //走到这里表示是头部第一次加锁 加锁并对应map数量+1
        boolean isGetLock=_lock(key);
        lockers.put(key,1);
        return  isGetLock;
    }

    /**
     * 释放锁
     * @param key
     * @return
     */
    public boolean unLock(String key){
        /**
         * 获得map
         */
        Map<String,Integer> lockers=currentLockers();
        /**
         * 表示key未加过锁 或者释放了
         */
        Integer refCnt=lockers.get(key);
        if(refCnt==null){
            return false;
        }
        //-1
        refCnt-=1;
        //大于0表示不是头部锁释放
        if(refCnt>0){
            lockers.put(key,refCnt);
        }else{
            //小于等于0 表示是头部锁释放 删除mapkey
            lockers.remove(key);
            /**
             * 释放锁
             */
            _unlock(key);
        }
        return true;
    }
    public static void main(String[] args) {
        Jedis conn = new Jedis("127.0.0.1",6379);
        conn.select(1);
        RedisWithReentrantLock redisWithReentrantLock=new RedisWithReentrantLock(conn);
        String lockKey="lock:key3";
        redisWithReentrantLock.lock(lockKey);
        redisWithReentrantLock.lock(lockKey);

        redisWithReentrantLock.unLock(lockKey);
        redisWithReentrantLock.unLock(lockKey);
    }
}

一些建议

建议涉及并发的地方能用原子性操作就用原子性

例子一

       tock stock=stockDao.get(id);
        if(stock.getNumber()-10<0){
            throw new Exception("库存不足");
        }
        stock.setNumber(stock.getNumber-10);
        stockDao.update(stock);

这种情况就算加锁的情况 如果出现上面说的几种极端情况 或者锁失效了 会导致超卖以及库存异常问题

优化方案

 Stock stock=stockDao.get(id);
        if(stock.getNumber()-10<0){
            throw new Exception("库存不足");
        }
        stock.setNumber(stock.getNumber-10);
        Integer updateNumber=stockDao.excuteSql("update stock set number-=10 where id=:id and number>=0",id);
        //表示未能成功修改 
        if(updateNumber<=0){
            throw new Exception("库存不足");
        }

 

 

 

 

redis实现分布式锁需要考虑的因素

原文:https://www.cnblogs.com/LQBlog/p/12059965.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!