首页 > 其他 > 详细

【学习笔记】一个高效qps统计工具

时间:2020-05-25 23:02:14      阅读:74      评论:0      收藏:0      [点我收藏+]

最近学习了LongAdder和Striped64,打算写一个qps统计工具,刚好项目上也需要。

借鉴了一下前人的文章:https://www.cnblogs.com/ganRegister/p/9369131.html

 

优化点:

1、增加了一个最大qps的统计

2、对上锁部分的代码进行了一些简化

3、修改了一个类成员的语义(Bucket.latestPassedTime --> Bucket.firstPassTime)

 

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Lightweight QPS (queries-per-second) counter built on a ring of {@link LongAdder} buckets.
 *
 * <p>Every {@link #click()} is counted in the currently active bucket. When a click arrives at
 * least one time slice after the active window was opened, the finished window's total is
 * compared against the recorded maximum, the bucket for the new window is cleared, and that
 * bucket becomes the active one. Windows are therefore anchored at the click that opened them,
 * not at wall-clock slice boundaries.
 *
 * <p>Counting is best-effort under contention: a click that races with a window switch may be
 * attributed to the window that is just closing.
 */
public class QpsCalculator {

    private final RollingNumber rollingNumber;

    /** Creates a calculator with 60 buckets of 1000 ms each. */
    public QpsCalculator() {
        this.rollingNumber = new RollingNumber();
    }

    /**
     * Creates a calculator with a custom ring size and window length.
     *
     * @param bucketNum    number of buckets in the ring, must be positive
     * @param bucketMillis length of one counting window in milliseconds, must be positive
     * @throws IllegalArgumentException if either argument is not positive
     */
    public QpsCalculator(int bucketNum, int bucketMillis) {
        this.rollingNumber = new RollingNumber(bucketNum, bucketMillis);
    }

    /** Records one request at the current system time. */
    public void click() {
        rollingNumber.click(System.currentTimeMillis());
    }

    /**
     * Records one request at an explicit timestamp. Package-private so that tests can drive
     * the counter deterministically.
     *
     * @param nowMillis timestamp of the request in milliseconds
     */
    void click(long nowMillis) {
        rollingNumber.click(nowMillis);
    }

    /**
     * Returns the number of requests counted in the active window, evaluated at the current
     * system time.
     *
     * @return the active window's count, or 0 if that window has already expired
     */
    public long summary() {
        return rollingNumber.summary(System.currentTimeMillis());
    }

    /**
     * Same as {@link #summary()} but evaluated at an explicit timestamp (package-private,
     * intended for tests).
     *
     * @param nowMillis the instant, in milliseconds, at which to evaluate the active window
     * @return the active window's count, or 0 if that window has expired at {@code nowMillis}
     */
    long summary(long nowMillis) {
        return rollingNumber.summary(nowMillis);
    }

    /**
     * Returns the largest count seen in any window that has been closed so far. The window
     * that is still active is not included until a later click closes it.
     *
     * @return the maximum per-window count recorded since creation or the last clear
     */
    public long getMaxSummary() {
        return rollingNumber.getMaxSummary();
    }

    /** Resets the recorded maximum to 0. */
    public void clearMaxSummary() {
        rollingNumber.clearMaxSummary();
    }

    /** Ring of buckets plus the bookkeeping that decides which bucket is active. */
    private static final class RollingNumber {

        /** Number of buckets in the ring. */
        private final int bucketNum;

        /** The bucket ring. */
        private final Bucket[] buckets;

        /** Length of one window in milliseconds. */
        private final long bucketTimeSlice;

        /** Index of the active bucket. */
        private volatile int targetBucketPosition;

        /** Timestamp of the click that opened the active window. */
        private volatile long lastPassTimeCloseToTargetBucket;

        /** Guards the switch from one window to the next. */
        private final ReentrantLock enterNextBucketLock = new ReentrantLock();

        /** Largest count observed in a closed window. */
        private volatile long maxSummary;

        /** Defaults to 60 buckets of 1000 ms each. */
        RollingNumber() {
            this(60, 1000);
        }

        RollingNumber(int bucketNum, int millerSecond) {
            if (bucketNum <= 0) {
                throw new IllegalArgumentException("bucketNum must be positive: " + bucketNum);
            }
            if (millerSecond <= 0) {
                throw new IllegalArgumentException("millerSecond must be positive: " + millerSecond);
            }
            this.bucketNum = bucketNum;
            this.bucketTimeSlice = millerSecond;
            this.buckets = new Bucket[bucketNum];
            for (int i = 0; i < bucketNum; i++) {
                buckets[i] = new Bucket();
            }
            long now = System.currentTimeMillis();
            this.targetBucketPosition = bucketIndex(now);
            // Start two slices in the past so the very first click opens a fresh window.
            this.lastPassTimeCloseToTargetBucket = now - (2 * bucketTimeSlice);
            this.maxSummary = 0;
        }

        /**
         * Maps a timestamp to a ring index. The modulo is taken on the long quotient before
         * narrowing, so small time slices cannot overflow int and yield a negative index.
         */
        private int bucketIndex(long time) {
            return (int) Math.floorMod(time / bucketTimeSlice, (long) bucketNum);
        }

        /**
         * Count of the active window as seen at {@code now}. Does not modify any state.
         *
         * @return the active bucket's sum, or 0 when the active window is at least one
         *         slice old at {@code now}
         */
        long summary(long now) {
            if (now - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) {
                return 0;
            }
            return buckets[targetBucketPosition].sum();
        }

        /** Largest count recorded for a closed window. */
        long getMaxSummary() {
            return maxSummary;
        }

        /** Clears the recorded maximum. */
        void clearMaxSummary() {
            maxSummary = 0;
        }

        /**
         * Counts one request that happened at {@code passTime}.
         *
         * <p>If the active window is at least one slice old, the caller tries to take the
         * switch lock. The winner closes the old window and opens a new one; a caller that
         * fails to get the lock simply counts into whatever bucket is active at that moment.
         */
        void click(long passTime) {
            if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice
                    && enterNextBucketLock.tryLock()) {
                try {
                    // Re-check under the lock: another thread may have switched already.
                    if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) {
                        long finished = buckets[targetBucketPosition].sum();
                        if (finished > maxSummary) {
                            maxSummary = finished;
                        }
                        int nextTargetBucketPosition = bucketIndex(passTime);
                        Bucket nextBucket = buckets[nextTargetBucketPosition];
                        // Always clear: after an idle gap that wraps the ring the new index
                        // can equal the old one, and that bucket still holds the old count.
                        nextBucket.reset(passTime);
                        nextBucket.incr();
                        // Publish the index before the timestamp; readers check the
                        // timestamp first and then read the index.
                        targetBucketPosition = nextTargetBucketPosition;
                        lastPassTimeCloseToTargetBucket = passTime;
                        return;
                    }
                } finally {
                    enterNextBucketLock.unlock();
                }
            }
            buckets[targetBucketPosition].incr();
        }
    }

    /** One counting slot of the ring. */
    private static final class Bucket {
        /** Counter for this slot. */
        private final LongAdder adder;

        /** Timestamp passed to the most recent reset, or the creation time if never reset. */
        private volatile long firstPassTime;

        Bucket() {
            adder = new LongAdder();
            firstPassTime = System.currentTimeMillis();
        }

        /** Adds one to the counter. */
        void incr() {
            adder.increment();
        }

        /** Clears the counter and records {@code time} as the start of the new window. */
        void reset(long time) {
            adder.reset();
            firstPassTime = time;
        }

        /** Current value of the counter. */
        long sum() {
            return adder.sum();
        }

        long getFirstPassTime() {
            return firstPassTime;
        }

        long sumThenReset() {
            return adder.sumThenReset();
        }
    }

    /**
     * Demo: four threads each record 3000 requests with a random 0-1 ms pause between them,
     * then the elapsed time, the active-window count and the recorded maximum are printed.
     */
    public static void main(String[] args) throws InterruptedException {
        final QpsCalculator qpsCalculator = new QpsCalculator();
        final int threadNum = 4;
        final int rollingCnt = 3000;
        final CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        final Random random = new Random();
        List<Thread> threadList = new ArrayList<Thread>();
        for (int i = 0; i < threadNum; i++) {
            threadList.add(new Thread() {
                @Override
                public void run() {
                    try {
                        for (int n = 0; n < rollingCnt; n++) {
                            TimeUnit.MILLISECONDS.sleep(random.nextInt(2));
                            qpsCalculator.click();
                        }
                    } catch (InterruptedException e) {
                        // Stop this worker and keep the interrupt visible to the caller.
                        Thread.currentThread().interrupt();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        long startTime = System.currentTimeMillis();
        for (Thread thread : threadList) {
            thread.start();
        }
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        System.out.println("totalMilliseconds:  " + totalTime);
        // Reports the active window; 0 only if that window has already expired.
        System.out.println("current qps is " + qpsCalculator.summary());
        System.out.println("max qps is " + qpsCalculator.getMaxSummary());
    }
}

 

【学习笔记】一个高效qps统计工具

原文:https://www.cnblogs.com/but999/p/12961110.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!