首页 > 其他 > 详细

GO瞬间并发数控制

时间:2021-01-19 12:31:19      阅读:24      评论:0      收藏:0      [点我收藏+]

 

    var wg2 sync.WaitGroup
    wg2.Add(nums)

    xc :=0
    parallelNum := plt.MaxParallel
    var waitCount int32 = 0

    for i:=0;i<nums*lll;i=i+lll  {

    begin:

        if i % 30 == 1 {
            tools.L2 <- msg
        }

        if i % 10 == 1 {
            mm := fmt.Sprintf("子任务%v开始执行",i+1)
            tools.L2 <- mm
        }

        currentParallelNum := atomic.LoadInt32(&plt.currentParallel)
        if i % 10 == 1 {
            mm := fmt.Sprintf("当前任务数%v,最大并发数%v",currentParallelNum,parallelNum)
            tools.L2 <- mm
        }


        if currentParallelNum > parallelNum {
            waitCount++
            if waitCount > plt.MaxWaitCount {    //等待超过一定次数后,就放开一次并行度
                parallelNum = plt.MaxParallel2

                mm := fmt.Sprintf("当前等待次数%v超过最大等待次数%v,开始将并行数从%v增加到%v",waitCount,plt.MaxWaitCount,plt.MaxParallel,plt.MaxParallel2)
                waitCount = 0

                //tools.LogLevelByConfigFile(mm,2)
                //tools.LogTask(mm,2)
                tools.L2 <- mm
            }else {
                if parallelNum != plt.MaxParallel {
                    parallelNum = plt.MaxParallel
                }
            }

            mm := fmt.Sprintf("当前并行度%v超过最大并行度%v,开始等待",currentParallelNum,parallelNum)
            //tools.LogLevelByConfigFile(mm,2)
            //tools.LogTask(mm,2)
            tools.L2 <- mm
            tools.SleepByMil(100)
            tools.SleepByRandMil(3000*common.SleepInterval)
            goto begin
        }


        xc++
        mm := fmt.Sprintf("第 %v 个协程开始运行",xc)
        tools.L2 <- mm

        endIndex := i + lll
        if endIndex > l {
            endIndex = l
        }

        lstNew := (*sourceDataList)[i:endIndex]
        go func(lst *[]map[string]string,wg *sync.WaitGroup) {
            for _,row :=  range *lst {
                atomic.AddInt32(&plt.currentParallel,1)
                rowMap(&row,&plt.CommonStruct)          //每行数据该如何处理的函数
                atomic.AddInt32(&plt.currentParallel,-1)
            }
            wg.Done()
        }(&lstNew,&wg2)

        tools.SleepByMil(100)
    }
    wg2.Wait()

 

在for循环中加入一个时间等待,否则的话,GO会在瞬间启动上千个并发,可能会直接把程序打死;

tools.SleepByMil(100)

 如果不加这个时间等待,代码中的超过指定并发数开始等待的控制,根据控制不住;

GO瞬间并发数控制

原文:https://www.cnblogs.com/perfei/p/14296628.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!