首页 > 其他 > 详细

30-单机限流

时间:2021-05-30 20:28:47      阅读:26      评论:0      收藏:0      [点我收藏+]

原理

漏桶算法
水先进入到漏桶里,漏桶以一定的速度出水,当流入速度大于漏出的速度,桶中的水会越来越多直到慢,再来请求会溢出。
算法思想是:
1.水(请求)从上方倒入水桶,从水桶下方流出(被处理);
2.来不及流出的水存在水桶中(缓冲),以固定速率流出;
3.水桶满后水溢出(丢弃)。
4.这个算法的核心是:缓存请求、匀速处理、多余的请求直接丢弃。

令牌桶算法
系统以固定速度往桶中放令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
算法思想:
1.令牌以固定速率产生,并缓存到令牌桶中;
2.令牌桶放满时,多余的令牌被丢弃;
3.请求被处理前,先从桶里获取一个令牌
4.当桶里没有令牌可取时,则拒绝服务

两种算法的区别
1.令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
2.漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
3.令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
4.漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
两者主要区别在于是否允许突发流量的处理。令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率、对突发流量不做额外处理;

 

RateLimiter

RateLimiter内部有两个实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)。上述两种都是对令牌桶算法的实现,

RateLimiter默认使用的是 SmoothBursty 平滑突发限流。

 

思考:实现令牌桶算法需要计算生产令牌桶的方式,如何生成,guava的RateLimiter使用的是哪种呢?

方式一:定时任务
开启一个,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

方式二:延迟计算
如下resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

void resync(long nowMicros) {
    // 如果 nextFreeTicket 在过去,则重新同步到现在
    if (nowMicros > nextFreeTicketMicros) {
        // 计算生产令牌的数量
        storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
        // 更新生产令牌的时间
        nextFreeTicketMicros = nowMicros;
    }
}

 

原理分析

private long nextFreeTicketMicros = 0L;//下一次请求可以获取令牌的开始时间
private    double storedPermits;//当前存储令牌数
private    double maxPermits;//最大存储令牌数
private double stableIntervalMicros;//添加令牌时间间隔

void resync(long nowMicros) {
    // 如果 nextFreeTicket 在过去,则重新同步到现在
    if (nowMicros > nextFreeTicketMicros) {
        // 计算生产令牌的数量
        storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
        // 更新生产令牌的时间
        nextFreeTicketMicros = nowMicros;
    }
}

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    // 当前存的令牌数是否够用
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 计算需要等待的时间
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros);
    try {
        // 本次计算的nextFreeTicketMicros不返回
        this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
    } catch (ArithmeticException e) {
        this.nextFreeTicketMicros = Long.MAX_VALUE;
    }
    // 计算使用后剩下令牌的数量
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

注意:该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。

如何理解 stableIntervalMicros 这个时间呢?
下一次请求可以获取令牌的起始时间
由于RateLimiter允许预消费,上次请求预消费令牌后
下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌

 

 

 

MT的老用法

整流器

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
 * 流量整型
 */
public class TrafficShaper {

    private static final ConcurrentMap<String, RateLimiter> resourceLimiterMap = Maps.newConcurrentMap();

    public static void enter(String resource) throws RateLimitException {
        enter(resource, 1, 1000, TimeUnit.MILLISECONDS);
    }

    public static void enter(String resource, int permits, long timeout, TimeUnit unit) throws RateLimitException {
        RateLimiter limiter = resourceLimiterMap.get(resource);
        if (limiter == null) {
            throw new RateLimitException(resource, resource + " is not exist");
        }
        // 尝试从 RateLimiter 获取令牌,超时返回false
        if (!limiter.tryAcquire(permits, timeout, unit)) {
            throw new RateLimitException(resource, resource + " should not be visited so frequently");
        }
    }

    public static void updateResourceQps(String resource, double qps) {
        if (qps < 1) {
            qps = 1D;
        }
        RateLimiter limiter = resourceLimiterMap.get(resource);
        if (limiter == null) {
            // 创建限流器
            limiter = RateLimiter.create(qps);
            // 缓存限流器
            RateLimiter putByOtherThread = resourceLimiterMap.putIfAbsent(resource, limiter);
            // 如何此时已经有,返回旧的
            if (putByOtherThread != null) {
                limiter = putByOtherThread;
            }
        }
    }

    public static class RateLimitException extends Exception {

        private static final long serialVersionUID = 1L;

        private String resource;

        public String getResource() {
            return resource;
        }

        public RateLimitException(String resource, String msg) {
            super(msg);
            this.resource = resource;
        }

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }
}
public enum RateLimiterEnum {

    DB_CHECK("db_check", 500);

    private String code;
    private int limiter;

    public String getCode() {
        return this.code;
    }

    public int getLimiter() {
        return limiter;
    }

    private RateLimiterEnum(String code, int limiter) {
        this.code = code;
        this.limiter = limiter;
    }

    public String getResourceKey() {
        return "rate_limiter_res_" + getCode();
    }
}
public class SwitchConfig {

    public static boolean set_acquire_token(){
        return true;
    }

    public static int acquire_token_times() {
        return 3600;
    }

    public static int acquire_token_sleep_times() {
        // 默认10毫秒
        return 10;
    }

    public static boolean qps_retry_record_switch() {
        return false;
    }
}
public class QpsLimitUtil {

    public static void main(String[] args) {
        // 获取令牌,失败会重试,重试上线还没有获取到会抛出异常
        // num 表示本次请求需要消耗令牌的个数
        QpsLimitUtil.acquireTokenResult(RateLimiterEnum.DB_CHECK, 5);
    }

    static {
        //初始化RateLimiter
        updateResource();
    }

    public static void updateResource() {
        TrafficShaper.updateResourceQps(RateLimiterEnum.DB_CHECK.getResourceKey(), 100);
    }

    public static void acquireTokenResult(RateLimiterEnum resRateLimiterEnum, int num) {
        acquireToken(resRateLimiterEnum.getResourceKey(), num);
    }

    private static boolean acquireToken(String resource, int num) {
        if (num < 1) {
            if (SwitchConfig.set_acquire_token()) {
                num = 1;
            }
        }
        if (num < 1) {
            return true;
        }
        // 循环获取令牌,每次获取一个令牌
        for (int i = 0; i < num; i++) {
            acquireToken(resource);
        }
        return true;
    }

    private static boolean acquireToken(String resource) {
        int retryMax = SwitchConfig.acquire_token_times();
        int retrySleep = SwitchConfig.acquire_token_sleep_times();
        int retry = 0;
        do {
            //获取失败进行重试
            try {
                TrafficShaper.enter(resource);
                return true;
            } catch (Exception e) {
                retry++;
                //重试次数超过限定跑出异常
                if (retry == retryMax) {
                    throw new RuntimeException("所有次数重试完还是失败!");
                } else {
                    if (SwitchConfig.qps_retry_record_switch()) {
                        System.out.println("资源" + resource + "次数" + retry + "原因" + e.getCause());
                    }
                }
                //睡眠后再重试获取
                ThreadSleep(retrySleep);
            }
        } while (retry < retryMax);
        return false;
    }

    public static void ThreadSleep(long millis) {
        //睡眠10毫秒再重试获取
        try {
            Thread.sleep(millis);
        } catch (Exception e) {
            System.out.println("QpsLimitUtil sleep error");
        }
    }
}

 

使用的Demo

    @Test
    public void testLimit() {
        // 创建限流器
        RateLimiter rateLimiter = RateLimiter.create(1);
        // 准备线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        // 准备任务集合
        Runnable task1 = new MyTask("task1", 5, rateLimiter);
        Runnable task2 = new MyTask("task2", 10, rateLimiter);
        Runnable task3 = new MyTask("task3", 8, rateLimiter);
        Runnable task4 = new MyTask("task4", 6, rateLimiter);
        Runnable task5 = new MyTask("task5", 8, rateLimiter);
        List<Runnable> taskList = Arrays.asList(task1, task2, task3, task4, task5);
        for (Runnable task : taskList) {
            threadPool.execute(task);
        }

        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private class MyTask implements Runnable {
        private String name;
        private Integer sleepSecond;
        private RateLimiter rateLimiter;

        public MyTask(String name, Integer sleepSecond, RateLimiter rateLimiter) {
            this.name = name;
            this.sleepSecond = sleepSecond;
            this.rateLimiter = rateLimiter;
        }
        
        @Override
        public void run() {
            long startMs = Instant.now().toEpochMilli();
            rateLimiter.acquire(1);
            long endMs = Instant.now().toEpochMilli();
            System.out.println("任务:" + name + ",获取令牌耗时" + (endMs - startMs) + "毫秒,开始执行了...");
            try {
                TimeUnit.SECONDS.sleep(sleepSecond);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

 

30-单机限流

原文:https://www.cnblogs.com/zwy008/p/14828650.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!