描述:说到请求限流,一般都会用到MQ,无论何种MQ,都需要生产者和消费者才能发挥MQ的强大作用。但在对接项目,可能就会出现对接方不能够配合使用MQ的情况。此时,使用线程池做限流也是一种可行的思路。
流程:
1.需手动实现一个线程池。说到线程池,要考虑的因素有:核心线程数,任务队列,最大线程数,线程空闲时间,保留策略。
①开启线程池,接受任务,每接受一个任务创建一条线程。
②当线程数达到核心线程数时,之后的任务放入任务队列中。建议使用阻塞队列,防止内存溢出。
③当任务队列饱和,会在线程池中创建额外的线程来处理任务,直至达到最大线程数。
④当在线程池中的这部分额外线程处于空闲状态,并且达到线程空闲时间的要求,这部分线程会被销毁。
⑤当达到最大线程数,依然有后续的任务要处理,此时就要对这部分任务的去留做出决策。提供三种保留策略:
Ⅰ.直接丢弃,不予处理。
Ⅱ.开辟脱离线程池的线程来处理。
Ⅲ.将任务队列中等待时间久的任务丢弃,加入后续任务。
2.请求限流,先要了解server的运行原理
①服务端需要有一个监听器用来监听请求连接。当客户端发送来一个请求,服务端会先和客户端建立tcp连接。
②开辟一条线程用来单独处理这条tcp连接中发送来的http请求,直至http请求读取完毕,返回响应。默认tcp连接会存活90秒。我们要执行请求限流的操作便在此处进行,详细操作看代码。
//线程池
package myroutine
import (
"fmt"
"strconv"
)
/**
* @ Author : jgbb
* @ Date : Created in 2019/9/4 13:19
* @ Description : TODO 线程池
* @ Modified by :
* @ Version : 1.0
*/
func Init(poolSize int,name string) *RoutinePool{
pool := &RoutinePool{
Queue:make(chan func()),
PoolSize:poolSize,
Name:name,
}
defer pool.ExeTask()
return pool
}
type RoutinePool struct {
//缓存任务
Queue chan func()
PoolSize int
Name string
}
// 添加任务到线程池
func (pool *RoutinePool) AddTask(task func()){
pool.Queue <- task
}
//执行任务
func (pool *RoutinePool) ExeTask(){
counter := make(chan int)
for i:=0;i<pool.PoolSize;i++ {
go func() {
j := <- counter//哪条线程
var count int64= 0//计数(线程跑了多少次)
var stdout =pool.Name+"\t线程"+strconv.Itoa(j)+"\t"
for task := range pool.Queue{
count++
fmt.Printf("%p\t%s\n",pool,stdout+strconv.FormatInt(count,10))
task()
}
}()
counter <- i
}
}
//对server源码修改
const(
DefaultPoolSize = 10
)
//每个请求对应的线程池
var PoolMap = make(map[string]*myroutine.RoutinePool)
//golang源码
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
var tempDelay time.Duration // how long to sleep on accept failure
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
srv.trackListener(l, true)
defer srv.trackListener(l, false)
baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
/***********************修改开始*******************************/
//将c.server(ctx)的处理过程放入线程池中
//首先需要请求path,根据path获取对应的线程池
c.r = &connReader{conn: c}
c.r.setReadLimit(c.server.initialReadLimitSize()) //若不setReadLimit,无法读取到缓冲流中的数据
c.bufr = newBufioReader(c.r)//用来读取流
s,err := c.bufr.Peek(100)//缓冲流使用peek(),游标不会进行计数,这样才能流中的数据在后面的处理中复用。否则后续读取流会从游标开始
news := make([]byte,0)
for i:=0;i<100;i++ {
news = append(news,s[i])
if s[i] == 10 {
//10表示换行符,到此获取到所需信息
break
}
}
if err != nil {
fmt.Errorf("my err:%v",err)
}
newss := string(news)
//请求path当作线程池名称
poolName := newss[strings.Index(newss,"/"):strings.LastIndex(newss," ")]
c.setState(c.rwc, StateNew) // before Serve can return
//go c.serve(ctx) //源码
//放入线程池处理请求
putPoolMap(poolName).AddTask(func() {
c.serve(ctx)
})
/***********************修改结束*******************************/
}
}
//生成线程池
//-参数1:线程池大小
//-参数2:线程池名称
func PutPoolMap(poolSize int,name string) *myroutine.RoutinePool{
if _,ok := PoolMap[name]; !ok {
//如果不存在对应的线程池,则生成一个
PoolMap[name] = myroutine.Init(poolSize,name)
}
//返回对应的线程池
return PoolMap[name]
}
//默认使用此方法生成线程池
//-参数1:线程池名称
func putPoolMap(name string) *myroutine.RoutinePool{
return PutPoolMap(DefaultPoolSize,name)
}
原文:https://www.cnblogs.com/asceticmonks/p/13266310.html