最近学习了LongAdder和Striped64,打算写一个qps统计工具,刚好项目上也需要。
借鉴了一下前人的文章:https://www.cnblogs.com/ganRegister/p/9369131.html
优化点:
1、增加了一个最大qps的统计
2、对上锁部分的代码进行了一些简化
3、修改了一个类成员的语义(Bucket.latestPassedTime --> Bucket.firstPassTime)
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; public class QpsCalculator { private RollingNumber rollingNumber; public QpsCalculator() { this.rollingNumber = new RollingNumber(); } private final class RollingNumber { private int bucketNum; /** * 槽组 */ private Bucket[] buckets; /** * 时间片 */ private long bucketTimeSlice; /** * 目标槽位下标 */ private volatile Integer targetBucketPosition; /** * 临界跨槽时的时间点 */ private volatile long lastPassTimeCloseToTargetBucket; /** * 刷新槽位时使用的锁 */ private ReentrantLock enterNextBucketLock; /** * 最大qps */ private volatile long maxSummary; /** * 默认60个槽位,槽位的时间片为1000毫秒 */ public RollingNumber() { this(60, 1000); } public RollingNumber(int bucketNum, int millerSecond) { this.bucketNum = bucketNum; buckets = new Bucket[bucketNum]; for (int i = 0; i < bucketNum; i++) { buckets[i] = new Bucket(); } this.bucketTimeSlice = millerSecond; enterNextBucketLock = new ReentrantLock(); this.lastPassTimeCloseToTargetBucket = System.currentTimeMillis() - (2 * bucketTimeSlice); maxSummary = 0; } /** * 当前槽值统计 * * @return */ public long summary() { long time = System.currentTimeMillis(); targetBucketPosition = (int) (time / bucketTimeSlice) % bucketNum; long sum = buckets[targetBucketPosition].sum(); if(sum == 0){ System.out.println(); } return sum; } /** * 历史最大槽值统计 * * @return */ public long getMaxSummary() { return maxSummary; } /** * 清理历史记录 */ public void clearMaxSummary() { maxSummary = 0; } private void click() { long passTime = System.currentTimeMillis(); if (targetBucketPosition == null) { targetBucketPosition = (int) (passTime / bucketTimeSlice) % bucketNum; } Bucket currentBucket = buckets[targetBucketPosition]; if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) { if (enterNextBucketLock.isLocked()) { //忽略跳过 } else { try { enterNextBucketLock.lock(); //可以尝试用tryLock //其他线程出来 if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) { int nextTargetBucketPosition = (int) (passTime / bucketTimeSlice) % bucketNum; Bucket nextBucket = buckets[nextTargetBucketPosition]; if (!nextBucket.equals(currentBucket)) { //跨槽 long summary = buckets[targetBucketPosition].sum(); if (summary > maxSummary) { maxSummary = summary; } nextBucket.reset(passTime); //目标槽位变动 targetBucketPosition = nextTargetBucketPosition; } lastPassTimeCloseToTargetBucket = passTime; nextBucket.incr(); return; } else { currentBucket = buckets[targetBucketPosition]; } } finally { enterNextBucketLock.unlock(); } } } else { //没到接近跨槽临界值 } currentBucket.incr(); } } private final class Bucket { /** * 槽内计数器 */ private LongAdder adder; /** * 第一次时间,只记录一次 */ private long firstPassTime; public Bucket() { adder = new LongAdder(); firstPassTime = System.currentTimeMillis(); } /** * 计数 */ public void incr() { adder.increment(); } /** * 重制 */ public void reset(long time) { adder.reset(); firstPassTime = time; } /** * 统计 * * @return */ public long sum() { return adder.sum(); } public long getFirstPassTime() { return firstPassTime; } public long sumThenReset() { return adder.sumThenReset(); } } public static void main(String[] args) throws InterruptedException { QpsCalculator qpsCalculator = new QpsCalculator(); RollingNumber rollingNumber = qpsCalculator.rollingNumber; int threadNum = 4; int rollingCnt = 3000; CountDownLatch countDownLatch = new CountDownLatch(threadNum); List<Thread> threadList = new ArrayList<Thread>(); Random random = new Random(); for (int i = 0; i < threadNum; i++) { threadList.add(new Thread() { public void run() { for (int i = 0; i < rollingCnt; i++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(2)); } catch (InterruptedException e) { e.printStackTrace(); } rollingNumber.click(); } countDownLatch.countDown(); } }); } long startTime = System.currentTimeMillis(); for (Thread thread : threadList) { thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); long totalTime = endTime - startTime; System.out.println("totalMilliseconds: " + totalTime); System.out.println("current qps is " + rollingNumber.summary()); //计算totalTime会导致跨槽,current qps可能为0 System.out.println("max qps is " + rollingNumber.getMaxSummary()); } }
原文:https://www.cnblogs.com/but999/p/12961110.html