漏桶算法
水先进入到漏桶里,漏桶以一定的速度出水,当流入速度大于漏出的速度,桶中的水会越来越多直到慢,再来请求会溢出。
算法思想是:
1.水(请求)从上方倒入水桶,从水桶下方流出(被处理);
2.来不及流出的水存在水桶中(缓冲),以固定速率流出;
3.水桶满后水溢出(丢弃)。
4.这个算法的核心是:缓存请求、匀速处理、多余的请求直接丢弃。
令牌桶算法
系统以固定速度往桶中放令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
算法思想:
1.令牌以固定速率产生,并缓存到令牌桶中;
2.令牌桶放满时,多余的令牌被丢弃;
3.请求被处理前,先从桶里获取一个令牌
4.当桶里没有令牌可取时,则拒绝服务
两种算法的区别
1.令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
2.漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
3.令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
4.漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
两者主要区别在于是否允许突发流量的处理。令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率、对突发流量不做额外处理;
RateLimiter内部有两个实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)。上述两种都是对令牌桶算法的实现,
RateLimiter默认使用的是 SmoothBursty 平滑突发限流。
思考:实现令牌桶算法需要计算生产令牌桶的方式,如何生成,guava的RateLimiter使用的是哪种呢?
方式一:定时任务
开启一个,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
方式二:延迟计算
如下resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。
void resync(long nowMicros) { // 如果 nextFreeTicket 在过去,则重新同步到现在 if (nowMicros > nextFreeTicketMicros) { // 计算生产令牌的数量 storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros()); // 更新生产令牌的时间 nextFreeTicketMicros = nowMicros; } }
原理分析
private long nextFreeTicketMicros = 0L;//下一次请求可以获取令牌的开始时间 private double storedPermits;//当前存储令牌数 private double maxPermits;//最大存储令牌数 private double stableIntervalMicros;//添加令牌时间间隔 void resync(long nowMicros) { // 如果 nextFreeTicket 在过去,则重新同步到现在 if (nowMicros > nextFreeTicketMicros) { // 计算生产令牌的数量 storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros()); // 更新生产令牌的时间 nextFreeTicketMicros = nowMicros; } } final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; // 当前存的令牌数是否够用 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; // 计算需要等待的时间 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); try { // 本次计算的nextFreeTicketMicros不返回 this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); } catch (ArithmeticException e) { this.nextFreeTicketMicros = Long.MAX_VALUE; } // 计算使用后剩下令牌的数量 this.storedPermits -= storedPermitsToSpend; return returnValue; }
注意:该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。
如何理解 stableIntervalMicros 这个时间呢?
下一次请求可以获取令牌的起始时间
由于RateLimiter允许预消费,上次请求预消费令牌后
下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
MT的老用法
整流器
import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; /** * 流量整型 */ public class TrafficShaper { private static final ConcurrentMap<String, RateLimiter> resourceLimiterMap = Maps.newConcurrentMap(); public static void enter(String resource) throws RateLimitException { enter(resource, 1, 1000, TimeUnit.MILLISECONDS); } public static void enter(String resource, int permits, long timeout, TimeUnit unit) throws RateLimitException { RateLimiter limiter = resourceLimiterMap.get(resource); if (limiter == null) { throw new RateLimitException(resource, resource + " is not exist"); } // 尝试从 RateLimiter 获取令牌,超时返回false if (!limiter.tryAcquire(permits, timeout, unit)) { throw new RateLimitException(resource, resource + " should not be visited so frequently"); } } public static void updateResourceQps(String resource, double qps) { if (qps < 1) { qps = 1D; } RateLimiter limiter = resourceLimiterMap.get(resource); if (limiter == null) { // 创建限流器 limiter = RateLimiter.create(qps); // 缓存限流器 RateLimiter putByOtherThread = resourceLimiterMap.putIfAbsent(resource, limiter); // 如何此时已经有,返回旧的 if (putByOtherThread != null) { limiter = putByOtherThread; } } } public static class RateLimitException extends Exception { private static final long serialVersionUID = 1L; private String resource; public String getResource() { return resource; } public RateLimitException(String resource, String msg) { super(msg); this.resource = resource; } @Override public synchronized Throwable fillInStackTrace() { return this; } } }
public enum RateLimiterEnum { DB_CHECK("db_check", 500); private String code; private int limiter; public String getCode() { return this.code; } public int getLimiter() { return limiter; } private RateLimiterEnum(String code, int limiter) { this.code = code; this.limiter = limiter; } public String getResourceKey() { return "rate_limiter_res_" + getCode(); } }
public class SwitchConfig { public static boolean set_acquire_token(){ return true; } public static int acquire_token_times() { return 3600; } public static int acquire_token_sleep_times() { // 默认10毫秒 return 10; } public static boolean qps_retry_record_switch() { return false; } }
public class QpsLimitUtil { public static void main(String[] args) { // 获取令牌,失败会重试,重试上线还没有获取到会抛出异常 // num 表示本次请求需要消耗令牌的个数 QpsLimitUtil.acquireTokenResult(RateLimiterEnum.DB_CHECK, 5); } static { //初始化RateLimiter updateResource(); } public static void updateResource() { TrafficShaper.updateResourceQps(RateLimiterEnum.DB_CHECK.getResourceKey(), 100); } public static void acquireTokenResult(RateLimiterEnum resRateLimiterEnum, int num) { acquireToken(resRateLimiterEnum.getResourceKey(), num); } private static boolean acquireToken(String resource, int num) { if (num < 1) { if (SwitchConfig.set_acquire_token()) { num = 1; } } if (num < 1) { return true; } // 循环获取令牌,每次获取一个令牌 for (int i = 0; i < num; i++) { acquireToken(resource); } return true; } private static boolean acquireToken(String resource) { int retryMax = SwitchConfig.acquire_token_times(); int retrySleep = SwitchConfig.acquire_token_sleep_times(); int retry = 0; do { //获取失败进行重试 try { TrafficShaper.enter(resource); return true; } catch (Exception e) { retry++; //重试次数超过限定跑出异常 if (retry == retryMax) { throw new RuntimeException("所有次数重试完还是失败!"); } else { if (SwitchConfig.qps_retry_record_switch()) { System.out.println("资源" + resource + "次数" + retry + "原因" + e.getCause()); } } //睡眠后再重试获取 ThreadSleep(retrySleep); } } while (retry < retryMax); return false; } public static void ThreadSleep(long millis) { //睡眠10毫秒再重试获取 try { Thread.sleep(millis); } catch (Exception e) { System.out.println("QpsLimitUtil sleep error"); } } }
使用的Demo
@Test public void testLimit() { // 创建限流器 RateLimiter rateLimiter = RateLimiter.create(1); // 准备线程池 ExecutorService threadPool = Executors.newFixedThreadPool(5); // 准备任务集合 Runnable task1 = new MyTask("task1", 5, rateLimiter); Runnable task2 = new MyTask("task2", 10, rateLimiter); Runnable task3 = new MyTask("task3", 8, rateLimiter); Runnable task4 = new MyTask("task4", 6, rateLimiter); Runnable task5 = new MyTask("task5", 8, rateLimiter); List<Runnable> taskList = Arrays.asList(task1, task2, task3, task4, task5); for (Runnable task : taskList) { threadPool.execute(task); } try { TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } private class MyTask implements Runnable { private String name; private Integer sleepSecond; private RateLimiter rateLimiter; public MyTask(String name, Integer sleepSecond, RateLimiter rateLimiter) { this.name = name; this.sleepSecond = sleepSecond; this.rateLimiter = rateLimiter; } @Override public void run() { long startMs = Instant.now().toEpochMilli(); rateLimiter.acquire(1); long endMs = Instant.now().toEpochMilli(); System.out.println("任务:" + name + ",获取令牌耗时" + (endMs - startMs) + "毫秒,开始执行了..."); try { TimeUnit.SECONDS.sleep(sleepSecond); } catch (InterruptedException e) { e.printStackTrace(); } } }
原文:https://www.cnblogs.com/zwy008/p/14828650.html