首页 > 其他 > 详细

GoLang-channel原理

时间:2021-06-05 18:21:45      阅读:13      评论:0      收藏:0      [点我收藏+]

简介

channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。Go依赖于成为CSP的并发模型,通过Channel实现这种同步模式。Golang并发的核心哲学是不要通过共享内存进行通信。所以数据在不同协程中的传输都是通过拷贝的形式完成的。

简要源码解析

channel结构体

简单说明:

  • buf是有缓冲的channel所特有的结构,用来存储缓存数据。是个循环链表
  • sendxrecvx用于记录buf这个循环链表中的发送或者接收的index
  • lock是个互斥锁。
  • recvqsendq分别是接收(<-channel)或者发送(channel <- xxx)的goroutine抽象出来的结构体(sudog)的队列。是个双向链表
   type hchan struct {
       qcount   uint           // total data in the queue 当前队列里还剩余元素个数
       dataqsiz uint           // size of the circular queue 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N
       buf      unsafe.Pointer // points to an array of dataqsiz elements 环形队列指针
       elemsize uint16 //每个元素的大小
       closed   uint32 //标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
       elemtype *_type // element type 元素类型,用于数据传递过程中的赋值
       sendx    uint   // send index 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据
       recvx    uint   // receive index 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据
       recvq    waitq  // list of recv waiters 等待读消息的goroutine队列
       sendq    waitq  // list of send waiters 等待写消息的goroutine队列
    
       // lock protects all fields in hchan, as well as several
       // fields in sudogs blocked on this channel.
       //
       // Do not change another G‘s status while holding this lock
       // (in particular, do not ready a G), as this can deadlock
       // with stack shrinking.
       lock mutex //互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
  }
  
  // sudog 代表goroutine
   type waitq struct {
        first *sudog
        last  *sudog
  }

至于为什么channel会使用循环链表作为缓存结构,我个人认为是在缓存列表在动态的sendrecv过程中,定位当前send或者recvx的位置、选择send的和recvx的位置比较方便吧,只要顺着链表顺序一直旋转操作就好。想不明白可以看这篇博客的动图:https://blog.csdn.net/phpduang/article/details/109278030

初始化

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	//编译器检查元素类型
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	
	//获取需要分配的内存
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// 如果 hchan 中的元素不包含有指针,那么就没什么和 GC 相关的信息了
	var c *hchan
	//多实用switch少用if else
	switch {
	//队列或元素大小为0
	case mem == 0:
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	//元素中不包含指针	
	case elem.kind&kindNoPointers != 0:
		//元素不包含指针。
		//一次调用分配hchan和buf。
		//这种情况,gc不会对 channel 中的元素进行 scan
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 元素中包含指针
		//区别:调用了两次分配空间的函数
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

make函数在创建channel的时候会在该进程的heap区申请一块内存,创建一个hchan结构体,返回执行该内存的指针,所以获取的的ch变量本身就是一个指针,在函数之间传递的时候是同一个channel。

发送数据

func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	//如果channel为nil
	if c == nil {
		if !block {
			return false
		}
		// nil channel 发送数据会永远阻塞下去
		// 挂起当前 goroutine
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}
	//检查是否阻塞
	//检查是否关闭,channel关闭之后不能发送
	//检查channel容量&&检查channel 接收队列是否为空
	//检查channel队列是否满了
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	
	// channel 已被关闭,panic异常
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	//找到等待的接收者
	if sg := c.recvq.dequeue(); sg != nil {
		//直接把要发的数据拷贝给这个 receiver
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	
	//如果channel队列还没满
	if c.qcount < c.dataqsiz {
		// 通道缓冲区中有可用空间。发送的元素入队。
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		//将 goroutine 的数据拷贝到 buffer 中
		typedmemmove(c.elemtype, qp, ep)
		// 将发送 index 加一
		c.sendx++
		//满了,重置index
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		//长度加1
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	//在 channel 上阻塞,receiver 会帮我们完成后续的工作
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
	c.sendq.enqueue(mysg)
	// 将这个发送 goroutine 从 Grunning -> Gwaiting
    // 进入休眠
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	
	//确保发送的值保持活动状态,直到接收者将其复制出来
	KeepAlive(ep)

	// 唤醒
	//如果不是合法的唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	//唤醒
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		//唤醒字后发现channel被关闭了
		panic(plainError("send on closed channel"))
	}
	//可唤醒
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

总结一下发送流程

  1. 先锁住整个channel
  2. 如果发送数据的过程中,先发现等待队列不为空,说明有阻塞等待获取channel中数据的协程,那么直接从recvq中取出协程,写入数据,就完成了
  3. 如果recvq为空,那么判断缓冲区是否满了,如果没满,把数据拷贝到缓冲区buf中,结束。如果满了,那么当前协程阻塞,封装成一个sudog放入发送队列sendq
  4. 释放lock

接收数据

func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	//如果channel为空
	if c == nil {
		if !block {
			return
		}
		// 挂起当前 goroutine
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	//同上
	// 非阻塞且没内容可收的情况下要直接返回
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	//加锁
	lock(&c.lock)

	//当前channel没有数据可读
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		//释放锁
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// sender 队列中有 sudog 在等待
    // 直接从该 sudog 中获取数据拷贝到当前 g 即可
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	//还有可读数据
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 直接从 buffer 里拷贝数据
		typedmemclr(c.elemtype, qp)
		//接收索引 +1
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		//buffer 元素计数 -1// buffer 元素计数 -1
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	//没有可用的发送者,阻塞当前channel
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// 进入 recvq 队列
	c.recvq.enqueue(mysg)
	//等待 Grunning -> Gwaiting
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// 非正常唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	//唤醒
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

总结一下接收流程

  1. 锁住整个channel

  2. 尝试在sendq等待队列中获取等待的goroutine

  3. 如果有等待的goroutine,而且缓冲区里没数据,取出goroutine并读取数据,然后唤醒这个goroutine,结束读取释放锁

  4. 如果有等待goroutine,且有缓冲区(缓冲区满了),从缓冲区队列首取数据,再从sendq取出一个goroutine,将goroutine中的数据存放到buf队列尾,结束读取释放锁。

  5. 如果没有等待的goroutine,且缓冲区有数据,直接读取缓冲区数据,结束释放锁。

  6. 如果没有等待的goroutine,且没有缓冲区或者缓冲区为空,将当前goroutine加入到sendq队列,进入睡眠,等待被写入goroutine唤醒,结束读取释放锁。

流程图

发送数据

技术分享图片

技术分享图片

技术分享图片

接收数据

技术分享图片

技术分享图片

GoLang-channel原理

原文:https://www.cnblogs.com/kabuda/p/14853280.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!