首页 > 其他 > 详细

Redis并发锁

时间:2020-04-20 18:12:07      阅读:46      评论:0      收藏:0      [点我收藏+]

Redis并发锁

1、 单线程redis为什么需要分布式锁

虽然一个redis是单进程单线程模式,但请求并不是一定按先后顺序处理的,多个请求会被redis交叉着执行,(就像单个cpu,在一个时间点只能执行一个命令,为什么多个线程执行的时候需要考虑线程安全的问题,因为程序执行的时候往往是一段代码,并不具有原子性,所以在执行一个命令后,就可能被其他的线程抢去执行权,那么就会造成线程安全的问题),redis类似,对于check and set这种操作,可能在check之后被其余的线程抢去了执行权,之后在set就会出现问题。

这里面涉及NIO/AIO的知识,redis需要锁也不是专门为了分布式锁,多个请求的异步交叉处理才是根本原因,一定程度上你可以理解为出现了对共享资源的"并发"访问,所以要锁

2、为什么不能用java中的锁机制

  • java中的锁synchronized或者Reentrantlock是针对同一个进程而言
  • 在同一进程中的线程是能通过共享的堆内存来进行通信的
  • 不同的线程可以通过对共享的内存进行标记,实现加锁,达到线程安全的目的
  • 而进程是相互隔离的,不能直接进行通信
  • 在这种情况下java中的锁就会失效,因此要利用线程间的通信机制来实现分布式的锁。

3、为什么redis还可以作为分布式锁呢

  • 当不同的进程中的线程针对同一份数据进行查找与修改的过程中,需要锁来确保在一个时间段内只能有一个线程来进行这些操作
  • 那么需要在公共内存(不是某一个进程中的内存)中来进行标记,如Redis、Memcache、数据库或者文件等,使得不同的进程中的线程能访问这些地方,从而达到加锁的目的
  • Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。
  • redis实现的分布式锁不仅能保证对自身数据的操作实现一致性,还能实现对其他共享数据库或者文件的一致性,主要通过对相同的redis内存进行加锁,实现加锁到释放锁中间的代码块的原子性。

4、java如何实现redis分布式锁

4.1 初始代码:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock + "");
        } else {
            System.out.println("扣减失败,库存不足");
                }

        return "end";
    }
}

但是当多个线程同时要访问deductStock()函数的时候,会存在超卖的现象
主要因为某一个线程正好在执行jedis.set(key,value)之前,其余线程就可能已经结束了获取数据的操作。

4.2 改进,利用SETNX加锁

  • setnx 只能对某一个键进行一次赋值
  • setnx name a; setnx name b; 第一条命令会成功,而第二条会失败

实现如下:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";
        Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");
        if (!result) {
            return "error";
        }

        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock + "");
        } else {
            System.out.println("扣减失败,库存不足");
                }

        stringRedisTemplate.delete(lockKey);  --------------------- 1
        return "end";
    }
}

但是,当程序运行到步骤1之前就出现异常了,那么这个锁就会永久存在,不会被删除,造成死锁

4.3 使用try-catch来避免死锁

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {

            Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");
            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
                    }

        } finally {
            //释放锁
            stringRedisTemplate.delete(lockKey);  --------------------- 1
        }

        return "end";
    }
}

虽然解决了出现异常的情况,但是程序运行到1之前,挂掉了,还是不能将锁删除。

4.4 将锁加上超时时间,避免程序挂掉锁依然存在的情况

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {

            Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");  -------------- 1

            //锁最多存活时间10秒
            stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);  ----------- 2

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
                    }

        } finally {
            //释放锁
            stringRedisTemplate.delete(lockKey);  --------------------- 3
        }

        return "end";
    }
}

但是,当程序执行完1处之后,在执行2处之前出现了问题,程序挂掉,那么还是没法执行程序2。

4.5 将多行代码封装成原子块,来解决上面问题

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {
            //将两行代码封装成一个原子块代码
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "name", 10, TimeUnit.SECONDS);

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
                    }

        } finally {
            //释放锁
            stringRedisTemplate.delete(lockKey);  --------------------- 3
        }

        return "end";
    }
}

但是,当程序A的执行时间超过锁失效的时间,那么当锁失效的时候,新的程序B就能开始执行,并且当程序A执行完3处代码时候,将锁删除后,新的程序C又能进来,开始执行,如此,完全混乱,锁失效

4.6 只能让设置锁的线程来删除锁

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";
        String clientId = UUID.randomUUID().toString();

        try {
            //将两行代码封装成一个原子块代码
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
                    }

        } finally {

            //判断是否是设置锁的线程
            if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {  
                //释放锁
                stringRedisTemplate.delete(lockKey);  --------------------- 3
            }
        }

        return "end";
    }
}

但是,锁失效的时间还是不合理,当锁失效时间小于程序运行这个代码块的时间,还是能有其余的线程能进来这个程序

4.7 设置守护线程,延长锁的失效时间

  • 比如,设置一个timer线程,每隔10s,然后自动设置锁的失效时间为原来的时间
  • 当主线程执行完毕,或者挂掉,守护线程也会挂掉
  • 使用redisson框架来实现以上优化

如下:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        RLock redisLock = redisson.getLock(lockKey);

        try {
            //设置锁
            redisLock.lock(30, TimeUnit.SECONDS);

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相当于 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相当于 jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
                    }

        } finally {
            //释放锁
            redisLock.unlock();

        }

        return "end";
    }
}

但是,当Redis使用主从架构的时候,Master Redis坏掉,Slave Redis会变成新的Master Redis。那么当线程A刚给主节点上完锁,主节点就挂掉了,从节点变成主节点,并且原先主节点的锁还没有同步到从节点里面,线程B来的时候访问新的主节点(原来的从节点),这个时候线程A还在执行,B也在执行。这个时候可以用RedLock来解决。

参考链接:

https://www.zhihu.com/question/294599028
https://zhuanlan.zhihu.com/p/42056183
https://www.bilibili.com/video/av62657941?p=12

Redis并发锁

原文:https://www.cnblogs.com/Stephanie-boke/p/12739293.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!