注:本文参考摘抄于《Go语言核心编程》
Go 中的goroutine 之间并没有父与子的关系,也就是没有所谓子进程退出后的通知机制,多个goroutine都是平行地被调度,多个goroutine如何协作设计通信、同步、通知和退出四个方面。
通信: chan通道当然是goroutine 之间通信的基础。注意这里的通信主要是指程序的数据通信。
同步: 不带缓冲的chan提供了一个天然的同步等待机制;当然sync.WaitGroup
也为多个goroutine协同工作提供一种同步等待机制。
通知: 这个通知和上面通信的数据不一样,通知通常不是业务数据,而是管理、控制流数据。要处理这个也有方法,在输入端绑定两个chan,一个用于业务流数据,另一个用于异常通信数据,然后使用select收敛进行处理。这个方案可以解决简单的问题,但不是一个通用的解决方案。
退出: goroutine 之间没有父子关系,如何通知goroutine退出?可以通过增加一个单独的通道,借助通道和select的广播机制(close channel to broadcast) 实现退出。
Go 语言在语法上处理某个goroutine退出通知机制很简单。但是遇到复杂的并发结构处理起来就显得力不从心。实际编程中goroutine会拉起新的goroutine,新的goroutine又会拉起另外一个新的goroutine,最终形成一个树状的结构。由于goroutine里并没有父子概念,这个树状结构知识在程序员头脑中抽象出来的,程序的执行模型并没有维护这么一个树状结构。
怎么通知这个树状上所有的goroutine退出?仅依靠语法层面的支持显然比较难处理。为此Go1.7 提供了一个标准库context来解决这个问题。它提供两种功能:退出通知和 元数据传递。context库的设计目的就是跟踪goroutine调用,在其内部维护一个调用树。并在这些调用树中传递通知和元数据。
context库的目的就是跟踪goroutine的调用树,并在这些goroutine 调用树中传递通知和元数据。两个目的:
在介绍context包之前,先理解context包的整体工作机制:
第一个创建Context 的goroutine 被称之为root节点。
root节点负责创建一个实现Context接口的具体对象,并将该对象作为参数传递到其新拉起的goroutine中,下有的goroutine可以继续封装该对象,再传递到更下游的goroutine。
Context对象在传递的过程中最终形成一个树状的数据结构,这样通过位于root节点(树的根节点)的Context对象就能遍历整个Context 对象树,通知和消息就可以通过root节点传递出去。实现了上游goroutine 对下游goroutine 的消息传递。
Context 是一个基本接口,所有的Context 对象都要实现该接口,context 的使用者在调用接口中都使用Context 作为参数类型。
type Context interface {
//如果Context实现了超时控制,则该方法返回ok true, deadline 为超时时间。
Deadline() (deadline time.Time, ok bool)
//后端被调用的goroutine应该监听该方法返回的chan,以便及时释放资源
Done() <-chan struct{}
//Done 返回的chan 收到通知的时候,才可以访问Err() 获知为什么原因被取消
Err() error
//可以访问上游goroutine传递给下游goroutine 的值。
Value(key interface{}) interface{}
}
canceler 接口是一个扩展接口,规定了取消通知的Context具体类型需要实现的接口。context包中的具体类型*cancelCtx
和*timeCtx
都实现了该接口。如下:
//一个context对象如果实现了canceler 接口,则可以被取消。
type canceler interface {
//创建cancel接口实例的goroutine 调用cancel方法通知后续创建的goroutine退出
cancel(removeFromParent bool, err error)
//Done 方法返回的chan需要后端的goroutine 来监听,并及时退出
Done() <-chan struct{}
}
emptyCtx 实现了Context 接口,但不具备任何功能。因为其所有的额方法都是空实现。其存在的目的是作为Context对象树的根(root节点)。因为context包的使用思路就是不停地调用context包提供的包装函数来创建具有特殊功能的Context实例,每一个Context 实例的创建都以上一个Context对象为参数,最终形成一个树状结构。如下
//emptyCtx 实现了Context 接口
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
package 定义了两个全局变量和两个封装函数,返回两个emptyCtx实例对象,实际使用时通过调用这两个封装函数来构造Context 的root节点。 如下:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it‘s unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
cancelCtx 是一个实现了Context 接口的具体类型,同时实现了canceler接口。canceler 具有退出通知方法。注意退出通知机制不但能够通知自己,也能够逐层通知其children 节点。如下
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Value(key interface{}) interface{} {
if key == &cancelCtxKey {
return c
}
return c.Context.Value(key)
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func contextName(c Context) string {
if s, ok := c.(stringer); ok {
return s.String()
}
return reflectlite.TypeOf(c).String()
}
func (c *cancelCtx) String() string {
return contextName(c.Context) + ".WithCancel"
}
// cancel closes c.done, cancels each of c‘s children, and, if
// removeFromParent is true, removes c from its parent‘s children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child‘s lock while holding parent‘s lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
timeCtx 是一个实现了Context 接口的具体类型,内部封装了cancelCtx 类型实例。同时有一个deadline变量,用来实现定时退出通知。如下:
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
c.deadline.String() + " [" +
time.Until(c.deadline).String() + "])"
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx‘s children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
valueCtx 是一个实现了Context 接口的具体类型,内部封装了Context接口类型。同时封装了一个k/v 的存储变量。valueCtx 可用来传递通知信息。如下:
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
如下两个函数是构造Context取消树的根节点对象,根节点对象用作后续With包装函数的实参。
func Background() Context
func TODO() Context
With 包装函数
//创建一个带有退出通知的Context具体对象。内部创建一个cancelCtx 的类型实例。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {}
//创建一个带有超时通知的Context具体对象。内部创建一个timeCtx 的类型实例。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc){}
//
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc){}
//创建一个能够传递数据的Context对象,内部创建一个valueCtx的实例类型
func WithValue(parent Context, key, val interface{}) Context {}
这些函数都有一个共同的特点——parent参数,其实这就是实现Context通知树的必备条件。在goroutine 调用链中,Context的实例被逐层地包装并传递,每层又可以对传进来的Context实例在封装自己所需要的功能。整个调用链树只需要一个数据结构来维护,这个维护逻辑在这些包装对象函数内部实现。
With 开头的构造函数是给外部程序使用的API接口函数。Context具体对象的联调关系是在With函数内部维护的。下面展示的是With 内部使用的通用函数。
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
判断parent的方法Done 返回值是否是nil。如果是,则说明parent不是一个可取消的Context 对象,也就无所谓取消构造树。说明child 就是取消构造树的根(root)
如果parent 的方法Done 返回值不是nil。则向上回溯自己的祖先是否是cancelCtx 的类型实例。如果是,则将child的子节点注册维护到那棵关系树里。
如果向上回溯自己的祖先都不是cancelCTX类型实例,则说明整个链条的取消树是不连续的。此时只需要监听parent 和自己的取消信号即可。
如下函数,判断parent中是否封装有*cancelCtx
字段。或者接口里面存放的底层类型是否是*cancelCtx
parentCancelCtx(parent Context) (*cancelCtx, bool) {}
如下函数,如果parent 封装的*cancelCtx
类型字段,或者接口里存放的底层类型是*cancelCtx
类型,将其构造树上的节点删除。
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
package main
import (
"context"
"fmt"
"time"
)
//定义一个新类型,包含一个Context 字段
type otherContext struct {
context.Context
}
func main() {
//使用context.Background 构建一个withCancel 类型的上下文
ctxa, work1Cancel := context.WithCancel(context.Background())
//work模拟退出通知
go work(ctxa, "work1")
//使用withDeadline 包装前面的上下文对象ctxa
tm := time.Now().Add(3 * time.Second)
ctxb, _ := context.WithDeadline(ctxa, tm)
//work模拟超时通知
go work(ctxa, "work2")
//使用with 对象包装前面的上下文对象ctxb
oc := otherContext{ctxb}
ctxc := context.WithValue(oc, "key", "this is some things")
go workWithValue(ctxc, "work3")
//休眠10s,让work2 、work3 退出
time.Sleep(5 * time.Second)
//显示调用work1 的cancel 方法通知其退出
work1Cancel()
fmt.Println("================")
fmt.Println("work1 exec cancel ...")
fmt.Println("================")
//等待work1 打印退出信息
time.Sleep(2 * time.Second)
fmt.Println("all things done")
}
//模拟逻辑处理
func work(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("%s get msg to cancel\n", name)
return
default:
fmt.Printf("%s is running\n", name)
time.Sleep(1* time.Second)
}
}
}
//等待前端的退出通知,并试图获取context 传递的数据
func workWithValue(ctx context.Context, name string){
for {
select {
case <- ctx.Done():
fmt.Printf("%s get msg to cancel\n", name)
return
default:
value := ctx.Value("key").(string)
fmt.Printf("%s is running value=%s\n", name , value)
time.Sleep(1 * time.Second)
}
}
}
结果如下:
work2 is running
work1 is running
work3 is running value=this is some things
work1 is running
work3 is running value=this is some things
work2 is running
work2 is running
work1 is running
work3 is running value=this is some things
work1 is running
work2 is running
work3 is running value=this is some things
work1 is running
work2 is running
work3 get msg to cancel
work2 is running
work1 get msg to cancel
================
work1 exec cancel ...
================
work2 get msg to cancel
all things done
在使用Context的过程中,程序在底层实际上维护了两条关系链。理解这个关系链对于理解context包非常有好处。引用关系链如下:
func propagateCancel(parent Context, child canceler) {}
程序有一层这样的树状结构,本示例是一个链表结构
ctxa.children ---> ctxb
ctxb.children ---> ctxc
这个树提供一种从根节点开始遍历树的方法。context包的取消广播通知的核心就是基于这一点实现的。
取消通知沿着这条链从根节点向下层节点逐层广播。当然也可以在任意一个子树上发布取消通知,一样会扩散到整棵树。
示例程序中ctxa 收到退出通知。会通知到其绑定到的work2,同时会广播给ctxc 绑定的work3.
ctxc.Context --> oc
ctxc.Context --> ctxb
ctxc.Context.Context.cancelCtx --> ctxa
ctxc.Context.Context.cancelCtx.Context ---> new(emptyCtx)
这个关系链主要用来切断当前Context 实例和上层的Context 实例之间的关系。ctxb 调用了退出通知或者定时器到期了。ctxb 后续就没有必要通知广播树继续存在,它需要找到自己的parent,然后执行如下逻辑,把自己从广播树上清理掉。
根据上文示例流程整理出使用Context 包的一般流程
func Background() Context
func TODO() Context
这些包装函数是context package 的核心。几乎所有的封装都是从包装函数开始的。原因很简单,使用context 包的核心就是使用其退出广播功能.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {}
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {}
func WithValue(parent Context, key, val interface{}) Context {}
将上一步创建的对象作为实参传递给后续启动的并发函数(通常作为函数的第一个参数),每个并发函数内部可以继续使用包装函数对传进来的Context 进行包装,添加自己所需要的功能。
顶端的goroutine 在超时后调用cancel 退出通知函数。通知树上所有的goroutine 函数释放资源。
后端的goroutine 通过chan 监听Context.Done 返回的chan,及时响应前端goroutine 退出通知,一般停止本次处理,释放所占用的资源。
首先要清楚使用context 包主要是解决goroutine的退出通知,传递数据只是一个额外功能。可以使用它传递一些元信息。总之使用context传递的信息不能影响业务的正常逻辑,程序不要期待在context 中传递一些必须的参数等,没有这些参数,程序也应该能够正常工作。
interface{}
类型的值,编译器不能进行严格的类型校验。interface{}
到具体类型需要使用类型断言和接口查询,有一定的运行期开销和性能损耗context 包提供的核心功能时多个goroutine 之间的退出机制,传递数据只是一个辅助功能,应该谨慎使用context 传递数据。
原文:https://www.cnblogs.com/roverliang/p/13638731.html