首页 > 其他 > 详细

redis事务

时间:2020-02-06 18:56:43      阅读:64      评论:0      收藏:0      [点我收藏+]

 

# -*- coding: utf-8 -*-
import redis
import time
import threading
from redis import WatchError
from redis_lock import synchronized  # redis_lock.py文件中实现

REDIS_DATABASE = {
    HOST: localhost,
    PORT: 6379,
    DB: 0
}

rds = redis.Redis(host=REDIS_DATABASE[HOST], port=REDIS_DATABASE[PORT], db=REDIS_DATABASE[DB])
rds.delete(mount)
rds.incr("mount", amount=100)


class TestRedis(threading.Thread):
    def __init__(self, t_num):
        self.t_num = t_num
        super(TestRedis, self).__init__()

    def run(self):
        # self.no_transaction()
        # self.optimistic_lock()
        self.pessimistic_lock()

    def no_transaction(self):
        """
        无事务
        :return:
        """
        mount = int(rds.get(mount))
        if mount > 0:
            print(t_num=%s, mount=%s % (self.t_num, mount))
            rds.decr(mount)

    def optimistic_lock(self):
        """
        乐观锁
        :return:
        """
        while 1:
            with rds.pipeline(transaction=True) as r_pip:
                r_pip.watch(mount)
                try:
                    r_pip.multi()
                    mount = int(rds.get(mount))
                    if mount > 0:
                        r_pip.decr(mount)
                    r_pip.execute()
                    return
                except WatchError:
                    r_pip.unwatch()

    @synchronized(rds, "lock", 1000)
    def pessimistic_lock(self):
        """
        悲观锁
        :return:
        """
        mount = int(rds.get(mount))
        if mount > 0:
            rds.decr(mount)


tests = []
for i in xrange(300):
    t = TestRedis(i+1)
    tests.append(t)

s = time.time()
for t in tests:
    t.start()
for t in tests:
    t.join()

print(result mount=%s, time=%s % (rds.get(mount), (time.time()-s)*1000))

 

 

redis_lock.py

# coding: utf-8
"""
    redis 分布式悲观锁,需要解决以下几个问题
    1、A获取锁后崩溃,需要能将锁释放
    2、A获取锁后处理时间过长,导致锁过期,被B获取,A处理完后错误的将B锁释放
    
    redis.Redis()会有些问题,连接最好使用redis.StrictRedis()
"""

import math
import time
import uuid
from contextlib import contextmanager
from functools import wraps

from redis import WatchError


def acquire_lock(conn, lock_name, acquire_timeout=1, lock_timeout=1):
    """
    获取锁
    :param conn: redis连接
    :param lock_name: 锁名称
    :param acquire_timeout: 获取锁最长等待时间,-1为永久阻塞等待
    :param lock_timeout: 锁超时时间
    :return: 
    """

    def should_acquire():
        if acquire_timeout == -1:
            return True
        acquire_end = time.time() + acquire_timeout
        return time.time() < acquire_end

    identity = str(uuid.uuid1())
    lock_timeout = int(math.ceil(lock_timeout))
    while should_acquire():
        if conn.set(lock_name, identity, ex=lock_timeout, nx=True):
            return identity
        else:
            pttl = conn.pttl(lock_name)
            # Redis or StrictRedis
            # 如果使用的是Redis , 可能会存在pttl为0 但是显示为None的情况
            if pttl is None or pttl == -1:
                conn.expire(lock_name, lock_timeout)
        time.sleep(.1)
    return None


def release_lock(conn, lock_name, identity):
    pipe = conn.pipeline(True)
    while True:
        try:
            pipe.watch(lock_name)
            if pipe.get(lock_name) == identity:
                pipe.delete(lock_name)
                return True
            pipe.unwatch()
            break
        except WatchError:
            pass
    return False


@contextmanager
def lock(conn, lock_name, lock_timeout):
    """
    with lock(conn, "lock", 10):
        do something
    """
    id_ = None
    try:
        id_ = acquire_lock(conn, lock_name, -1, lock_timeout)
        yield id_
    finally:
        release_lock(conn, lock_name, id_)


def synchronized(conn, lock_name, lock_timeout):
    """
    @synchronized(conn, "lock", 10)
    def fun():
        counter = int(r.get("counter"))
        counter += 1
        r.set("counter", counter)
    """

    def decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            with lock(conn, lock_name, lock_timeout):
                return func(*args, **kwargs)

        return wrap

    return decorator


if __name__ == __main__:
    import redis

    r = redis.Redis("localhost", db=5)

    id_ = acquire_lock(r, "lock", acquire_timeout=1, lock_timeout=10)
    release_lock(r, "lock", id_)
    with lock(r, "lock", 1):
        print "do something"


    @synchronized(r, "lock", 10)
    def fun():
        counter = int(r.get("counter"))
        counter += 1
        r.set("counter", counter)


    for i in range(10000):
        fun()

 

redis事务

原文:https://www.cnblogs.com/aaron-agu/p/12269623.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!