context简单概述:Go服务器的每个请求都有自己的goroutine,而有的请求为了提高性能,会经常启动额外的goroutine处理请求,当该请求被取消或超时,该请求上的所有goroutines应该退出,防止资源泄露。那么context来了,它对该请求上的所有goroutines进行约束,然后进行取消信号,超时等操作。
而context优点就是简洁的管理goroutines的生命周期。context简单使用:
接下来模拟一个超时继续分析context
注意: 使用时遵循context规则 1. 不要将 Context放入结构体,Context应该作为第一个参数传 入,命名为ctx。 2. 即使函数允许,也不要传入nil的 Context。如果不知道用哪种 Context,可以使用context.TODO()。 3. 使用context的Value相关方法,只应该用于在程序和接口中传递 和请求相关数据,不能用它来传递一些可选的参数 4. 相同的 Context 可以传递给在不同的goroutine;Context 是 并发安全的。
使用net/http/pprof对goroutines进行查看:
package main import ( "context" "fmt" "net/http" _ "net/http/pprof" "time" ) func main() { go http.ListenAndServe(":8080", nil) ctx, _ := context.WithTimeout(context.Background(), (10 * time.Second)) go testA(ctx) select {} } func testA(ctx context.Context) { ctxA, _ := context.WithTimeout(ctx, (5 * time.Second)) ch := make(chan int) go testB(ctxA, ch) select { case <-ctx.Done(): fmt.Println("testA Done") return case i := <-ch: fmt.Println(i) } } func testB(ctx context.Context, ch chan int) { //模拟读取数据 sumCh := make(chan int) go func(sumCh chan int) { sum := 10 time.Sleep(10 * time.Second) sumCh <- sum }(sumCh) select { case <-ctx.Done(): fmt.Println("testB Done") <-sumCh return //case ch <- <-sumCh: 注意这样会导致资源泄露 case i := <-sumCh: fmt.Println("send", i) ch <- i } }
从执行中和执行后的结果来看,我们完美的关闭了不需要的goroutine,
概述中提到:
当应用场景是由一个请求衍生出多个goroutine完成需求,那它们之间就需要满足一定的约束关系,才能中止routine树,超时等操作。
那么是如何到达约束关系?
简单理解,Context 的调用以链式存在,通过WithXxx方法派生出新的 Context与当前父Context 关联,当父 Context 被取消时,其派生的所有 Context 都将取消。
上图WithCancel派生简单分析图,接下里我们进行源码分析,进一步了解Context的约束关系(WithXxx方法派生大概与WithCancel相同)。
Context源码分析:
type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
Deadline() 返回的time.Time 它为Context 的结束的时间,ok 表示是否有 deadline
Done() 返回一个信道,当对Context进行撤销或过期时,该信道就会关闭的,可以简单认为它是关闭信号。
Err() 当Done信道关闭后,Err会返回关闭的原因(如超时,手动关闭)Value(key interface{}) 一个 K-V 存储的方法
canceler提供了cancal函数,同时要求数据结构实现Context
type canceler interface { cancel(removeFromParent bool, err error) Done() <-chan struct{} }
//默认错误 var Canceled = errors.New("context canceled") var DeadlineExceeded = errors.New("context deadline exceeded")
实现Context的数据结构
type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key interface{}) interface{} { return nil } func (e *emptyCtx) String() string { switch e { case background: return "context.Background" case todo: return "context.TODO" } return "unknown empty Context" }
两个实现Context的空结构。
var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background } func TODO() Context { return todo }
cancelCtx结构体继承了Context,同时实现了canceler接口:
type cancelCtx struct { Context done chan struct{} // closed by the first cancel call. mu sync.Mutex children map[canceler]bool // set to nil by the first cancel call err error // 当被cancel时将会把err设置为非nil } func (c *cancelCtx) Done() <-chan struct{} { return c.done } func (c *cancelCtx) Err() error { c.mu.Lock() defer c.mu.Unlock() return c.err } func (c *cancelCtx) String() string { return fmt.Sprintf("%v.WithCancel", c.Context) } func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() //timerCtx会频繁使用这块代码,因为派生出来 //timerCtx全部指向同一个cancelCtx. if c.err != nil { c.mu.Unlock() return // already canceled } c.err = err //关闭c.done close(c.done) //依次遍历c.children,每个child分别cancel for child := range c.children { // NOTE: acquiring the child‘s lock while holding parent‘s lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() //如果removeFromParent为true,则将c从其parent的children中删除 if removeFromParent { removeChild(c.Context, c) } } //如果parent为valueCtx类型,将循环找最近parent //为CancelCtx类型的,找到就从父对象的children //map 中删除这个child,否则返回nil(context.Background或者 context.TODO) func removeChild(parent Context, child canceler) { p, ok := parentCancelCtx(parent) if !ok { return } p.mu.Lock() if p.children != nil { delete(p.children, child) } p.mu.Unlock() }
接下来分析Cancel相关代码
type CancelFunc func() //WithCancel方法返回一个继承parent的Context对 //象,同时返回的cancel方法可以用来关闭当前 //Context中的Done channel func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) } } func newCancelCtx(parent Context) cancelCtx { return cancelCtx{ Context: parent, done: make(chan struct{}), } } func propagateCancel(parent Context, child canceler) { if parent.Done() == nil { return // parent is never canceled } if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { //如果找到的parent已经被cancel, //则将方才传入的child树给cancel掉 child.cancel(false, p.err) } else { //否则, 将child节点直接添加到parent的children中 //(这样就有了约束关系,向上的父亲指针不变 //,向下的孩子指针可以直接使用 ) if p.children == nil { p.children = make(map[canceler]bool) } p.children[child] = struct{}{} } p.mu.Unlock() } else { //如果没有找到最近的可以被cancel的parent, //则启动一个goroutine,等待传入的parent终止, //并cancel传入的child树,或者等待传入的child终结。 go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } } //判断parent是否为cancelCtx类型 func parentCancelCtx(parent Context) (*cancelCtx, bool) { for { switch c := parent.(type) { case *cancelCtx: return c, true case *timerCtx: return &c.cancelCtx, true case *valueCtx: parent = c.Context default: return nil, false } } }
timerCtx 继承cancelCtx的结构体,这种设计就可以避免写重复代码,提高复用
type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. 计时器 deadline time.Time } func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { return c.deadline, true } func (c *timerCtx) String() string { return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now())) } // 与cencelCtx有所不同,除了处理cancelCtx.cancel, // 还回对c.timer进行Stop(),并将c.timer=nil func (c *timerCtx) cancel(removeFromParent bool, err error) { c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx‘s children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() }
timerCtx具体的两个方法:
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) { //如果parent的deadline比新传入的deadline早,则直接 //返回WithCancel,因为parent的deadline会先失效,而新的 //deadline根据不需要 if cur, ok := parent.Deadline(); ok && cur.Before(deadline) { // The current deadline is already sooner than the new one. return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: deadline, } propagateCancel(parent, c) //检查如果已经过期,则cancel新child树 d := deadline.Sub(time.Now()) if d <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(true, Canceled) } } c.mu.Lock() defer c.mu.Unlock() if c.err == nil { //没有被cancel的话,就设置deadline之后cancel的计时器 c.timer = time.AfterFunc(d, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } } // WithTimeout简单暴力,直接把WithTimeout返回 func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
valueCtx主要用来传递一些可比较操作的数据
func WithValue(parent Context, key, val interface{}) Context { if key == nil { panic("nil key") } if !reflect.TypeOf(key).Comparable() { panic("key is not comparable") } return &valueCtx{parent, key, val} type valueCtx struct { Context key, val interface{} } func (c *valueCtx) String() string { return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val) } func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) }
原文:http://www.cnblogs.com/zhangboyu/p/7735535.html