channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。Go依赖于成为CSP的并发模型,通过Channel实现这种同步模式。Golang并发的核心哲学是不要通过共享内存进行通信。所以数据在不同协程中的传输都是通过拷贝的形式完成的。
简单说明:
buf
是有缓冲的channel所特有的结构,用来存储缓存数据。是个循环链表sendx
和recvx
用于记录buf
这个循环链表中的发送或者接收的indexlock
是个互斥锁。recvq
和sendq
分别是接收(<-channel)或者发送(channel <- xxx)的goroutine抽象出来的结构体(sudog)的队列。是个双向链表 type hchan struct {
qcount uint // total data in the queue 当前队列里还剩余元素个数
dataqsiz uint // size of the circular queue 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N
buf unsafe.Pointer // points to an array of dataqsiz elements 环形队列指针
elemsize uint16 //每个元素的大小
closed uint32 //标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
elemtype *_type // element type 元素类型,用于数据传递过程中的赋值
sendx uint // send index 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据
recvx uint // receive index 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据
recvq waitq // list of recv waiters 等待读消息的goroutine队列
sendq waitq // list of send waiters 等待写消息的goroutine队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G‘s status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}
// sudog 代表goroutine
type waitq struct {
first *sudog
last *sudog
}
至于为什么channel会使用循环链表作为缓存结构,我个人认为是在缓存列表在动态的send
和recv
过程中,定位当前send
或者recvx
的位置、选择send
的和recvx
的位置比较方便吧,只要顺着链表顺序一直旋转操作就好。想不明白可以看这篇博客的动图:https://blog.csdn.net/phpduang/article/details/109278030
func makechan(t *chantype, size int) *hchan {
elem := t.elem
//编译器检查元素类型
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//获取需要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 如果 hchan 中的元素不包含有指针,那么就没什么和 GC 相关的信息了
var c *hchan
//多实用switch少用if else
switch {
//队列或元素大小为0
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
//元素中不包含指针
case elem.kind&kindNoPointers != 0:
//元素不包含指针。
//一次调用分配hchan和buf。
//这种情况,gc不会对 channel 中的元素进行 scan
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针
//区别:调用了两次分配空间的函数
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
make函数在创建channel的时候会在该进程的heap区申请一块内存,创建一个hchan结构体,返回执行该内存的指针,所以获取的的ch变量本身就是一个指针,在函数之间传递的时候是同一个channel。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果channel为nil
if c == nil {
if !block {
return false
}
// nil channel 发送数据会永远阻塞下去
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
//检查是否阻塞
//检查是否关闭,channel关闭之后不能发送
//检查channel容量&&检查channel 接收队列是否为空
//检查channel队列是否满了
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// channel 已被关闭,panic异常
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//找到等待的接收者
if sg := c.recvq.dequeue(); sg != nil {
//直接把要发的数据拷贝给这个 receiver
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//如果channel队列还没满
if c.qcount < c.dataqsiz {
// 通道缓冲区中有可用空间。发送的元素入队。
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
//将 goroutine 的数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
// 将发送 index 加一
c.sendx++
//满了,重置index
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//长度加1
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
//在 channel 上阻塞,receiver 会帮我们完成后续的工作
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
c.sendq.enqueue(mysg)
// 将这个发送 goroutine 从 Grunning -> Gwaiting
// 进入休眠
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//确保发送的值保持活动状态,直到接收者将其复制出来
KeepAlive(ep)
// 唤醒
//如果不是合法的唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
//唤醒
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
//唤醒字后发现channel被关闭了
panic(plainError("send on closed channel"))
}
//可唤醒
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
总结一下发送流程
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
//如果channel为空
if c == nil {
if !block {
return
}
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//同上
// 非阻塞且没内容可收的情况下要直接返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
//当前channel没有数据可读
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
//释放锁
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// sender 队列中有 sudog 在等待
// 直接从该 sudog 中获取数据拷贝到当前 g 即可
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//还有可读数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 直接从 buffer 里拷贝数据
typedmemclr(c.elemtype, qp)
//接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//buffer 元素计数 -1// buffer 元素计数 -1
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
//没有可用的发送者,阻塞当前channel
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 进入 recvq 队列
c.recvq.enqueue(mysg)
//等待 Grunning -> Gwaiting
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 非正常唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
//唤醒
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
总结一下接收流程
锁住整个channel
尝试在sendq等待队列中获取等待的goroutine
如果有等待的goroutine,而且缓冲区里没数据,取出goroutine并读取数据,然后唤醒这个goroutine,结束读取释放锁
如果有等待goroutine,且有缓冲区(缓冲区满了),从缓冲区队列首取数据,再从sendq取出一个goroutine,将goroutine中的数据存放到buf队列尾,结束读取释放锁。
如果没有等待的goroutine,且缓冲区有数据,直接读取缓冲区数据,结束释放锁。
如果没有等待的goroutine,且没有缓冲区或者缓冲区为空,将当前goroutine加入到sendq队列,进入睡眠,等待被写入goroutine唤醒,结束读取释放锁。
原文:https://www.cnblogs.com/kabuda/p/14853280.html